task_executor.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
#!/usr/bin/env python3
"""
Standalone task executor for the Data Pipeline.

Meant to be launched via subprocess to run a single data pipeline task.
The task result is printed to stdout as JSON, and the process exit status
is 0 on success and 1 otherwise.
"""
import sys
import asyncio
import argparse
import json
from pathlib import Path
# Make project modules importable when this file is run directly as a script:
# put the grandparent directory of this file at the front of sys.path so the
# `data_pipeline` package imported below can be resolved.
# NOTE(review): assumes this file lives one level below the project root
# (e.g. <root>/data_pipeline/task_executor.py) — confirm.
sys.path.insert(0, str(Path(__file__).parent.parent))
from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
  14. def main():
  15. """主执行函数"""
  16. parser = argparse.ArgumentParser(description='Data Pipeline 任务执行器')
  17. parser.add_argument('--task-id', required=True, help='任务ID')
  18. parser.add_argument('--execution-mode', default='complete', choices=['complete', 'step'], help='执行模式')
  19. parser.add_argument('--step-name', help='步骤名称(当execution-mode=step时必需)')
  20. # 新增:Vector表管理参数
  21. parser.add_argument('--backup-vector-tables', action='store_true', help='备份vector表数据')
  22. parser.add_argument('--truncate-vector-tables', action='store_true', help='清空vector表数据(自动启用备份)')
  23. parser.add_argument('--skip-training', action='store_true', help='跳过训练文件处理,仅执行Vector表管理')
  24. args = parser.parse_args()
  25. # 初始化日志系统(不需要,使用独立的日志系统)
  26. pass
  27. # 验证参数
  28. if args.execution_mode == 'step' and not args.step_name:
  29. print("错误: 步骤执行模式需要指定--step-name参数", file=sys.stderr)
  30. sys.exit(1)
  31. try:
  32. # 传递新参数到execute_task
  33. result = asyncio.run(execute_task(
  34. args.task_id,
  35. args.execution_mode,
  36. args.step_name,
  37. args.backup_vector_tables,
  38. args.truncate_vector_tables,
  39. args.skip_training
  40. ))
  41. # 输出结果到stdout(供父进程读取)
  42. print(json.dumps(result, ensure_ascii=False, default=str))
  43. # 设置退出码
  44. sys.exit(0 if result.get('success', False) else 1)
  45. except Exception as e:
  46. error_result = {
  47. "success": False,
  48. "error": str(e),
  49. "task_id": args.task_id,
  50. "execution_mode": args.execution_mode
  51. }
  52. print(json.dumps(error_result, ensure_ascii=False), file=sys.stderr)
  53. sys.exit(1)
  54. async def execute_task(task_id: str, execution_mode: str, step_name: str = None,
  55. backup_vector_tables: bool = False, truncate_vector_tables: bool = False,
  56. skip_training: bool = False):
  57. """执行任务的异步函数"""
  58. executor = None
  59. try:
  60. executor = SimpleWorkflowExecutor(task_id, backup_vector_tables, truncate_vector_tables, skip_training)
  61. if execution_mode == "complete":
  62. return await executor.execute_complete_workflow()
  63. elif execution_mode == "step":
  64. return await executor.execute_single_step(step_name)
  65. else:
  66. raise ValueError(f"不支持的执行模式: {execution_mode}")
  67. finally:
  68. if executor:
  69. executor.cleanup()
  70. if __name__ == "__main__":
  71. main()