12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #!/usr/bin/env python3
- """
- Data Pipeline 独立任务执行器
- 专门用于subprocess调用,执行data pipeline任务
- """
- import sys
- import asyncio
- import argparse
- import json
- from pathlib import Path
- # 确保能够导入项目模块
- sys.path.insert(0, str(Path(__file__).parent.parent))
- from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
def main():
    """Parse the command line, run the requested task and exit.

    The value returned by ``execute_task`` is printed to stdout as JSON and
    the process exits with status 0 when its ``success`` entry is truthy,
    otherwise with status 1. Any exception raised while running the task or
    reporting its result is printed to stderr as a JSON object and the
    process exits with status 1. Requesting ``step`` mode without
    ``--step-name`` exits with status 1 before anything is executed.
    """
    arg_parser = argparse.ArgumentParser(description='Data Pipeline 任务执行器')
    arg_parser.add_argument('--task-id', required=True, help='任务ID')
    arg_parser.add_argument(
        '--execution-mode',
        default='complete',
        choices=['complete', 'step'],
        help='执行模式',
    )
    arg_parser.add_argument('--step-name', help='步骤名称(当execution-mode=step时必需)')

    # Vector table management switches.
    arg_parser.add_argument(
        '--backup-vector-tables', action='store_true', help='备份vector表数据'
    )
    arg_parser.add_argument(
        '--truncate-vector-tables',
        action='store_true',
        help='清空vector表数据(自动启用备份)',
    )
    arg_parser.add_argument(
        '--skip-training',
        action='store_true',
        help='跳过训练文件处理,仅执行Vector表管理',
    )

    options = arg_parser.parse_args()

    # No logging setup happens here; a separate logging system is used.

    # Step mode is meaningless without a step name, so reject it up front.
    step_name_missing = options.execution_mode == 'step' and not options.step_name
    if step_name_missing:
        print("错误: 步骤执行模式需要指定--step-name参数", file=sys.stderr)
        sys.exit(1)

    try:
        job = execute_task(
            options.task_id,
            options.execution_mode,
            options.step_name,
            options.backup_vector_tables,
            options.truncate_vector_tables,
            options.skip_training,
        )
        outcome = asyncio.run(job)

        # The result goes to stdout so the parent process can read it.
        serialized = json.dumps(outcome, ensure_ascii=False, default=str)
        print(serialized)

        # Exit status mirrors the "success" entry of the result.
        exit_status = 0 if outcome.get('success', False) else 1
        sys.exit(exit_status)

    except Exception as exc:
        failure = {
            "success": False,
            "error": str(exc),
            "task_id": options.task_id,
            "execution_mode": options.execution_mode,
        }
        print(json.dumps(failure, ensure_ascii=False), file=sys.stderr)
        sys.exit(1)
async def execute_task(task_id: str, execution_mode: str, step_name: str = None,
                       backup_vector_tables: bool = False, truncate_vector_tables: bool = False,
                       skip_training: bool = False):
    """Run a task through a ``SimpleWorkflowExecutor`` and return its result.

    Args:
        task_id: Task identifier handed to the executor.
        execution_mode: ``"complete"`` awaits ``execute_complete_workflow()``;
            ``"step"`` awaits ``execute_single_step(step_name)``.
        step_name: Step to run when ``execution_mode`` is ``"step"``.
        backup_vector_tables: Forwarded to the executor constructor.
        truncate_vector_tables: Forwarded to the executor constructor.
        skip_training: Forwarded to the executor constructor.

    Returns:
        Whatever the awaited executor method returns.

    Raises:
        ValueError: If ``execution_mode`` is neither ``"complete"`` nor
            ``"step"``.
    """
    workflow = None
    try:
        workflow = SimpleWorkflowExecutor(
            task_id, backup_vector_tables, truncate_vector_tables, skip_training
        )

        # The mode is checked after construction so the cleanup in ``finally``
        # still runs for an executor that was already created.
        if execution_mode not in ("complete", "step"):
            raise ValueError(f"不支持的执行模式: {execution_mode}")

        if execution_mode == "complete":
            outcome = await workflow.execute_complete_workflow()
        else:
            outcome = await workflow.execute_single_step(step_name)
        return outcome

    finally:
        # Release the executor whenever one was created.
        if workflow:
            workflow.cleanup()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|