__main__.py 7.4 KB


  1. import argparse
  2. import asyncio
  3. import sys
  4. import os
  5. import logging
  6. from pathlib import Path
  7. def setup_argument_parser():
  8. """设置命令行参数解析器"""
  9. parser = argparse.ArgumentParser(
  10. description='Schema Tools - 自动生成数据库训练数据',
  11. formatter_class=argparse.RawDescriptionHelpFormatter,
  12. epilog="""
  13. 示例用法:
  14. # 基本使用
  15. python -m schema_tools --db-connection "postgresql://user:pass@host:5432/db" --table-list tables.txt
  16. # 指定业务上下文和输出目录
  17. python -m schema_tools --db-connection "..." --table-list tables.txt --business-context "电商系统" --output-dir output
  18. # 仅生成DDL文件
  19. python -m schema_tools --db-connection "..." --table-list tables.txt --pipeline ddl_only
  20. # 权限检查模式
  21. python -m schema_tools --db-connection "..." --check-permissions-only
  22. """
  23. )
  24. # 必需参数
  25. parser.add_argument(
  26. '--db-connection',
  27. required=True,
  28. help='数据库连接字符串 (例如: postgresql://user:pass@localhost:5432/dbname)'
  29. )
  30. # 可选参数
  31. parser.add_argument(
  32. '--table-list',
  33. help='表清单文件路径'
  34. )
  35. parser.add_argument(
  36. '--business-context',
  37. help='业务上下文描述'
  38. )
  39. parser.add_argument(
  40. '--business-context-file',
  41. help='业务上下文文件路径'
  42. )
  43. parser.add_argument(
  44. '--output-dir',
  45. help='输出目录路径'
  46. )
  47. parser.add_argument(
  48. '--pipeline',
  49. choices=['full', 'ddl_only', 'analysis_only'],
  50. help='处理链类型'
  51. )
  52. parser.add_argument(
  53. '--max-concurrent',
  54. type=int,
  55. help='最大并发表数量'
  56. )
  57. # 功能开关
  58. parser.add_argument(
  59. '--no-filter-system-tables',
  60. action='store_true',
  61. help='禁用系统表过滤'
  62. )
  63. parser.add_argument(
  64. '--check-permissions-only',
  65. action='store_true',
  66. help='仅检查数据库权限,不处理表'
  67. )
  68. parser.add_argument(
  69. '--verbose', '-v',
  70. action='store_true',
  71. help='启用详细日志输出'
  72. )
  73. parser.add_argument(
  74. '--log-file',
  75. help='日志文件路径'
  76. )
  77. return parser
  78. def load_config_with_overrides(args):
  79. """加载配置并应用命令行覆盖"""
  80. from schema_tools.config import SCHEMA_TOOLS_CONFIG
  81. config = SCHEMA_TOOLS_CONFIG.copy()
  82. # 命令行参数覆盖配置
  83. if args.output_dir:
  84. config["output_directory"] = args.output_dir
  85. if args.pipeline:
  86. config["default_pipeline"] = args.pipeline
  87. if args.max_concurrent:
  88. config["max_concurrent_tables"] = args.max_concurrent
  89. if args.no_filter_system_tables:
  90. config["filter_system_tables"] = False
  91. if args.log_file:
  92. config["log_file"] = args.log_file
  93. return config
  94. def load_business_context(args):
  95. """加载业务上下文"""
  96. if args.business_context_file:
  97. try:
  98. with open(args.business_context_file, 'r', encoding='utf-8') as f:
  99. return f.read().strip()
  100. except Exception as e:
  101. print(f"警告: 无法读取业务上下文文件 {args.business_context_file}: {e}")
  102. if args.business_context:
  103. return args.business_context
  104. from schema_tools.config import SCHEMA_TOOLS_CONFIG
  105. return SCHEMA_TOOLS_CONFIG.get("default_business_context", "数据库管理系统")
  106. async def check_permissions_only(db_connection: str):
  107. """仅检查数据库权限"""
  108. from schema_tools.training_data_agent import SchemaTrainingDataAgent
  109. print("🔍 检查数据库权限...")
  110. try:
  111. agent = SchemaTrainingDataAgent(
  112. db_connection=db_connection,
  113. table_list_file="", # 不需要表清单
  114. business_context="" # 不需要业务上下文
  115. )
  116. # 初始化Agent以建立数据库连接
  117. await agent._initialize()
  118. # 检查权限
  119. permissions = await agent.check_database_permissions()
  120. print("\n📋 权限检查结果:")
  121. print(f" ✅ 数据库连接: {'可用' if permissions['connect'] else '不可用'}")
  122. print(f" ✅ 元数据查询: {'可用' if permissions['select_metadata'] else '不可用'}")
  123. print(f" ✅ 数据查询: {'可用' if permissions['select_data'] else '不可用'}")
  124. print(f" ℹ️ 数据库类型: {'只读' if permissions['is_readonly'] else '读写'}")
  125. # 修复判断逻辑:is_readonly=False表示可读写,是好事
  126. required_permissions = ['connect', 'select_metadata', 'select_data']
  127. has_required_permissions = all(permissions.get(perm, False) for perm in required_permissions)
  128. if has_required_permissions:
  129. print("\n✅ 数据库权限检查通过,可以开始处理")
  130. return True
  131. else:
  132. print("\n❌ 数据库权限不足,请检查配置")
  133. return False
  134. except Exception as e:
  135. print(f"\n❌ 权限检查失败: {e}")
  136. return False
  137. async def main():
  138. """主入口函数"""
  139. parser = setup_argument_parser()
  140. args = parser.parse_args()
  141. # 设置日志
  142. from schema_tools.utils.logger import setup_logging
  143. setup_logging(
  144. verbose=args.verbose,
  145. log_file=args.log_file
  146. )
  147. # 仅权限检查模式
  148. if args.check_permissions_only:
  149. success = await check_permissions_only(args.db_connection)
  150. sys.exit(0 if success else 1)
  151. # 验证必需参数
  152. if not args.table_list:
  153. print("错误: 需要指定 --table-list 参数")
  154. parser.print_help()
  155. sys.exit(1)
  156. if not os.path.exists(args.table_list):
  157. print(f"错误: 表清单文件不存在: {args.table_list}")
  158. sys.exit(1)
  159. try:
  160. # 加载配置和业务上下文
  161. config = load_config_with_overrides(args)
  162. business_context = load_business_context(args)
  163. # 创建Agent
  164. from schema_tools.training_data_agent import SchemaTrainingDataAgent
  165. agent = SchemaTrainingDataAgent(
  166. db_connection=args.db_connection,
  167. table_list_file=args.table_list,
  168. business_context=business_context,
  169. output_dir=config["output_directory"],
  170. pipeline=config["default_pipeline"]
  171. )
  172. # 执行生成
  173. print("🚀 开始生成Schema训练数据...")
  174. report = await agent.generate_training_data()
  175. # 输出结果
  176. if report['summary']['failed'] == 0:
  177. print("\n🎉 所有表处理成功!")
  178. else:
  179. print(f"\n⚠️ 处理完成,但有 {report['summary']['failed']} 个表失败")
  180. print(f"📁 输出目录: {config['output_directory']}")
  181. # 如果有失败的表,返回非零退出码
  182. sys.exit(1 if report['summary']['failed'] > 0 else 0)
  183. except KeyboardInterrupt:
  184. print("\n\n⏹️ 用户中断,程序退出")
  185. sys.exit(130)
  186. except Exception as e:
  187. print(f"\n❌ 程序执行失败: {e}")
  188. if args.verbose:
  189. import traceback
  190. traceback.print_exc()
  191. sys.exit(1)
  192. if __name__ == "__main__":
  193. asyncio.run(main())