# simple_workflow.py (26 KB)


  1. """
  2. Data Pipeline API 简化任务工作流
  3. 集成简化后的数据库管理器和文件管理器,提供任务执行功能
  4. """
  5. import asyncio
  6. import json
  7. import os
  8. import logging
  9. from datetime import datetime
  10. from pathlib import Path
  11. from typing import Dict, Any, Optional, List
  12. from contextlib import contextmanager
  13. from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator
  14. from data_pipeline.api.simple_db_manager import SimpleTaskManager
  15. from data_pipeline.api.simple_file_manager import SimpleFileManager
  16. from data_pipeline.dp_logging import get_logger
  17. class SimpleWorkflowExecutor:
  18. """简化的任务工作流执行器"""
  19. def __init__(self, task_id: str):
  20. """
  21. 初始化工作流执行器
  22. Args:
  23. task_id: 任务ID
  24. """
  25. self.task_id = task_id
  26. self.logger = get_logger("SimpleWorkflowExecutor", task_id)
  27. # 初始化管理器
  28. self.task_manager = SimpleTaskManager()
  29. self.file_manager = SimpleFileManager()
  30. # 任务目录日志记录器
  31. self.task_dir_logger = None
  32. # 加载任务信息
  33. self.task_info = None
  34. self.task_params = None
  35. self._load_task_info()
  36. def _load_task_info(self):
  37. """加载任务信息"""
  38. try:
  39. self.task_info = self.task_manager.get_task(self.task_id)
  40. if self.task_info:
  41. self.task_params = self.task_info.get('parameters', {})
  42. else:
  43. raise ValueError(f"任务不存在: {self.task_id}")
  44. except Exception as e:
  45. self.logger.error(f"加载任务信息失败: {e}")
  46. raise
  47. def _ensure_task_directory(self) -> bool:
  48. """确保任务目录存在"""
  49. try:
  50. success = self.file_manager.create_task_directory(self.task_id)
  51. if success:
  52. # 写入任务配置文件
  53. self._write_task_config()
  54. # 初始化任务目录日志记录器
  55. self._setup_task_directory_logger()
  56. return success
  57. except Exception as e:
  58. self.logger.error(f"创建任务目录失败: {e}")
  59. return False
  60. def _write_task_config(self):
  61. """写入任务配置文件"""
  62. try:
  63. task_dir = self.file_manager.get_task_directory(self.task_id)
  64. config_file = task_dir / "task_config.json"
  65. config_data = {
  66. "task_id": self.task_id,
  67. "created_at": self.task_info.get('created_at').isoformat() if self.task_info.get('created_at') else None,
  68. "parameters": self.task_params,
  69. "output_directory": str(task_dir)
  70. }
  71. with open(config_file, 'w', encoding='utf-8') as f:
  72. json.dump(config_data, f, ensure_ascii=False, indent=2, default=str)
  73. except Exception as e:
  74. self.logger.error(f"写入任务配置失败: {e}")
  75. def _setup_task_directory_logger(self):
  76. """设置任务目录日志记录器"""
  77. try:
  78. task_dir = self.file_manager.get_task_directory(self.task_id)
  79. log_file = task_dir / "data_pipeline.log"
  80. # 创建专门的任务目录日志记录器
  81. self.task_dir_logger = logging.getLogger(f"TaskDir_{self.task_id}")
  82. self.task_dir_logger.setLevel(logging.DEBUG)
  83. # 清除已有处理器
  84. self.task_dir_logger.handlers.clear()
  85. self.task_dir_logger.propagate = False
  86. # 创建文件处理器
  87. file_handler = logging.FileHandler(log_file, encoding='utf-8')
  88. file_handler.setLevel(logging.DEBUG)
  89. # 设置详细的日志格式
  90. formatter = logging.Formatter(
  91. '%(asctime)s [%(levelname)s] %(name)s: %(message)s',
  92. datefmt='%Y-%m-%d %H:%M:%S'
  93. )
  94. file_handler.setFormatter(formatter)
  95. self.task_dir_logger.addHandler(file_handler)
  96. # 记录初始化信息
  97. self.task_dir_logger.info(f"任务目录日志初始化完成 - 任务ID: {self.task_id}")
  98. self.task_dir_logger.info(f"任务参数: {json.dumps(self.task_params, ensure_ascii=False, default=str)}")
  99. except Exception as e:
  100. self.logger.error(f"设置任务目录日志记录器失败: {e}")
  101. def _log_to_task_directory(self, level: str, message: str, step_name: str = None):
  102. """记录日志到任务目录"""
  103. if self.task_dir_logger:
  104. try:
  105. if step_name:
  106. message = f"[{step_name}] {message}"
  107. log_level = getattr(logging, level.upper(), logging.INFO)
  108. self.task_dir_logger.log(log_level, message)
  109. except Exception as e:
  110. self.logger.error(f"记录任务目录日志失败: {e}")
  111. def _resolve_table_list_file_path(self) -> str:
  112. """解析表清单文件路径"""
  113. table_list_file = self.task_params['table_list_file']
  114. # 检查是否使用文件上传模式
  115. if self.task_params.get('file_upload_mode', False) or '{task_directory}' in table_list_file:
  116. # 文件上传模式:检查任务目录中的文件
  117. task_dir = self.file_manager.get_task_directory(self.task_id)
  118. # 替换占位符
  119. if '{task_directory}' in table_list_file:
  120. resolved_path = table_list_file.replace('{task_directory}', str(task_dir))
  121. else:
  122. from data_pipeline.config import SCHEMA_TOOLS_CONFIG
  123. upload_config = SCHEMA_TOOLS_CONFIG.get("file_upload", {})
  124. target_filename = upload_config.get("target_filename", "table_list.txt")
  125. resolved_path = str(task_dir / target_filename)
  126. # 检查文件是否存在
  127. if not Path(resolved_path).exists():
  128. raise FileNotFoundError(
  129. f"表清单文件不存在: {resolved_path}。"
  130. f"请先上传表清单文件到任务 {self.task_id},然后再执行工作流。"
  131. )
  132. return resolved_path
  133. else:
  134. # 传统模式:使用指定的文件路径
  135. if not Path(table_list_file).exists():
  136. raise FileNotFoundError(f"表清单文件不存在: {table_list_file}")
  137. return table_list_file
  138. def _create_orchestrator(self) -> SchemaWorkflowOrchestrator:
  139. """创建工作流编排器"""
  140. task_dir = self.file_manager.get_task_directory(self.task_id)
  141. # 解析表清单文件路径
  142. table_list_file = self._resolve_table_list_file_path()
  143. return SchemaWorkflowOrchestrator(
  144. db_connection=self.task_params['db_connection'],
  145. table_list_file=table_list_file,
  146. business_context=self.task_params['business_context'],
  147. output_dir=str(task_dir),
  148. task_id=self.task_id, # 传递task_id给编排器
  149. enable_sql_validation=self.task_params.get('enable_sql_validation', True),
  150. enable_llm_repair=self.task_params.get('enable_llm_repair', True),
  151. modify_original_file=self.task_params.get('modify_original_file', True),
  152. enable_training_data_load=self.task_params.get('enable_training_data_load', True)
  153. )
  154. @contextmanager
  155. def _step_execution(self, step_name: str):
  156. """步骤执行上下文管理器"""
  157. execution_id = None
  158. try:
  159. # 开始执行
  160. execution_id = self.task_manager.start_step(self.task_id, step_name)
  161. # 记录到任务目录日志
  162. self._log_to_task_directory("INFO", f"开始执行步骤: {step_name}", step_name)
  163. yield execution_id
  164. # 成功完成
  165. self.task_manager.complete_step(self.task_id, step_name, 'completed')
  166. # 记录到任务目录日志
  167. self._log_to_task_directory("INFO", f"步骤执行完成: {step_name}", step_name)
  168. except Exception as e:
  169. # 执行失败
  170. error_msg = str(e)
  171. self.task_manager.complete_step(self.task_id, step_name, 'failed', error_msg)
  172. # 记录到任务目录日志
  173. self._log_to_task_directory("ERROR", f"步骤执行失败: {step_name} - {error_msg}", step_name)
  174. raise
  175. async def execute_complete_workflow(self) -> Dict[str, Any]:
  176. """执行完整工作流"""
  177. try:
  178. # 确保任务目录存在
  179. if not self._ensure_task_directory():
  180. raise Exception("无法创建任务目录")
  181. # 开始任务
  182. self.task_manager.update_task_status(self.task_id, 'in_progress')
  183. # 记录到任务目录日志
  184. self._log_to_task_directory("INFO", "完整工作流任务开始执行")
  185. # 创建工作流编排器
  186. orchestrator = self._create_orchestrator()
  187. # 重定向SchemaWorkflowOrchestrator的日志到任务目录
  188. self._redirect_orchestrator_logs(orchestrator)
  189. # 分别执行各个步骤,每个步骤都用_step_execution包装
  190. try:
  191. # 步骤1: DDL/MD生成
  192. with self._step_execution("ddl_generation") as execution_id:
  193. self._log_to_task_directory("INFO", "开始执行DDL/MD生成步骤", "ddl_generation")
  194. await orchestrator._execute_step_1_ddl_md_generation()
  195. self._log_to_task_directory("INFO", "DDL/MD生成步骤完成", "ddl_generation")
  196. # 步骤2: Question-SQL生成
  197. with self._step_execution("qa_generation") as execution_id:
  198. self._log_to_task_directory("INFO", "开始执行Question-SQL生成步骤", "qa_generation")
  199. await orchestrator._execute_step_2_question_sql_generation()
  200. self._log_to_task_directory("INFO", "Question-SQL生成步骤完成", "qa_generation")
  201. # 步骤3: SQL验证(如果启用)
  202. if orchestrator.enable_sql_validation:
  203. with self._step_execution("sql_validation") as execution_id:
  204. self._log_to_task_directory("INFO", "开始执行SQL验证步骤", "sql_validation")
  205. await orchestrator._execute_step_3_sql_validation()
  206. self._log_to_task_directory("INFO", "SQL验证步骤完成", "sql_validation")
  207. else:
  208. self._log_to_task_directory("INFO", "跳过SQL验证步骤(未启用)", "sql_validation")
  209. # 步骤4: 训练数据加载(如果启用)
  210. if orchestrator.enable_training_data_load:
  211. with self._step_execution("training_load") as execution_id:
  212. self._log_to_task_directory("INFO", "开始执行训练数据加载步骤", "training_load")
  213. await orchestrator._execute_step_4_training_data_load()
  214. self._log_to_task_directory("INFO", "训练数据加载步骤完成", "training_load")
  215. else:
  216. self._log_to_task_directory("INFO", "跳过训练数据加载步骤(未启用)", "training_load")
  217. # 获取工作流结果
  218. result = {
  219. "success": True,
  220. "workflow_state": orchestrator.workflow_state,
  221. "artifacts": orchestrator.workflow_state.get("artifacts", {})
  222. }
  223. # 写入结果文件
  224. self._write_result_file(result)
  225. except Exception as step_error:
  226. self.logger.error(f"工作流步骤执行失败: {step_error}")
  227. # 记录到任务目录日志
  228. self._log_to_task_directory("ERROR", f"工作流步骤执行失败: {step_error}")
  229. raise
  230. # 完成任务
  231. self.task_manager.update_task_status(self.task_id, 'completed')
  232. # 记录到任务目录日志
  233. self._log_to_task_directory("INFO", "完整工作流任务执行完成")
  234. return {
  235. "success": True,
  236. "task_id": self.task_id,
  237. "execution_mode": "complete",
  238. "result": result
  239. }
  240. except Exception as e:
  241. # 记录错误
  242. error_msg = str(e)
  243. self.task_manager.update_task_status(self.task_id, 'failed', error_msg)
  244. # 记录到任务目录日志
  245. self._log_to_task_directory("ERROR", f"完整工作流任务执行失败: {error_msg}")
  246. return {
  247. "success": False,
  248. "task_id": self.task_id,
  249. "execution_mode": "complete",
  250. "error": error_msg
  251. }
  252. async def execute_single_step(self, step_name: str) -> Dict[str, Any]:
  253. """执行单个步骤"""
  254. try:
  255. # 确保任务目录存在
  256. if not self._ensure_task_directory():
  257. raise Exception("无法创建任务目录")
  258. # 更新任务状态
  259. self.task_manager.update_task_status(self.task_id, 'in_progress')
  260. # 创建工作流编排器
  261. orchestrator = self._create_orchestrator()
  262. # 重定向SchemaWorkflowOrchestrator的日志到任务目录
  263. self._redirect_orchestrator_logs(orchestrator)
  264. # 执行指定步骤
  265. result = None
  266. with self._step_execution(step_name) as execution_id:
  267. if step_name == "ddl_generation":
  268. await orchestrator._execute_step_1_ddl_md_generation()
  269. result = orchestrator.workflow_state["artifacts"].get("ddl_md_generation", {})
  270. elif step_name == "qa_generation":
  271. await orchestrator._execute_step_2_question_sql_generation()
  272. result = orchestrator.workflow_state["artifacts"].get("question_sql_generation", {})
  273. elif step_name == "sql_validation":
  274. await orchestrator._execute_step_3_sql_validation()
  275. result = orchestrator.workflow_state["artifacts"].get("sql_validation", {})
  276. elif step_name == "training_load":
  277. await orchestrator._execute_step_4_training_data_load()
  278. result = orchestrator.workflow_state["artifacts"].get("training_data_load", {})
  279. else:
  280. raise ValueError(f"不支持的步骤: {step_name}")
  281. # 写入步骤结果文件
  282. self._write_step_result_file(step_name, result)
  283. # 检查是否所有步骤都已完成
  284. self._update_overall_task_status()
  285. return {
  286. "success": True,
  287. "task_id": self.task_id,
  288. "execution_mode": "step",
  289. "step_name": step_name,
  290. "result": result
  291. }
  292. except Exception as e:
  293. # 记录错误
  294. error_msg = str(e)
  295. self.task_manager.update_task_status(self.task_id, 'failed', error_msg)
  296. # 记录到任务目录日志
  297. self._log_to_task_directory("ERROR", f"步骤执行失败: {step_name} - {error_msg}", step_name)
  298. return {
  299. "success": False,
  300. "task_id": self.task_id,
  301. "execution_mode": "step",
  302. "step_name": step_name,
  303. "error": error_msg
  304. }
  305. def _write_result_file(self, result: Dict[str, Any]):
  306. """写入完整结果文件"""
  307. try:
  308. task_dir = self.file_manager.get_task_directory(self.task_id)
  309. result_file = task_dir / "task_result.json"
  310. with open(result_file, 'w', encoding='utf-8') as f:
  311. json.dump(result, f, ensure_ascii=False, indent=2, default=str)
  312. except Exception as e:
  313. self.logger.error(f"写入结果文件失败: {e}")
  314. def _write_step_result_file(self, step_name: str, result: Dict[str, Any]):
  315. """写入步骤结果文件"""
  316. try:
  317. task_dir = self.file_manager.get_task_directory(self.task_id)
  318. result_file = task_dir / f"{step_name}_result.json"
  319. with open(result_file, 'w', encoding='utf-8') as f:
  320. json.dump(result, f, ensure_ascii=False, indent=2, default=str)
  321. except Exception as e:
  322. self.logger.error(f"写入步骤结果文件失败: {e}")
  323. def _update_overall_task_status(self):
  324. """更新整体任务状态"""
  325. try:
  326. # 检查所有步骤的完成情况
  327. steps = self.task_manager.get_task_steps(self.task_id)
  328. completed_steps = set()
  329. failed_steps = set()
  330. for step in steps:
  331. if step['step_status'] == 'completed':
  332. completed_steps.add(step['step_name'])
  333. elif step['step_status'] == 'failed':
  334. failed_steps.add(step['step_name'])
  335. # 检查是否有失败的步骤
  336. if failed_steps:
  337. self.task_manager.update_task_status(self.task_id, 'failed')
  338. return
  339. # 检查是否完成了必要步骤
  340. required_steps = {"ddl_generation", "qa_generation"}
  341. if required_steps.issubset(completed_steps):
  342. # 检查是否有可选步骤完成
  343. optional_steps = {"sql_validation", "training_load"}
  344. if completed_steps.intersection(optional_steps):
  345. if len(completed_steps) >= 3:
  346. self.task_manager.update_task_status(self.task_id, 'completed')
  347. else:
  348. self.task_manager.update_task_status(self.task_id, 'partial_completed')
  349. else:
  350. self.task_manager.update_task_status(self.task_id, 'partial_completed')
  351. except Exception as e:
  352. self.logger.error(f"更新任务状态失败: {e}")
  353. def _redirect_orchestrator_logs(self, orchestrator):
  354. """重定向SchemaWorkflowOrchestrator的日志到任务目录"""
  355. if self.task_dir_logger and hasattr(orchestrator, 'logger'):
  356. try:
  357. # 为orchestrator的logger添加任务目录文件处理器
  358. for handler in self.task_dir_logger.handlers:
  359. if isinstance(handler, logging.FileHandler):
  360. orchestrator.logger.addHandler(handler)
  361. break
  362. except Exception as e:
  363. self.logger.error(f"重定向orchestrator日志失败: {e}")
  364. def query_logs_advanced(self,
  365. page: int = 1,
  366. page_size: int = 50,
  367. level: str = None,
  368. start_time: str = None,
  369. end_time: str = None,
  370. keyword: str = None,
  371. logger_name: str = None,
  372. step_name: str = None,
  373. sort_by: str = "timestamp",
  374. sort_order: str = "desc") -> dict:
  375. """
  376. 高级日志查询(工作流层)
  377. Args:
  378. page: 页码,必须大于0,默认1
  379. page_size: 每页大小,1-500之间,默认50
  380. level: 可选,日志级别筛选
  381. start_time: 可选,开始时间范围
  382. end_time: 可选,结束时间范围
  383. keyword: 可选,关键字搜索
  384. logger_name: 可选,日志记录器名称
  385. step_name: 可选,执行步骤名称
  386. sort_by: 可选,排序字段
  387. sort_order: 可选,排序方向
  388. Returns:
  389. 日志查询结果
  390. """
  391. try:
  392. # 调用数据库层方法
  393. result = self.task_manager.query_logs_advanced(
  394. task_id=self.task_id,
  395. page=page,
  396. page_size=page_size,
  397. level=level,
  398. start_time=start_time,
  399. end_time=end_time,
  400. keyword=keyword,
  401. logger_name=logger_name,
  402. step_name=step_name,
  403. sort_by=sort_by,
  404. sort_order=sort_order
  405. )
  406. # 记录查询操作
  407. self.logger.info(f"日志查询完成: {self.task_id}, 页码: {page}, 结果数: {len(result.get('logs', []))}")
  408. return result
  409. except Exception as e:
  410. self.logger.error(f"日志查询失败: {e}")
  411. return {
  412. "logs": [],
  413. "pagination": {
  414. "page": page,
  415. "page_size": page_size,
  416. "total": 0,
  417. "total_pages": 0,
  418. "has_next": False,
  419. "has_prev": False
  420. },
  421. "log_file_info": {
  422. "exists": False,
  423. "error": str(e)
  424. },
  425. "query_time": "0.000s"
  426. }
  427. def cleanup(self):
  428. """清理资源"""
  429. try:
  430. # 清理任务目录日志记录器
  431. if self.task_dir_logger:
  432. for handler in self.task_dir_logger.handlers:
  433. handler.close()
  434. self.task_dir_logger.handlers.clear()
  435. self.task_manager.close_connection()
  436. except Exception as e:
  437. self.logger.error(f"清理资源失败: {e}")
  438. class SimpleWorkflowManager:
  439. """简化的任务工作流管理器"""
  440. def __init__(self):
  441. """初始化工作流管理器"""
  442. self.task_manager = SimpleTaskManager()
  443. self.file_manager = SimpleFileManager()
  444. # 使用简单的控制台日志,不使用文件日志
  445. self.logger = logging.getLogger("SimpleWorkflowManager")
  446. self.logger.setLevel(logging.INFO)
  447. def create_task(self,
  448. table_list_file: str = None,
  449. business_context: str = None,
  450. db_name: str = None,
  451. **kwargs) -> str:
  452. """创建新任务"""
  453. try:
  454. # 如果提供了table_list_file,验证文件存在
  455. if table_list_file and not os.path.exists(table_list_file):
  456. raise FileNotFoundError(f"表清单文件不存在: {table_list_file}")
  457. # 创建任务(使用app_config中的数据库配置)
  458. task_id = self.task_manager.create_task(
  459. table_list_file=table_list_file,
  460. business_context=business_context,
  461. db_name=db_name,
  462. **kwargs
  463. )
  464. return task_id
  465. except Exception as e:
  466. self.logger.error(f"创建任务失败: {e}")
  467. raise
  468. async def execute_task(self,
  469. task_id: str,
  470. execution_mode: str = "complete",
  471. step_name: Optional[str] = None) -> Dict[str, Any]:
  472. """执行任务"""
  473. executor = None
  474. try:
  475. executor = SimpleWorkflowExecutor(task_id)
  476. if execution_mode == "complete":
  477. return await executor.execute_complete_workflow()
  478. elif execution_mode == "step":
  479. if not step_name:
  480. raise ValueError("步骤执行模式需要指定step_name")
  481. return await executor.execute_single_step(step_name)
  482. else:
  483. raise ValueError(f"不支持的执行模式: {execution_mode}")
  484. finally:
  485. if executor:
  486. executor.cleanup()
  487. def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
  488. """获取任务状态"""
  489. return self.task_manager.get_task(task_id)
  490. def get_task_files(self, task_id: str) -> List[Dict[str, Any]]:
  491. """获取任务文件列表"""
  492. return self.file_manager.get_task_files(task_id)
  493. def get_task_steps(self, task_id: str) -> List[Dict[str, Any]]:
  494. """获取任务步骤状态"""
  495. return self.task_manager.get_task_steps(task_id)
  496. def get_tasks_list(self, **kwargs) -> List[Dict[str, Any]]:
  497. """获取任务列表"""
  498. return self.task_manager.get_tasks_list(**kwargs)
  499. def query_tasks_advanced(self, **kwargs) -> dict:
  500. """
  501. 高级任务查询,支持复杂筛选、排序、分页
  502. Args:
  503. **kwargs: 传递给数据库层的查询参数
  504. Returns:
  505. 包含任务列表和分页信息的字典
  506. """
  507. return self.task_manager.query_tasks_advanced(**kwargs)
  508. def cleanup(self):
  509. """清理资源"""
  510. try:
  511. self.task_manager.close_connection()
  512. except Exception as e:
  513. self.logger.error(f"清理资源失败: {e}")