123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- """
- Schema工作流编排器使用示例
- 演示如何使用SchemaWorkflowOrchestrator执行完整的工作流程
- """
- import asyncio
- import sys
- import os
- from pathlib import Path
- # 添加项目根目录到Python路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
- from schema_tools.utils.logger import setup_logging
- async def example_complete_workflow():
- """完整工作流程示例"""
- print("=" * 60)
- print("完整工作流程示例")
- print("=" * 60)
-
- # 设置日志
- setup_logging(verbose=True)
-
- # 配置参数
- db_connection = "postgresql://user:password@localhost:5432/test_db"
- table_list_file = "schema_tools/tables.txt"
- business_context = "高速公路服务区管理系统"
- db_name = "highway_db"
- output_dir = "./example_output"
-
- try:
- # 创建工作流编排器
- orchestrator = SchemaWorkflowOrchestrator(
- db_connection=db_connection,
- table_list_file=table_list_file,
- business_context=business_context,
- db_name=db_name,
- output_dir=output_dir,
- enable_sql_validation=True,
- enable_llm_repair=True,
- modify_original_file=True
- )
-
- print(f"🚀 开始执行完整工作流程...")
- print(f"📁 输出目录: {output_dir}")
- print(f"🏢 业务背景: {business_context}")
- print(f"💾 数据库: {db_name}")
-
- # 执行完整工作流程
- report = await orchestrator.execute_complete_workflow()
-
- # 打印详细摘要
- orchestrator.print_final_summary(report)
-
- # 分析结果
- if report["success"]:
- print(f"\n🎉 工作流程执行成功!")
-
- # 显示各步骤详情
- results = report["processing_results"]
-
- if "ddl_md_generation" in results:
- ddl_md = results["ddl_md_generation"]
- print(f"📋 步骤1 - DDL/MD生成:")
- print(f" 处理表数: {ddl_md.get('processed_successfully', 0)}")
- print(f" 生成文件: {ddl_md.get('files_generated', 0)}")
- print(f" 耗时: {ddl_md.get('duration', 0):.2f}秒")
-
- if "question_sql_generation" in results:
- qs = results["question_sql_generation"]
- print(f"🤖 步骤2 - Question-SQL生成:")
- print(f" 生成主题: {qs.get('total_themes', 0)}")
- print(f" 成功主题: {qs.get('successful_themes', 0)}")
- print(f" 问答对数: {qs.get('total_questions', 0)}")
- print(f" 耗时: {qs.get('duration', 0):.2f}秒")
-
- if "sql_validation" in results:
- validation = results["sql_validation"]
- print(f"🔍 步骤3 - SQL验证:")
- print(f" 原始SQL数: {validation.get('original_sql_count', 0)}")
- print(f" 有效SQL数: {validation.get('valid_sql_count', 0)}")
- print(f" 成功率: {validation.get('success_rate', 0):.1%}")
- print(f" 耗时: {validation.get('duration', 0):.2f}秒")
-
- outputs = report["final_outputs"]
- print(f"\n📄 最终输出:")
- print(f" 主要文件: {outputs['primary_output_file']}")
- print(f" 问题总数: {outputs['final_question_count']}")
-
- else:
- print(f"\n❌ 工作流程执行失败:")
- error = report["error"]
- print(f" 失败步骤: {error['failed_step']}")
- print(f" 错误信息: {error['message']}")
-
- # 显示已完成的步骤
- completed = report["workflow_summary"]["completed_steps"]
- if completed:
- print(f" 已完成步骤: {', '.join(completed)}")
-
- except Exception as e:
- print(f"\n❌ 示例执行失败: {e}")
- import traceback
- traceback.print_exc()
- async def example_skip_validation():
- """跳过验证的工作流程示例"""
- print("=" * 60)
- print("跳过验证的工作流程示例")
- print("=" * 60)
-
- # 设置日志
- setup_logging(verbose=True)
-
- # 配置参数(跳过SQL验证)
- db_connection = "postgresql://user:password@localhost:5432/test_db"
- table_list_file = "schema_tools/tables.txt"
- business_context = "电商系统"
- db_name = "ecommerce_db"
- output_dir = "./example_output_no_validation"
-
- try:
- # 创建工作流编排器(跳过验证)
- orchestrator = SchemaWorkflowOrchestrator(
- db_connection=db_connection,
- table_list_file=table_list_file,
- business_context=business_context,
- db_name=db_name,
- output_dir=output_dir,
- enable_sql_validation=False, # 跳过SQL验证
- enable_llm_repair=False,
- modify_original_file=False
- )
-
- print(f"🚀 开始执行工作流程(跳过验证)...")
-
- # 执行工作流程
- report = await orchestrator.execute_complete_workflow()
-
- # 打印摘要
- orchestrator.print_final_summary(report)
-
- print(f"\n📊 执行结果:")
- print(f" 成功: {'是' if report['success'] else '否'}")
- print(f" 完成步骤数: {len(report['workflow_summary']['completed_steps'])}")
- print(f" 总耗时: {report['workflow_summary']['total_duration']}秒")
-
- except Exception as e:
- print(f"\n❌ 示例执行失败: {e}")
- async def example_error_handling():
- """错误处理示例"""
- print("=" * 60)
- print("错误处理示例")
- print("=" * 60)
-
- # 设置日志
- setup_logging(verbose=True)
-
- # 故意使用错误的配置来演示错误处理
- db_connection = "postgresql://invalid:invalid@localhost:5432/invalid_db"
- table_list_file = "nonexistent_tables.txt"
- business_context = "测试系统"
- db_name = "test_db"
- output_dir = "./example_error_output"
-
- try:
- # 创建工作流编排器
- orchestrator = SchemaWorkflowOrchestrator(
- db_connection=db_connection,
- table_list_file=table_list_file,
- business_context=business_context,
- db_name=db_name,
- output_dir=output_dir
- )
-
- print(f"🚀 开始执行工作流程(故意触发错误)...")
-
- # 执行工作流程
- report = await orchestrator.execute_complete_workflow()
-
- # 分析错误报告
- if not report["success"]:
- print(f"\n🔍 错误分析:")
- error = report["error"]
- print(f" 错误类型: {error['type']}")
- print(f" 错误信息: {error['message']}")
- print(f" 失败步骤: {error['failed_step']}")
-
- # 显示部分结果
- partial = report.get("partial_results", {})
- if partial:
- print(f" 部分结果: {list(partial.keys())}")
-
- except Exception as e:
- print(f"\n❌ 预期的错误: {e}")
- print("这是演示错误处理的正常情况")
- def show_usage_examples():
- """显示使用示例"""
- print("=" * 60)
- print("SchemaWorkflowOrchestrator 使用示例")
- print("=" * 60)
-
- examples = [
- {
- "title": "1. 编程方式 - 完整工作流程",
- "code": """
- import asyncio
- from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
- async def run_complete_workflow():
- orchestrator = SchemaWorkflowOrchestrator(
- db_connection="postgresql://user:pass@localhost:5432/dbname",
- table_list_file="tables.txt",
- business_context="高速公路服务区管理系统",
- db_name="highway_db",
- output_dir="./output"
- )
-
- # 一键执行完整流程
- report = await orchestrator.execute_complete_workflow()
-
- if report["success"]:
- print(f"✅ 编排完成!最终生成 {report['final_outputs']['final_question_count']} 个问答对")
- print(f"📄 输出文件: {report['final_outputs']['primary_output_file']}")
- else:
- print(f"❌ 编排失败: {report['error']['message']}")
- asyncio.run(run_complete_workflow())
- """
- },
- {
- "title": "2. 命令行方式 - 完整工作流程",
- "code": """
- python -m schema_tools.schema_workflow_orchestrator \\
- --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
- --table-list tables.txt \\
- --business-context "高速公路服务区管理系统" \\
- --db-name highway_db \\
- --output-dir ./output
- """
- },
- {
- "title": "3. 命令行方式 - 跳过验证",
- "code": """
- python -m schema_tools.schema_workflow_orchestrator \\
- --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
- --table-list tables.txt \\
- --business-context "电商系统" \\
- --db-name ecommerce_db \\
- --skip-validation
- """
- },
- {
- "title": "4. 命令行方式 - 禁用LLM修复",
- "code": """
- python -m schema_tools.schema_workflow_orchestrator \\
- --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
- --table-list tables.txt \\
- --business-context "管理系统" \\
- --db-name management_db \\
- --disable-llm-repair \\
- --verbose
- """
- }
- ]
-
- for example in examples:
- print(f"\n{example['title']}:")
- print(example['code'])
- async def main():
- """主函数"""
- print("Schema工作流编排器使用示例")
- print("请选择要运行的示例:")
- print("1. 完整工作流程示例")
- print("2. 跳过验证的工作流程示例")
- print("3. 错误处理示例")
- print("4. 显示使用示例代码")
- print("0. 退出")
-
- try:
- choice = input("\n请输入选择 (0-4): ").strip()
-
- if choice == "1":
- await example_complete_workflow()
- elif choice == "2":
- await example_skip_validation()
- elif choice == "3":
- await example_error_handling()
- elif choice == "4":
- show_usage_examples()
- elif choice == "0":
- print("退出示例程序")
- else:
- print("无效选择")
-
- except KeyboardInterrupt:
- print("\n\n用户中断,退出程序")
- except Exception as e:
- print(f"\n示例执行失败: {e}")
- if __name__ == "__main__":
- asyncio.run(main())
|