# workflow_example.py (11 KB)
"""
Usage example for the Schema workflow orchestrator.

Demonstrates how to use SchemaWorkflowOrchestrator to execute the complete
workflow.
"""
import asyncio
import sys
import os
from pathlib import Path  # NOTE(review): appears unused in this file

# Add the project root directory (two levels above this file) to the front of
# the Python path so the ``schema_tools`` imports below resolve when this
# script is run directly. This must stay before those imports.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
from schema_tools.utils.logger import setup_logging
  13. async def example_complete_workflow():
  14. """完整工作流程示例"""
  15. print("=" * 60)
  16. print("完整工作流程示例")
  17. print("=" * 60)
  18. # 设置日志
  19. setup_logging(verbose=True)
  20. # 配置参数
  21. db_connection = "postgresql://user:password@localhost:5432/test_db"
  22. table_list_file = "schema_tools/tables.txt"
  23. business_context = "高速公路服务区管理系统"
  24. db_name = "highway_db"
  25. output_dir = "./example_output"
  26. try:
  27. # 创建工作流编排器
  28. orchestrator = SchemaWorkflowOrchestrator(
  29. db_connection=db_connection,
  30. table_list_file=table_list_file,
  31. business_context=business_context,
  32. db_name=db_name,
  33. output_dir=output_dir,
  34. enable_sql_validation=True,
  35. enable_llm_repair=True,
  36. modify_original_file=True
  37. )
  38. print(f"🚀 开始执行完整工作流程...")
  39. print(f"📁 输出目录: {output_dir}")
  40. print(f"🏢 业务背景: {business_context}")
  41. print(f"💾 数据库: {db_name}")
  42. # 执行完整工作流程
  43. report = await orchestrator.execute_complete_workflow()
  44. # 打印详细摘要
  45. orchestrator.print_final_summary(report)
  46. # 分析结果
  47. if report["success"]:
  48. print(f"\n🎉 工作流程执行成功!")
  49. # 显示各步骤详情
  50. results = report["processing_results"]
  51. if "ddl_md_generation" in results:
  52. ddl_md = results["ddl_md_generation"]
  53. print(f"📋 步骤1 - DDL/MD生成:")
  54. print(f" 处理表数: {ddl_md.get('processed_successfully', 0)}")
  55. print(f" 生成文件: {ddl_md.get('files_generated', 0)}")
  56. print(f" 耗时: {ddl_md.get('duration', 0):.2f}秒")
  57. if "question_sql_generation" in results:
  58. qs = results["question_sql_generation"]
  59. print(f"🤖 步骤2 - Question-SQL生成:")
  60. print(f" 生成主题: {qs.get('total_themes', 0)}")
  61. print(f" 成功主题: {qs.get('successful_themes', 0)}")
  62. print(f" 问答对数: {qs.get('total_questions', 0)}")
  63. print(f" 耗时: {qs.get('duration', 0):.2f}秒")
  64. if "sql_validation" in results:
  65. validation = results["sql_validation"]
  66. print(f"🔍 步骤3 - SQL验证:")
  67. print(f" 原始SQL数: {validation.get('original_sql_count', 0)}")
  68. print(f" 有效SQL数: {validation.get('valid_sql_count', 0)}")
  69. print(f" 成功率: {validation.get('success_rate', 0):.1%}")
  70. print(f" 耗时: {validation.get('duration', 0):.2f}秒")
  71. outputs = report["final_outputs"]
  72. print(f"\n📄 最终输出:")
  73. print(f" 主要文件: {outputs['primary_output_file']}")
  74. print(f" 问题总数: {outputs['final_question_count']}")
  75. else:
  76. print(f"\n❌ 工作流程执行失败:")
  77. error = report["error"]
  78. print(f" 失败步骤: {error['failed_step']}")
  79. print(f" 错误信息: {error['message']}")
  80. # 显示已完成的步骤
  81. completed = report["workflow_summary"]["completed_steps"]
  82. if completed:
  83. print(f" 已完成步骤: {', '.join(completed)}")
  84. except Exception as e:
  85. print(f"\n❌ 示例执行失败: {e}")
  86. import traceback
  87. traceback.print_exc()
  88. async def example_skip_validation():
  89. """跳过验证的工作流程示例"""
  90. print("=" * 60)
  91. print("跳过验证的工作流程示例")
  92. print("=" * 60)
  93. # 设置日志
  94. setup_logging(verbose=True)
  95. # 配置参数(跳过SQL验证)
  96. db_connection = "postgresql://user:password@localhost:5432/test_db"
  97. table_list_file = "schema_tools/tables.txt"
  98. business_context = "电商系统"
  99. db_name = "ecommerce_db"
  100. output_dir = "./example_output_no_validation"
  101. try:
  102. # 创建工作流编排器(跳过验证)
  103. orchestrator = SchemaWorkflowOrchestrator(
  104. db_connection=db_connection,
  105. table_list_file=table_list_file,
  106. business_context=business_context,
  107. db_name=db_name,
  108. output_dir=output_dir,
  109. enable_sql_validation=False, # 跳过SQL验证
  110. enable_llm_repair=False,
  111. modify_original_file=False
  112. )
  113. print(f"🚀 开始执行工作流程(跳过验证)...")
  114. # 执行工作流程
  115. report = await orchestrator.execute_complete_workflow()
  116. # 打印摘要
  117. orchestrator.print_final_summary(report)
  118. print(f"\n📊 执行结果:")
  119. print(f" 成功: {'是' if report['success'] else '否'}")
  120. print(f" 完成步骤数: {len(report['workflow_summary']['completed_steps'])}")
  121. print(f" 总耗时: {report['workflow_summary']['total_duration']}秒")
  122. except Exception as e:
  123. print(f"\n❌ 示例执行失败: {e}")
  124. async def example_error_handling():
  125. """错误处理示例"""
  126. print("=" * 60)
  127. print("错误处理示例")
  128. print("=" * 60)
  129. # 设置日志
  130. setup_logging(verbose=True)
  131. # 故意使用错误的配置来演示错误处理
  132. db_connection = "postgresql://invalid:invalid@localhost:5432/invalid_db"
  133. table_list_file = "nonexistent_tables.txt"
  134. business_context = "测试系统"
  135. db_name = "test_db"
  136. output_dir = "./example_error_output"
  137. try:
  138. # 创建工作流编排器
  139. orchestrator = SchemaWorkflowOrchestrator(
  140. db_connection=db_connection,
  141. table_list_file=table_list_file,
  142. business_context=business_context,
  143. db_name=db_name,
  144. output_dir=output_dir
  145. )
  146. print(f"🚀 开始执行工作流程(故意触发错误)...")
  147. # 执行工作流程
  148. report = await orchestrator.execute_complete_workflow()
  149. # 分析错误报告
  150. if not report["success"]:
  151. print(f"\n🔍 错误分析:")
  152. error = report["error"]
  153. print(f" 错误类型: {error['type']}")
  154. print(f" 错误信息: {error['message']}")
  155. print(f" 失败步骤: {error['failed_step']}")
  156. # 显示部分结果
  157. partial = report.get("partial_results", {})
  158. if partial:
  159. print(f" 部分结果: {list(partial.keys())}")
  160. except Exception as e:
  161. print(f"\n❌ 预期的错误: {e}")
  162. print("这是演示错误处理的正常情况")
  163. def show_usage_examples():
  164. """显示使用示例"""
  165. print("=" * 60)
  166. print("SchemaWorkflowOrchestrator 使用示例")
  167. print("=" * 60)
  168. examples = [
  169. {
  170. "title": "1. 编程方式 - 完整工作流程",
  171. "code": """
  172. import asyncio
  173. from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
  174. async def run_complete_workflow():
  175. orchestrator = SchemaWorkflowOrchestrator(
  176. db_connection="postgresql://user:pass@localhost:5432/dbname",
  177. table_list_file="tables.txt",
  178. business_context="高速公路服务区管理系统",
  179. db_name="highway_db",
  180. output_dir="./output"
  181. )
  182. # 一键执行完整流程
  183. report = await orchestrator.execute_complete_workflow()
  184. if report["success"]:
  185. print(f"✅ 编排完成!最终生成 {report['final_outputs']['final_question_count']} 个问答对")
  186. print(f"📄 输出文件: {report['final_outputs']['primary_output_file']}")
  187. else:
  188. print(f"❌ 编排失败: {report['error']['message']}")
  189. asyncio.run(run_complete_workflow())
  190. """
  191. },
  192. {
  193. "title": "2. 命令行方式 - 完整工作流程",
  194. "code": """
  195. python -m schema_tools.schema_workflow_orchestrator \\
  196. --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
  197. --table-list tables.txt \\
  198. --business-context "高速公路服务区管理系统" \\
  199. --db-name highway_db \\
  200. --output-dir ./output
  201. """
  202. },
  203. {
  204. "title": "3. 命令行方式 - 跳过验证",
  205. "code": """
  206. python -m schema_tools.schema_workflow_orchestrator \\
  207. --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
  208. --table-list tables.txt \\
  209. --business-context "电商系统" \\
  210. --db-name ecommerce_db \\
  211. --skip-validation
  212. """
  213. },
  214. {
  215. "title": "4. 命令行方式 - 禁用LLM修复",
  216. "code": """
  217. python -m schema_tools.schema_workflow_orchestrator \\
  218. --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
  219. --table-list tables.txt \\
  220. --business-context "管理系统" \\
  221. --db-name management_db \\
  222. --disable-llm-repair \\
  223. --verbose
  224. """
  225. }
  226. ]
  227. for example in examples:
  228. print(f"\n{example['title']}:")
  229. print(example['code'])
  230. async def main():
  231. """主函数"""
  232. print("Schema工作流编排器使用示例")
  233. print("请选择要运行的示例:")
  234. print("1. 完整工作流程示例")
  235. print("2. 跳过验证的工作流程示例")
  236. print("3. 错误处理示例")
  237. print("4. 显示使用示例代码")
  238. print("0. 退出")
  239. try:
  240. choice = input("\n请输入选择 (0-4): ").strip()
  241. if choice == "1":
  242. await example_complete_workflow()
  243. elif choice == "2":
  244. await example_skip_validation()
  245. elif choice == "3":
  246. await example_error_handling()
  247. elif choice == "4":
  248. show_usage_examples()
  249. elif choice == "0":
  250. print("退出示例程序")
  251. else:
  252. print("无效选择")
  253. except KeyboardInterrupt:
  254. print("\n\n用户中断,退出程序")
  255. except Exception as e:
  256. print(f"\n示例执行失败: {e}")
if __name__ == "__main__":
    # Script entry point: run the interactive example menu on a new event loop.
    asyncio.run(main())