# simple_workflow.py

"""
Simplified task workflow for the Data Pipeline API.

Integrates the simplified database manager and file manager to provide
task execution.
"""
  5. import asyncio
  6. import json
  7. import os
  8. import logging
  9. from datetime import datetime
  10. from pathlib import Path
  11. from typing import Dict, Any, Optional, List
  12. from contextlib import contextmanager
  13. from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator
  14. from data_pipeline.api.simple_db_manager import SimpleTaskManager
  15. from data_pipeline.api.simple_file_manager import SimpleFileManager
  16. from core.logging import get_data_pipeline_logger
  17. class SimpleWorkflowExecutor:
  18. """简化的任务工作流执行器"""
  19. def __init__(self, task_id: str):
  20. """
  21. 初始化工作流执行器
  22. Args:
  23. task_id: 任务ID
  24. """
  25. self.task_id = task_id
  26. self.logger = get_data_pipeline_logger("SimpleWorkflowExecutor")
  27. # 初始化管理器
  28. self.task_manager = SimpleTaskManager()
  29. self.file_manager = SimpleFileManager()
  30. # 任务目录日志记录器
  31. self.task_dir_logger = None
  32. # 加载任务信息
  33. self.task_info = None
  34. self.task_params = None
  35. self._load_task_info()
  36. def _load_task_info(self):
  37. """加载任务信息"""
  38. try:
  39. self.task_info = self.task_manager.get_task(self.task_id)
  40. if self.task_info:
  41. self.task_params = self.task_info.get('parameters', {})
  42. else:
  43. raise ValueError(f"任务不存在: {self.task_id}")
  44. except Exception as e:
  45. self.logger.error(f"加载任务信息失败: {e}")
  46. raise
  47. def _ensure_task_directory(self) -> bool:
  48. """确保任务目录存在"""
  49. try:
  50. success = self.file_manager.create_task_directory(self.task_id)
  51. if success:
  52. # 写入任务配置文件
  53. self._write_task_config()
  54. # 初始化任务目录日志记录器
  55. self._setup_task_directory_logger()
  56. return success
  57. except Exception as e:
  58. self.logger.error(f"创建任务目录失败: {e}")
  59. return False
  60. def _write_task_config(self):
  61. """写入任务配置文件"""
  62. try:
  63. task_dir = self.file_manager.get_task_directory(self.task_id)
  64. config_file = task_dir / "task_config.json"
  65. config_data = {
  66. "task_id": self.task_id,
  67. "created_at": self.task_info.get('created_at').isoformat() if self.task_info.get('created_at') else None,
  68. "parameters": self.task_params,
  69. "output_directory": str(task_dir)
  70. }
  71. with open(config_file, 'w', encoding='utf-8') as f:
  72. json.dump(config_data, f, ensure_ascii=False, indent=2, default=str)
  73. except Exception as e:
  74. self.logger.error(f"写入任务配置失败: {e}")
  75. def _setup_task_directory_logger(self):
  76. """设置任务目录日志记录器"""
  77. try:
  78. task_dir = self.file_manager.get_task_directory(self.task_id)
  79. log_file = task_dir / "data_pipeline.log"
  80. # 创建专门的任务目录日志记录器
  81. self.task_dir_logger = logging.getLogger(f"TaskDir_{self.task_id}")
  82. self.task_dir_logger.setLevel(logging.DEBUG)
  83. # 清除已有处理器
  84. self.task_dir_logger.handlers.clear()
  85. self.task_dir_logger.propagate = False
  86. # 创建文件处理器
  87. file_handler = logging.FileHandler(log_file, encoding='utf-8')
  88. file_handler.setLevel(logging.DEBUG)
  89. # 设置详细的日志格式
  90. formatter = logging.Formatter(
  91. '%(asctime)s [%(levelname)s] %(name)s: %(message)s',
  92. datefmt='%Y-%m-%d %H:%M:%S'
  93. )
  94. file_handler.setFormatter(formatter)
  95. self.task_dir_logger.addHandler(file_handler)
  96. # 记录初始化信息
  97. self.task_dir_logger.info(f"任务目录日志初始化完成 - 任务ID: {self.task_id}")
  98. self.task_dir_logger.info(f"任务参数: {json.dumps(self.task_params, ensure_ascii=False, default=str)}")
  99. except Exception as e:
  100. self.logger.error(f"设置任务目录日志记录器失败: {e}")
  101. def _log_to_task_directory(self, level: str, message: str, step_name: str = None):
  102. """记录日志到任务目录"""
  103. if self.task_dir_logger:
  104. try:
  105. if step_name:
  106. message = f"[{step_name}] {message}"
  107. log_level = getattr(logging, level.upper(), logging.INFO)
  108. self.task_dir_logger.log(log_level, message)
  109. except Exception as e:
  110. self.logger.error(f"记录任务目录日志失败: {e}")
  111. def _create_orchestrator(self) -> SchemaWorkflowOrchestrator:
  112. """创建工作流编排器"""
  113. task_dir = self.file_manager.get_task_directory(self.task_id)
  114. return SchemaWorkflowOrchestrator(
  115. db_connection=self.task_params['db_connection'],
  116. table_list_file=self.task_params['table_list_file'],
  117. business_context=self.task_params['business_context'],
  118. output_dir=str(task_dir),
  119. enable_sql_validation=self.task_params.get('enable_sql_validation', True),
  120. enable_llm_repair=self.task_params.get('enable_llm_repair', True),
  121. modify_original_file=self.task_params.get('modify_original_file', True),
  122. enable_training_data_load=self.task_params.get('enable_training_data_load', True)
  123. )
  124. @contextmanager
  125. def _step_execution(self, step_name: str):
  126. """步骤执行上下文管理器"""
  127. execution_id = None
  128. try:
  129. # 开始执行
  130. execution_id = self.task_manager.create_execution(self.task_id, step_name)
  131. self.task_manager.update_step_status(self.task_id, step_name, "running")
  132. self.task_manager.record_log(self.task_id, "INFO", f"开始执行步骤: {step_name}", execution_id, step_name)
  133. # 记录到任务目录日志
  134. self._log_to_task_directory("INFO", f"开始执行步骤: {step_name}", step_name)
  135. yield execution_id
  136. # 成功完成
  137. self.task_manager.complete_execution(execution_id, 'completed')
  138. self.task_manager.update_step_status(self.task_id, step_name, "completed")
  139. self.task_manager.record_log(self.task_id, "INFO", f"步骤执行完成: {step_name}", execution_id, step_name)
  140. # 记录到任务目录日志
  141. self._log_to_task_directory("INFO", f"步骤执行完成: {step_name}", step_name)
  142. except Exception as e:
  143. # 执行失败
  144. error_msg = str(e)
  145. if execution_id:
  146. self.task_manager.complete_execution(execution_id, 'failed', error_msg)
  147. self.task_manager.update_step_status(self.task_id, step_name, "failed")
  148. self.task_manager.record_log(self.task_id, "ERROR", f"步骤执行失败: {step_name} - {error_msg}", execution_id, step_name)
  149. # 记录到任务目录日志
  150. self._log_to_task_directory("ERROR", f"步骤执行失败: {step_name} - {error_msg}", step_name)
  151. raise
  152. async def execute_complete_workflow(self) -> Dict[str, Any]:
  153. """执行完整工作流"""
  154. try:
  155. # 确保任务目录存在
  156. if not self._ensure_task_directory():
  157. raise Exception("无法创建任务目录")
  158. # 开始任务
  159. self.task_manager.update_task_status(self.task_id, 'in_progress')
  160. self.task_manager.record_log(self.task_id, "INFO", "任务开始执行")
  161. # 记录到任务目录日志
  162. self._log_to_task_directory("INFO", "完整工作流任务开始执行")
  163. # 创建工作流编排器
  164. orchestrator = self._create_orchestrator()
  165. # 执行完整工作流
  166. with self._step_execution("complete") as execution_id:
  167. self.task_manager.record_log(self.task_id, "INFO", "开始执行完整工作流", execution_id, "complete")
  168. # 重定向SchemaWorkflowOrchestrator的日志到任务目录
  169. self._redirect_orchestrator_logs(orchestrator)
  170. result = await orchestrator.execute_complete_workflow()
  171. # 写入结果文件
  172. self._write_result_file(result)
  173. self.task_manager.record_log(self.task_id, "INFO", "完整工作流执行完成", execution_id, "complete")
  174. # 更新所有子步骤状态为完成
  175. self._update_all_step_status_for_complete_workflow(result)
  176. # 完成任务
  177. self.task_manager.update_task_status(self.task_id, 'completed')
  178. self.task_manager.record_log(self.task_id, "INFO", "任务执行完成")
  179. # 记录到任务目录日志
  180. self._log_to_task_directory("INFO", "完整工作流任务执行完成")
  181. return {
  182. "success": True,
  183. "task_id": self.task_id,
  184. "execution_mode": "complete",
  185. "result": result
  186. }
  187. except Exception as e:
  188. # 记录错误
  189. error_msg = str(e)
  190. self.task_manager.record_log(self.task_id, "ERROR", f"任务执行失败: {error_msg}")
  191. self.task_manager.update_task_status(self.task_id, 'failed', error_msg)
  192. # 记录到任务目录日志
  193. self._log_to_task_directory("ERROR", f"完整工作流任务执行失败: {error_msg}")
  194. return {
  195. "success": False,
  196. "task_id": self.task_id,
  197. "execution_mode": "complete",
  198. "error": error_msg
  199. }
  200. async def execute_single_step(self, step_name: str) -> Dict[str, Any]:
  201. """执行单个步骤"""
  202. try:
  203. # 确保任务目录存在
  204. if not self._ensure_task_directory():
  205. raise Exception("无法创建任务目录")
  206. # 更新任务状态
  207. self.task_manager.update_task_status(self.task_id, 'in_progress')
  208. # 创建工作流编排器
  209. orchestrator = self._create_orchestrator()
  210. # 重定向SchemaWorkflowOrchestrator的日志到任务目录
  211. self._redirect_orchestrator_logs(orchestrator)
  212. # 执行指定步骤
  213. result = None
  214. with self._step_execution(step_name) as execution_id:
  215. if step_name == "ddl_generation":
  216. await orchestrator._execute_step_1_ddl_md_generation()
  217. result = orchestrator.workflow_state["artifacts"].get("ddl_md_generation", {})
  218. elif step_name == "qa_generation":
  219. await orchestrator._execute_step_2_question_sql_generation()
  220. result = orchestrator.workflow_state["artifacts"].get("question_sql_generation", {})
  221. elif step_name == "sql_validation":
  222. await orchestrator._execute_step_3_sql_validation()
  223. result = orchestrator.workflow_state["artifacts"].get("sql_validation", {})
  224. elif step_name == "training_load":
  225. await orchestrator._execute_step_4_training_data_load()
  226. result = orchestrator.workflow_state["artifacts"].get("training_data_load", {})
  227. else:
  228. raise ValueError(f"不支持的步骤: {step_name}")
  229. # 写入步骤结果文件
  230. self._write_step_result_file(step_name, result)
  231. # 检查是否所有步骤都已完成
  232. self._update_overall_task_status()
  233. return {
  234. "success": True,
  235. "task_id": self.task_id,
  236. "execution_mode": "step",
  237. "step_name": step_name,
  238. "result": result
  239. }
  240. except Exception as e:
  241. # 记录错误
  242. error_msg = str(e)
  243. self.task_manager.record_log(self.task_id, "ERROR", f"步骤执行失败: {step_name} - {error_msg}")
  244. self.task_manager.update_task_status(self.task_id, 'failed', error_msg)
  245. # 记录到任务目录日志
  246. self._log_to_task_directory("ERROR", f"步骤执行失败: {step_name} - {error_msg}", step_name)
  247. return {
  248. "success": False,
  249. "task_id": self.task_id,
  250. "execution_mode": "step",
  251. "step_name": step_name,
  252. "error": error_msg
  253. }
  254. def _write_result_file(self, result: Dict[str, Any]):
  255. """写入完整结果文件"""
  256. try:
  257. task_dir = self.file_manager.get_task_directory(self.task_id)
  258. result_file = task_dir / "task_result.json"
  259. with open(result_file, 'w', encoding='utf-8') as f:
  260. json.dump(result, f, ensure_ascii=False, indent=2, default=str)
  261. except Exception as e:
  262. self.logger.error(f"写入结果文件失败: {e}")
  263. def _write_step_result_file(self, step_name: str, result: Dict[str, Any]):
  264. """写入步骤结果文件"""
  265. try:
  266. task_dir = self.file_manager.get_task_directory(self.task_id)
  267. result_file = task_dir / f"{step_name}_result.json"
  268. with open(result_file, 'w', encoding='utf-8') as f:
  269. json.dump(result, f, ensure_ascii=False, indent=2, default=str)
  270. except Exception as e:
  271. self.logger.error(f"写入步骤结果文件失败: {e}")
  272. def _update_overall_task_status(self):
  273. """更新整体任务状态"""
  274. try:
  275. # 检查所有步骤的完成情况
  276. executions = self.task_manager.get_task_executions(self.task_id)
  277. completed_steps = set()
  278. failed_steps = set()
  279. for execution in executions:
  280. if execution['status'] == 'completed':
  281. completed_steps.add(execution['execution_step'])
  282. elif execution['status'] == 'failed':
  283. failed_steps.add(execution['execution_step'])
  284. # 检查是否有失败的步骤
  285. if failed_steps:
  286. self.task_manager.update_task_status(self.task_id, 'failed')
  287. return
  288. # 检查是否完成了必要步骤
  289. required_steps = {"ddl_generation", "qa_generation"}
  290. if required_steps.issubset(completed_steps):
  291. # 检查是否有可选步骤完成
  292. optional_steps = {"sql_validation", "training_load"}
  293. if completed_steps.intersection(optional_steps):
  294. if len(completed_steps) >= 3:
  295. self.task_manager.update_task_status(self.task_id, 'completed')
  296. else:
  297. self.task_manager.update_task_status(self.task_id, 'partial_completed')
  298. else:
  299. self.task_manager.update_task_status(self.task_id, 'partial_completed')
  300. except Exception as e:
  301. self.logger.error(f"更新任务状态失败: {e}")
  302. def _redirect_orchestrator_logs(self, orchestrator):
  303. """重定向SchemaWorkflowOrchestrator的日志到任务目录"""
  304. if self.task_dir_logger and hasattr(orchestrator, 'logger'):
  305. try:
  306. # 为orchestrator的logger添加任务目录文件处理器
  307. for handler in self.task_dir_logger.handlers:
  308. if isinstance(handler, logging.FileHandler):
  309. orchestrator.logger.addHandler(handler)
  310. break
  311. except Exception as e:
  312. self.logger.error(f"重定向orchestrator日志失败: {e}")
  313. def _update_all_step_status_for_complete_workflow(self, result: Dict[str, Any]):
  314. """完整工作流成功后,更新所有子步骤状态为完成"""
  315. try:
  316. # 定义完整工作流包含的所有步骤
  317. workflow_steps = ["ddl_generation", "qa_generation", "sql_validation", "training_load"]
  318. # 记录日志
  319. self._log_to_task_directory("INFO", "开始更新完整工作流各步骤状态为完成")
  320. # 逐一更新每个步骤的状态为完成
  321. for step_name in workflow_steps:
  322. try:
  323. self.task_manager.update_step_status(self.task_id, step_name, "completed")
  324. self.task_manager.record_log(
  325. self.task_id,
  326. "INFO",
  327. f"完整工作流执行成功,更新步骤状态为完成: {step_name}",
  328. step_name=step_name
  329. )
  330. self._log_to_task_directory("INFO", f"更新步骤状态为完成: {step_name}", step_name)
  331. except Exception as step_error:
  332. self.logger.error(f"更新步骤状态失败 {step_name}: {step_error}")
  333. self._log_to_task_directory("ERROR", f"更新步骤状态失败: {step_name} - {step_error}", step_name)
  334. self._log_to_task_directory("INFO", "完整工作流各步骤状态更新完成")
  335. except Exception as e:
  336. self.logger.error(f"更新完整工作流步骤状态失败: {e}")
  337. self._log_to_task_directory("ERROR", f"更新完整工作流步骤状态失败: {e}")
  338. def cleanup(self):
  339. """清理资源"""
  340. try:
  341. # 清理任务目录日志记录器
  342. if self.task_dir_logger:
  343. for handler in self.task_dir_logger.handlers:
  344. handler.close()
  345. self.task_dir_logger.handlers.clear()
  346. self.task_manager.close_connection()
  347. except Exception as e:
  348. self.logger.error(f"清理资源失败: {e}")
  349. class SimpleWorkflowManager:
  350. """简化的任务工作流管理器"""
  351. def __init__(self):
  352. """初始化工作流管理器"""
  353. self.task_manager = SimpleTaskManager()
  354. self.file_manager = SimpleFileManager()
  355. self.logger = get_data_pipeline_logger("SimpleWorkflowManager")
  356. def create_task(self,
  357. table_list_file: str,
  358. business_context: str,
  359. db_name: str = None,
  360. **kwargs) -> str:
  361. """创建新任务"""
  362. try:
  363. # 验证表清单文件存在
  364. if not os.path.exists(table_list_file):
  365. raise FileNotFoundError(f"表清单文件不存在: {table_list_file}")
  366. # 创建任务(使用app_config中的数据库配置)
  367. task_id = self.task_manager.create_task(
  368. table_list_file=table_list_file,
  369. business_context=business_context,
  370. db_name=db_name,
  371. **kwargs
  372. )
  373. return task_id
  374. except Exception as e:
  375. self.logger.error(f"创建任务失败: {e}")
  376. raise
  377. async def execute_task(self,
  378. task_id: str,
  379. execution_mode: str = "complete",
  380. step_name: Optional[str] = None) -> Dict[str, Any]:
  381. """执行任务"""
  382. executor = None
  383. try:
  384. executor = SimpleWorkflowExecutor(task_id)
  385. if execution_mode == "complete":
  386. return await executor.execute_complete_workflow()
  387. elif execution_mode == "step":
  388. if not step_name:
  389. raise ValueError("步骤执行模式需要指定step_name")
  390. return await executor.execute_single_step(step_name)
  391. else:
  392. raise ValueError(f"不支持的执行模式: {execution_mode}")
  393. finally:
  394. if executor:
  395. executor.cleanup()
  396. def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
  397. """获取任务状态"""
  398. return self.task_manager.get_task(task_id)
  399. def get_task_logs(self, task_id: str, limit: int = 100) -> List[Dict[str, Any]]:
  400. """获取任务日志"""
  401. return self.task_manager.get_task_logs(task_id, limit)
  402. def get_task_files(self, task_id: str) -> List[Dict[str, Any]]:
  403. """获取任务文件列表"""
  404. return self.file_manager.get_task_files(task_id)
  405. def get_task_executions(self, task_id: str) -> List[Dict[str, Any]]:
  406. """获取任务执行记录"""
  407. return self.task_manager.get_task_executions(task_id)
  408. def get_tasks_list(self, **kwargs) -> List[Dict[str, Any]]:
  409. """获取任务列表"""
  410. return self.task_manager.get_tasks_list(**kwargs)
  411. def cleanup(self):
  412. """清理资源"""
  413. try:
  414. self.task_manager.close_connection()
  415. except Exception as e:
  416. self.logger.error(f"清理资源失败: {e}")