task_executor.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. #!/usr/bin/env python3
  2. """
Standalone task executor for the Data Pipeline.
Intended to be invoked via subprocess to run a data pipeline task.
  5. """
  6. import sys
  7. import asyncio
  8. import argparse
  9. import json
  10. from pathlib import Path
  11. # 确保能够导入项目模块
  12. sys.path.insert(0, str(Path(__file__).parent.parent))
  13. from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
  14. from core.logging import initialize_logging
  15. def main():
  16. """主执行函数"""
  17. parser = argparse.ArgumentParser(description='Data Pipeline 任务执行器')
  18. parser.add_argument('--task-id', required=True, help='任务ID')
  19. parser.add_argument('--execution-mode', default='complete', choices=['complete', 'step'], help='执行模式')
  20. parser.add_argument('--step-name', help='步骤名称(当execution-mode=step时必需)')
  21. args = parser.parse_args()
  22. # 初始化日志系统
  23. initialize_logging()
  24. # 验证参数
  25. if args.execution_mode == 'step' and not args.step_name:
  26. print("错误: 步骤执行模式需要指定--step-name参数", file=sys.stderr)
  27. sys.exit(1)
  28. try:
  29. # 执行任务
  30. result = asyncio.run(execute_task(args.task_id, args.execution_mode, args.step_name))
  31. # 输出结果到stdout(供父进程读取)
  32. print(json.dumps(result, ensure_ascii=False, default=str))
  33. # 设置退出码
  34. sys.exit(0 if result.get('success', False) else 1)
  35. except Exception as e:
  36. error_result = {
  37. "success": False,
  38. "error": str(e),
  39. "task_id": args.task_id,
  40. "execution_mode": args.execution_mode
  41. }
  42. print(json.dumps(error_result, ensure_ascii=False), file=sys.stderr)
  43. sys.exit(1)
  44. async def execute_task(task_id: str, execution_mode: str, step_name: str = None):
  45. """执行任务的异步函数"""
  46. executor = None
  47. try:
  48. executor = SimpleWorkflowExecutor(task_id)
  49. if execution_mode == "complete":
  50. return await executor.execute_complete_workflow()
  51. elif execution_mode == "step":
  52. return await executor.execute_single_step(step_name)
  53. else:
  54. raise ValueError(f"不支持的执行模式: {execution_mode}")
  55. finally:
  56. if executor:
  57. executor.cleanup()
  58. if __name__ == "__main__":
  59. main()