create_task_cli.py 8.8 KB


  1. """
  2. Data Pipeline 命令行任务创建工具
  3. 专门用于手动创建任务,生成manual_前缀的task_id
  4. 仅创建任务目录,不涉及数据库或配置文件
  5. """
  6. import argparse
  7. import os
  8. import sys
  9. from datetime import datetime
  10. from pathlib import Path
  11. def generate_manual_task_id() -> str:
  12. """生成手动任务ID,格式: manual_YYYYMMDD_HHMMSS"""
  13. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  14. return f"manual_{timestamp}"
  15. def resolve_base_directory():
  16. """解析基础输出目录"""
  17. try:
  18. from data_pipeline.config import SCHEMA_TOOLS_CONFIG
  19. base_dir = SCHEMA_TOOLS_CONFIG.get("output_directory", "./data_pipeline/training_data/")
  20. except ImportError:
  21. # 如果无法导入配置,使用默认路径
  22. base_dir = "./data_pipeline/training_data/"
  23. # 处理相对路径
  24. if not Path(base_dir).is_absolute():
  25. # 相对于项目根目录解析
  26. project_root = Path(__file__).parent.parent
  27. base_dir = project_root / base_dir
  28. return Path(base_dir)
  29. def create_task_directory(task_id: str, logger) -> Path:
  30. """创建任务目录"""
  31. base_dir = resolve_base_directory()
  32. task_dir = base_dir / task_id
  33. try:
  34. task_dir.mkdir(parents=True, exist_ok=True)
  35. logger.info(f"任务目录已创建: {task_dir}")
  36. return task_dir
  37. except Exception as e:
  38. logger.error(f"创建任务目录失败: {e}")
  39. raise
  40. def extract_db_name_from_connection(connection_string: str) -> str:
  41. """从数据库连接字符串中提取数据库名称"""
  42. try:
  43. if '/' in connection_string:
  44. db_name = connection_string.split('/')[-1]
  45. if '?' in db_name:
  46. db_name = db_name.split('?')[0]
  47. return db_name if db_name else "database"
  48. else:
  49. return "database"
  50. except Exception:
  51. return "database"
  52. def setup_argument_parser():
  53. """设置命令行参数解析器"""
  54. parser = argparse.ArgumentParser(
  55. description='Data Pipeline 任务创建工具 - 创建手动执行的训练任务',
  56. formatter_class=argparse.RawDescriptionHelpFormatter,
  57. epilog="""
  58. 示例用法:
  59. # 基本创建
  60. python -m data_pipeline.create_task_cli --business-context "电商系统" --db-connection "postgresql://user:pass@localhost:5432/ecommerce_db"
  61. # 指定表清单文件
  62. python -m data_pipeline.create_task_cli --table-list tables.txt --business-context "高速公路管理系统" --db-connection "postgresql://user:pass@localhost:5432/highway_db"
  63. # 指定任务名称
  64. python -m data_pipeline.create_task_cli --task-name "电商数据训练" --business-context "电商系统" --db-connection "postgresql://user:pass@localhost:5432/ecommerce_db"
  65. 创建成功后,可以使用返回的task_id进行分步执行:
  66. python -m data_pipeline.ddl_generation.ddl_md_generator --task-id <task_id> --db-connection "..." --table-list tables.txt --business-context "..."
  67. """
  68. )
  69. # 必需参数
  70. parser.add_argument(
  71. '--business-context',
  72. required=True,
  73. help='业务上下文描述'
  74. )
  75. parser.add_argument(
  76. '--db-connection',
  77. required=True,
  78. help='数据库连接字符串 (postgresql://user:pass@host:port/dbname)'
  79. )
  80. # 可选参数
  81. parser.add_argument(
  82. '--table-list',
  83. help='表清单文件路径'
  84. )
  85. parser.add_argument(
  86. '--task-name',
  87. help='任务名称'
  88. )
  89. parser.add_argument(
  90. '--db-name',
  91. help='数据库名称(如果不提供,将从连接字符串中提取)'
  92. )
  93. parser.add_argument(
  94. '--verbose', '-v',
  95. action='store_true',
  96. help='启用详细输出和日志'
  97. )
  98. return parser
  99. def print_usage_instructions(task_id: str, task_dir: Path, logger, **params):
  100. """输出使用说明"""
  101. # 总是向控制台输出结果,同时记录到日志
  102. output_lines = [
  103. "",
  104. "=" * 60,
  105. "🎉 任务创建成功!",
  106. "=" * 60,
  107. f"📋 任务ID: {task_id}",
  108. f"📁 任务目录: {task_dir}"
  109. ]
  110. if params.get('task_name'):
  111. output_lines.append(f"🎯 任务名称: {params['task_name']}")
  112. if params.get('db_name'):
  113. output_lines.append(f"🗄️ 数据库: {params['db_name']}")
  114. output_lines.append(f"🏢 业务背景: {params['business_context']}")
  115. if params.get('table_list'):
  116. output_lines.append(f"📋 表清单文件: {params['table_list']}")
  117. output_lines.extend([
  118. "",
  119. "💡 现在可以使用以下命令执行分步操作:",
  120. "=" * 60
  121. ])
  122. # 构建示例命令
  123. db_conn = params['db_connection']
  124. business_context = params['business_context']
  125. table_list = params.get('table_list', 'tables.txt')
  126. command_lines = [
  127. "# 步骤1: 生成DDL和MD文件",
  128. f'python -m data_pipeline.ddl_generation.ddl_md_generator \\',
  129. f' --task-id {task_id} \\',
  130. f' --db-connection "{db_conn}" \\',
  131. f' --table-list {table_list} \\',
  132. f' --business-context "{business_context}"',
  133. "",
  134. "# 步骤2: 生成Question-SQL对",
  135. f'python -m data_pipeline.qa_generation.qs_generator \\',
  136. f' --task-id {task_id} \\',
  137. f' --table-list {table_list} \\',
  138. f' --business-context "{business_context}"',
  139. "",
  140. "# 步骤3: 验证和修正SQL",
  141. f'python -m data_pipeline.validators.sql_validate_cli \\',
  142. f' --task-id {task_id} \\',
  143. f' --db-connection "{db_conn}"',
  144. "",
  145. "# 步骤4: 训练数据加载",
  146. f'python -m data_pipeline.trainer.run_training \\',
  147. f' --task-id {task_id}',
  148. "",
  149. "=" * 60
  150. ]
  151. # 输出到控制台(总是显示)
  152. for line in output_lines + command_lines:
  153. print(line)
  154. # 记录到日志
  155. logger.info("任务创建成功总结:")
  156. for line in output_lines[2:]: # 跳过装饰线
  157. if line and not line.startswith("="):
  158. logger.info(f" {line}")
  159. logger.info("分步执行命令:")
  160. for line in command_lines:
  161. if line and not line.startswith("#") and line.strip():
  162. logger.info(f" {line}")
  163. def main():
  164. """主入口函数"""
  165. parser = setup_argument_parser()
  166. args = parser.parse_args()
  167. # 生成任务ID
  168. task_id = generate_manual_task_id()
  169. # 初始化统一日志服务
  170. try:
  171. from data_pipeline.dp_logging import get_logger
  172. logger = get_logger("CreateTaskCLI", task_id)
  173. logger.info(f"开始创建手动任务: {task_id}")
  174. except ImportError:
  175. # 如果无法导入统一日志服务,创建简单的logger
  176. import logging
  177. logger = logging.getLogger("CreateTaskCLI")
  178. logger.setLevel(logging.INFO)
  179. if not logger.handlers:
  180. handler = logging.StreamHandler()
  181. formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
  182. handler.setFormatter(formatter)
  183. logger.addHandler(handler)
  184. logger.warning("无法导入统一日志服务,使用简单日志")
  185. try:
  186. logger.info(f"生成任务ID: {task_id}")
  187. # 提取数据库名称
  188. db_name = args.db_name or extract_db_name_from_connection(args.db_connection)
  189. logger.info(f"数据库名称: {db_name}")
  190. # 验证表清单文件(如果提供)
  191. if args.table_list:
  192. if not os.path.exists(args.table_list):
  193. error_msg = f"表清单文件不存在: {args.table_list}"
  194. logger.error(error_msg)
  195. sys.exit(1)
  196. else:
  197. logger.info(f"表清单文件验证通过: {args.table_list}")
  198. # 创建任务目录
  199. task_dir = create_task_directory(task_id, logger)
  200. logger.info(f"任务创建完成: {task_id}")
  201. logger.info(f"参数信息: 业务背景='{args.business_context}', 数据库='{db_name}', 表清单='{args.table_list}'")
  202. # 输出使用说明
  203. print_usage_instructions(
  204. task_id=task_id,
  205. task_dir=task_dir,
  206. logger=logger,
  207. task_name=args.task_name,
  208. db_name=db_name,
  209. business_context=args.business_context,
  210. table_list=args.table_list,
  211. db_connection=args.db_connection
  212. )
  213. logger.info("任务创建工具执行完成")
  214. sys.exit(0)
  215. except KeyboardInterrupt:
  216. logger.warning("用户中断,程序退出")
  217. sys.exit(130)
  218. except Exception as e:
  219. logger.error(f"任务创建失败: {e}", exc_info=args.verbose)
  220. sys.exit(1)
  221. if __name__ == "__main__":
  222. main()