# Data Pipeline API 详细设计文档

## 项目概述

本文档基于概要设计文档和现有代码结构,给出Data Pipeline API系统的详细技术实现设计。该系统将为Web UI提供完整的数据管道调度、执行监控和日志管理功能。

## 核心需求分析

### 1. 业务需求
- **API调度执行**:通过REST API调度执行 `./data_pipeline/schema_workflow.py`
- **执行监控**:实时查看任务执行状态和进度
- **日志集中管理**:所有日志写入任务特定的子目录
- **步骤控制**:支持通过参数控制执行特定步骤
- **数据库日志记录**:关键步骤信息写入PostgreSQL数据库

### 2. 技术约束
- 复用现有的 `SchemaWorkflowOrchestrator` 架构
- 集成现有的日志系统 (`core.logging`)
- 使用现有的Flask应用 (`citu_app.py`) 作为API承载
- 保持与现有数据库配置的兼容性

## 系统架构设计

### 1. 整体架构

```
┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
│   Web Frontend      │    │   Flask API         │    │  Schema Workflow    │
│                     │ ─→ │  (citu_app.py)      │ ─→ │  (subprocess)       │
│ - 任务创建表单      │    │ - 任务调度          │    │ - DDL生成           │
│ - 进度监控界面      │    │ - 状态查询          │    │ - Q&A生成           │
│ - 日志查看器        │    │ - 日志API           │    │ - SQL验证           │
│ - 文件管理器        │    │ - 文件管理          │    │ - 训练数据加载      │
└─────────────────────┘    └─────────────────────┘    └─────────────────────┘
                                      │                          │
                                      ▼                          ▼
                           ┌─────────────────────┐    ┌─────────────────────┐
                           │   PostgreSQL DB     │    │   File System       │
                           │ - 任务状态表        │    │ - 任务目录          │
                           │ - 日志记录表        │    │ - 输出文件          │
                           │ - 文件输出表        │    │ - 日志文件          │
                           └─────────────────────┘    └─────────────────────┘
```

### 2. 进程分离设计

```
HTTP Request ──┐
               │
               ▼
        ┌─────────────┐     subprocess.Popen    ┌──────────────────┐
        │  Flask API  │ ──────────────────────→ │ task_executor.py │
        │   Process   │                         │     Process      │
        │             │     Database Bridge     │                  │
        │ - 任务调度  │ ←─────────────────────→ │ - SimpleWorkflow │
        │ - 状态查询  │                         │ - 进度更新       │
        │ - 文件管理  │                         │ - 双日志记录     │
        └─────────────┘                         └──────────────────┘
               │                                         │
               ▼                                         ▼
        立即返回task_id                  独立执行工作流+日志到任务目录
```

## 数据库设计详细说明

### 1. 表结构设计

#### 任务主表 (data_pipeline_tasks)
```sql
CREATE TABLE data_pipeline_tasks (
    -- 主键:时间戳格式的任务ID
    id VARCHAR(32) PRIMARY KEY,                    -- 'task_20250627_143052'

    -- 任务基本信息
    task_type VARCHAR(50) NOT NULL DEFAULT 'complete_workflow',
    status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending/running/partial_completed/completed/failed

    -- 配置和结果(JSON格式)
    parameters JSONB NOT NULL,                     -- 任务配置参数
    result JSONB,                                  -- 最终执行结果

    -- 错误处理
    error_message TEXT,                            -- 错误详细信息

    -- 进度跟踪(供状态查询API与DatabaseProgressLogger使用)
    progress INTEGER DEFAULT 0 CHECK (progress >= 0 AND progress <= 100),
    current_step VARCHAR(100),                     -- 当前执行步骤

    -- 各步骤状态
    step_status JSONB DEFAULT '{
        "ddl_generation": "pending",
        "qa_generation": "pending",
        "sql_validation": "pending",
        "training_load": "pending"
    }',

    -- 时间戳
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,

    -- 创建者信息
    created_by VARCHAR(50) DEFAULT 'api',          -- 'api', 'manual', 'system'

    -- 输出目录
    output_directory TEXT,                         -- 任务输出目录路径

    -- 索引字段
    db_name VARCHAR(100),                          -- 数据库名称(便于筛选)
    business_context TEXT                          -- 业务上下文(便于搜索)
);

-- 创建索引
CREATE INDEX idx_tasks_status ON data_pipeline_tasks(status);
CREATE INDEX idx_tasks_created_at ON data_pipeline_tasks(created_at DESC);
CREATE INDEX idx_tasks_db_name ON data_pipeline_tasks(db_name);
CREATE INDEX idx_tasks_created_by ON data_pipeline_tasks(created_by);
```

#### 任务执行记录表 (data_pipeline_task_executions)
```sql
CREATE TABLE data_pipeline_task_executions (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_step VARCHAR(50) NOT NULL,  -- 'ddl_generation', 'qa_generation', 'sql_validation', 'training_load', 'complete'
    status VARCHAR(20) NOT NULL,          -- 'running', 'completed', 'failed'
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    error_message TEXT,
    execution_result JSONB,               -- 步骤执行结果
    execution_id VARCHAR(100) UNIQUE,     -- {task_id}_step_{step_name}_exec_{timestamp}
    force_executed BOOLEAN DEFAULT FALSE, -- 是否强制执行
    files_cleaned BOOLEAN DEFAULT FALSE,  -- 是否清理了旧文件
    duration_seconds INTEGER              -- 执行时长(秒)
);

-- 创建索引
CREATE INDEX idx_executions_task_id ON data_pipeline_task_executions(task_id);
CREATE INDEX idx_executions_step ON data_pipeline_task_executions(execution_step);
CREATE INDEX idx_executions_status ON data_pipeline_task_executions(status);
CREATE INDEX idx_executions_started_at ON data_pipeline_task_executions(started_at DESC);
```

#### 任务日志表 (data_pipeline_task_logs)
```sql
CREATE TABLE data_pipeline_task_logs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id),

    -- 日志内容
    log_level VARCHAR(10) NOT NULL,  -- 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
    message TEXT NOT NULL,           -- 日志消息内容

    -- 上下文信息
    step_name VARCHAR(50),           -- 执行步骤名称
    module_name VARCHAR(100),        -- 模块名称
    function_name VARCHAR(100),      -- 函数名称

    -- 时间戳
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- 额外信息(JSON格式)
    extra_data JSONB DEFAULT '{}'    -- 额外的结构化信息
);

-- 创建索引
CREATE INDEX idx_logs_task_id ON data_pipeline_task_logs(task_id);
CREATE INDEX idx_logs_execution_id ON data_pipeline_task_logs(execution_id);
CREATE INDEX idx_logs_timestamp ON data_pipeline_task_logs(timestamp DESC);
CREATE INDEX idx_logs_level ON data_pipeline_task_logs(log_level);
CREATE INDEX idx_logs_step ON data_pipeline_task_logs(step_name);
```

#### 任务输出文件表 (data_pipeline_task_outputs)
```sql
CREATE TABLE data_pipeline_task_outputs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id),

    -- 文件信息
    file_type VARCHAR(50) NOT NULL,  -- 'ddl', 'md', 'json', 'log', 'report'
    file_name VARCHAR(255) NOT NULL, -- 文件名
    file_path TEXT NOT NULL,         -- 相对路径
    file_size BIGINT DEFAULT 0,      -- 文件大小(字节)

    -- 文件内容摘要
    content_hash VARCHAR(64),        -- 文件内容hash
    description TEXT,                -- 文件描述

    -- 时间戳
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- 状态
    is_primary BOOLEAN DEFAULT FALSE,    -- 是否为主要输出文件
    is_downloadable BOOLEAN DEFAULT TRUE -- 是否可下载
);

-- 创建索引
CREATE INDEX idx_outputs_task_id ON data_pipeline_task_outputs(task_id);
CREATE INDEX idx_outputs_execution_id ON data_pipeline_task_outputs(execution_id);
CREATE INDEX idx_outputs_file_type ON data_pipeline_task_outputs(file_type);
CREATE INDEX idx_outputs_primary ON data_pipeline_task_outputs(is_primary) WHERE is_primary = TRUE;
```
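
任务ID与执行ID的格式已在上述注释中约定,下面是一个生成示意(仅为示例实现,实际生成逻辑以 `SimpleTaskManager` 的代码为准):

```python
from datetime import datetime

def generate_task_id() -> str:
    """按约定格式生成任务ID,如 'task_20250627_143052'"""
    return f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

def generate_execution_id(task_id: str, step_name: str) -> str:
    """按约定格式生成执行ID:{task_id}_step_{step_name}_exec_{timestamp}"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    return f"{task_id}_step_{step_name}_exec_{timestamp}"
```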

### 2. 数据库操作类设计

```python
# data_pipeline/api/simple_db_manager.py
class SimpleTaskManager:
    """简化的数据管道任务数据库管理器"""

    def __init__(self):
        self.logger = get_data_pipeline_logger("SimpleTaskManager")
        self._connection = None
        self._connect_to_pgvector()

    def create_task(self, db_connection: str, table_list_file: str,
                    business_context: str, **kwargs) -> str:
        """创建新任务记录,返回task_id"""

    def update_task_status(self, task_id: str, status: str,
                           error_message: str = None) -> bool:
        """更新任务状态"""

    def update_step_status(self, task_id: str, step_name: str,
                           status: str) -> bool:
        """更新步骤状态"""

    def get_task(self, task_id: str) -> dict:
        """获取任务详情"""

    def get_tasks_list(self, limit: int = 50, status: str = None) -> list:
        """获取任务列表"""

    def create_execution(self, task_id: str, step_name: str) -> str:
        """创建执行记录,返回execution_id"""

    def complete_execution(self, execution_id: str, status: str,
                           error_message: str = None) -> bool:
        """完成执行记录"""

    def record_log(self, task_id: str, level: str, message: str,
                   execution_id: str = None, step_name: str = None) -> bool:
        """记录任务日志"""

    def get_task_logs(self, task_id: str, limit: int = 100) -> list:
        """获取任务日志"""

    def get_task_outputs(self, task_id: str) -> list:
        """获取任务输出文件列表"""
```
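
作为参考,`create_task` 的一个最小实现示意如下(假设使用 psycopg2,`dsn` 为任务表所在库的连接串;仅演示表结构的用法,最终实现以实际代码为准):

```python
import json
from datetime import datetime

import psycopg2

def create_task_sketch(dsn: str, db_connection: str, table_list_file: str,
                       business_context: str, **kwargs) -> str:
    """向data_pipeline_tasks插入一条任务记录并返回task_id(示意)"""
    task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    parameters = {
        "db_connection": db_connection,
        "table_list_file": table_list_file,
        "business_context": business_context,
        **kwargs,
    }
    # psycopg2连接作为上下文管理器,正常退出时自动提交事务
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO data_pipeline_tasks "
            "(id, parameters, business_context, created_by) "
            "VALUES (%s, %s, %s, %s)",
            (task_id, json.dumps(parameters, ensure_ascii=False),
             business_context, "api"),
        )
    return task_id
```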

## API接口详细设计

### 1. API路由设计

所有API都在 `citu_app.py` 中实现,路由前缀为 `/api/v0/data_pipeline/`

```python
# citu_app.py 中添加的路由
@app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
    """创建数据管道任务"""

@app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['GET'])
def get_data_pipeline_tasks():
    """获取任务列表"""

@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['GET'])
def get_data_pipeline_task(task_id):
    """获取单个任务详情"""

@app.flask_app.route('/api/v0/data_pipeline/tasks/active', methods=['GET'])
def get_active_data_pipeline_task():
    """获取当前活跃任务"""

@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/logs', methods=['GET'])
def get_data_pipeline_task_logs(task_id):
    """获取任务日志"""

@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['GET'])
def get_data_pipeline_task_files(task_id):
    """获取任务输出文件列表"""

@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files/download/<filename>', methods=['GET'])
def download_data_pipeline_task_file(task_id, filename):
    """下载任务输出文件"""

@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['DELETE'])
def delete_data_pipeline_task(task_id):
    """删除任务(清理)"""
```
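
供前端或脚本参考的调用示例(基于 requests;主机与端口为示例值):

```python
import requests

BASE = "http://localhost:8084/api/v0/data_pipeline"  # 端口为示例值

# 创建任务
resp = requests.post(f"{BASE}/tasks", json={
    "task_type": "complete_workflow",
    "parameters": {
        "db_connection": "postgresql://user:pass@localhost:5432/mydb",
        "table_list_file": "tables.txt",
        "business_context": "业务描述",
    },
})
task_id = resp.json()["data"]["task_id"]

# 轮询任务状态
status = requests.get(f"{BASE}/tasks/{task_id}").json()["data"]["status"]

# 获取任务日志
logs = requests.get(f"{BASE}/tasks/{task_id}/logs").json()
```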

### 2. API接口实现详情

#### 2.1 创建任务接口

```python
@app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
    """
    创建数据管道任务

    Request Body:
    {
        "task_type": "complete_workflow",
        "parameters": {
            "db_connection": "postgresql://...",
            "table_list_file": "tables.txt",
            "business_context": "业务描述",
            "output_dir": "./data_pipeline/training_data/",
            "execution_mode": "complete",
            "single_step": null
        }
    }
    """
    try:
        # 1. 参数验证
        req_data = request.get_json()
        if not req_data:
            return jsonify(bad_request_response("请求体不能为空")), 400

        task_type = req_data.get('task_type', 'complete_workflow')
        parameters = req_data.get('parameters', {})

        # 验证必需参数
        required_params = ['db_connection', 'table_list_file', 'business_context']
        missing_params = [p for p in required_params if not parameters.get(p)]
        if missing_params:
            return jsonify(bad_request_response(
                f"缺少必需参数: {', '.join(missing_params)}",
                missing_params=missing_params
            )), 400

        # 验证执行模式参数
        execution_mode = parameters.get('execution_mode', 'complete')
        single_step = parameters.get('single_step')

        if execution_mode not in ['complete', 'single']:
            return jsonify(bad_request_response("execution_mode必须是complete或single")), 400

        if execution_mode == 'single':
            if not single_step or single_step not in [1, 2, 3, 4]:
                return jsonify(bad_request_response("单步模式下single_step必须是1、2、3、4中的一个")), 400
        elif execution_mode == 'complete' and single_step:
            return jsonify(bad_request_response("完整模式下不应提供single_step参数")), 400

        # 2. 并发检查 - 简化版本(依赖SimpleWorkflowManager)
        workflow_manager = SimpleWorkflowManager()

        # 3. 创建任务记录(返回task_id)
        task_id = workflow_manager.create_task(
            db_connection=parameters['db_connection'],
            table_list_file=parameters['table_list_file'],
            business_context=parameters['business_context'],
            **{k: v for k, v in parameters.items()
               if k not in ['db_connection', 'table_list_file', 'business_context']}
        )

        # 4. 构建任务执行器命令
        import os
        import shutil
        import subprocess
        import sys
        from datetime import datetime
        from pathlib import Path

        # 任务输出目录:<output_dir>/<task_id>
        task_dir = os.path.join(
            parameters.get('output_dir', './data_pipeline/training_data/'), task_id)
        os.makedirs(task_dir, exist_ok=True)

        cmd_args = [
            sys.executable,
            str(Path(__file__).parent / "data_pipeline" / "task_executor.py"),
            '--task-id', task_id,
            '--execution-mode', execution_mode
        ]

        # 如果是单步执行,添加步骤参数
        if execution_mode == 'single' and single_step:
            cmd_args.extend(['--step-name', f'step_{single_step}'])

        # 5. 启动后台进程:stdout/stderr统一重定向到任务目录下的日志文件,
        #    并用start_new_session脱离Flask进程的会话,使API可以立即返回
        try:
            log_file_path = os.path.join(task_dir, 'data_pipeline.log')
            process = subprocess.Popen(
                cmd_args,
                stdout=open(log_file_path, 'w', encoding='utf-8'),
                stderr=subprocess.STDOUT,
                cwd=os.getcwd(),
                start_new_session=True
            )
            logger.info(f"启动后台任务: {task_id}, PID: {process.pid}")
        except Exception as e:
            # 启动失败:清理任务记录和任务目录
            workflow_manager.cleanup()
            shutil.rmtree(task_dir, ignore_errors=True)
            return jsonify(internal_error_response(f"启动后台进程失败: {str(e)}")), 500

        # 6. 返回成功响应
        return jsonify(success_response(
            message="任务创建成功",
            data={
                "task_id": task_id,
                "status": "pending",
                "created_at": datetime.now().isoformat(),
                "output_directory": task_dir
            }
        )), 201

    except Exception as e:
        logger.exception(f"创建任务失败: {str(e)}")
        return jsonify(internal_error_response("创建任务失败")), 500
```
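
上述接口通过 subprocess 启动的 `data_pipeline/task_executor.py` 在本文中未单独给出,下面是一个与 `cmd_args` 参数约定一致的最小骨架示意(内部执行逻辑仅以注释示意,需结合 `SimpleWorkflowManager` / `SchemaWorkflowOrchestrator` 实现):

```python
# data_pipeline/task_executor.py(骨架示意)
import argparse
import sys

def main() -> int:
    parser = argparse.ArgumentParser(description="Data Pipeline 后台任务执行器")
    parser.add_argument("--task-id", required=True, help="要执行的任务ID")
    parser.add_argument("--execution-mode", choices=["complete", "single"],
                        default="complete", help="执行模式")
    parser.add_argument("--step-name", help="单步模式下的步骤名,如 step_2")
    args = parser.parse_args()

    # 示意:从数据库读取任务参数,构造编排器并执行
    # task = SimpleTaskManager().get_task(args.task_id)
    # single_step = int(args.step_name.split("_")[1]) if args.step_name else None
    # orchestrator = SchemaWorkflowOrchestrator(..., task_id=args.task_id,
    #     execution_mode=args.execution_mode, single_step=single_step)
    # report = asyncio.run(orchestrator.execute_complete_workflow())
    # 返回码供上层进程监控使用
    return 0

if __name__ == "__main__":
    sys.exit(main())
```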

#### 2.2 获取任务详情接口

```python
@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['GET'])
def get_data_pipeline_task(task_id):
    """
    获取单个任务详情

    Response:
    {
        "success": true,
        "data": {
            "task_id": "task_20250627_143052",
            "task_type": "complete_workflow",
            "status": "running",
            "progress": 45,
            "current_step": "question_sql_generation",
            "parameters": {...},
            "result": {...},
            "error_message": null,
            "step_details": [...],
            "created_at": "2025-06-27T14:30:52",
            "started_at": "2025-06-27T14:30:53",
            "completed_at": null,
            "duration": 125.5,
            "output_directory": "./data_pipeline/training_data/task_20250627_143052"
        }
    }
    """
    try:
        # 参数验证
        if not task_id or not task_id.startswith('task_'):
            return jsonify(bad_request_response("无效的任务ID格式")), 400

        workflow_manager = SimpleWorkflowManager()
        task_data = workflow_manager.get_task_status(task_id)

        if not task_data:
            return jsonify(not_found_response(f"任务不存在: {task_id}")), 404

        # 计算执行时长
        duration = None
        if task_data.get('started_at'):
            end_time = task_data.get('completed_at') or datetime.now()
            start_time = task_data['started_at']
            if isinstance(start_time, str):
                start_time = datetime.fromisoformat(start_time)
            if isinstance(end_time, str):
                end_time = datetime.fromisoformat(end_time)
            duration = (end_time - start_time).total_seconds()

        # 获取步骤详情
        step_details = []
        step_stats = task_data.get('step_stats', {})

        for step_name in ['ddl_md_generation', 'question_sql_generation', 'sql_validation', 'training_data_load']:
            step_info = step_stats.get(step_name, {})
            step_details.append({
                "step": step_name,
                "status": step_info.get('status', 'pending'),
                "started_at": step_info.get('started_at'),
                "completed_at": step_info.get('completed_at'),
                "duration": step_info.get('duration'),
                "error_message": step_info.get('error_message')
            })

        response_data = {
            **task_data,
            "duration": duration,
            "step_details": step_details
        }

        return jsonify(success_response("获取任务详情成功", data=response_data))

    except Exception as e:
        logger.exception(f"获取任务详情失败: {str(e)}")
        return jsonify(internal_error_response("获取任务详情失败")), 500
```

## Schema Workflow 集成设计

### 1. 命令行参数扩展

在现有的 `setup_argument_parser()` 函数中添加新参数:

```python
def setup_argument_parser():
    """设置命令行参数解析器"""
    parser = argparse.ArgumentParser(
        description="Schema工作流编排器 - 端到端的Schema处理流程",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    # ... 现有参数 ...

    # 新增API集成参数
    parser.add_argument(
        "--task-id",
        required=False,
        help="任务ID(API调用时提供,手动执行时自动生成)"
    )

    parser.add_argument(
        "--no-db-tracking",
        action="store_true",
        help="禁用数据库任务追踪(不记录到任务表)"
    )

    # 新增执行模式参数
    parser.add_argument(
        "--execution-mode",
        choices=['complete', 'single'],
        default='complete',
        help="执行模式:complete=完整工作流,single=单步执行"
    )

    parser.add_argument(
        "--single-step",
        type=int,
        choices=[1, 2, 3, 4],
        help="单步执行时指定步骤号(1=DDL生成,2=Q&A生成,3=SQL验证,4=训练数据加载)"
    )

    return parser
```
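
按上述参数,手动单步执行的调用形式类似 `python ./data_pipeline/schema_workflow.py --execution-mode single --single-step 2 ...`(其余连接类参数从略);不提供 `--task-id` 时会自动生成 `manual_` 前缀的任务ID,详见下文 `main()` 的集成逻辑。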

### 2. SchemaWorkflowOrchestrator 类修改

```python
class SchemaWorkflowOrchestrator:
    """端到端的Schema处理编排器 - 完整工作流程"""

    def __init__(self,
                 db_connection: str,
                 table_list_file: str,
                 business_context: str,
                 output_dir: str = None,
                 enable_sql_validation: bool = True,
                 enable_llm_repair: bool = True,
                 modify_original_file: bool = True,
                 enable_training_data_load: bool = True,
                 # 新增参数
                 task_id: str = None,
                 db_logger: 'DatabaseProgressLogger' = None,
                 execution_mode: str = 'complete',
                 single_step: int = None):
        """
        初始化Schema工作流编排器

        Args:
            # ... 现有参数 ...
            task_id: 任务ID(可选)
            db_logger: 数据库进度记录器(可选)
            execution_mode: 执行模式 ('complete' 或 'single')
            single_step: 单步执行时的步骤号 (1-4)
        """
        # ... 现有初始化代码 ...

        # 新增属性
        self.task_id = task_id
        self.db_logger = db_logger
        self.execution_mode = execution_mode
        self.single_step = single_step

        # 如果提供了task_id但没有db_logger,尝试创建一个
        if self.task_id and not self.db_logger:
            try:
                self.db_logger = self._create_db_logger()
            except Exception as e:
                self.logger.warning(f"无法创建数据库记录器: {e}")

    def _create_db_logger(self):
        """创建数据库进度记录器"""
        from data_pipeline.api.database_logger import DatabaseProgressLogger
        return DatabaseProgressLogger(self.task_id, self.db_connection)

    def _should_execute_step(self, step_number: int) -> bool:
        """判断是否应该执行指定步骤"""
        if self.execution_mode == 'complete':
            # 完整模式:执行所有步骤
            return True
        elif self.execution_mode == 'single':
            # 单步模式:只执行指定的步骤
            return step_number == self.single_step
        else:
            return False

    async def execute_complete_workflow(self) -> Dict[str, Any]:
        """执行完整的Schema处理工作流程"""
        self.workflow_state["start_time"] = time.time()

        # 更新数据库状态为running
        if self.db_logger:
            self.db_logger.update_task_status('running')
            self.db_logger.add_log('INFO', '开始执行Schema工作流编排', 'workflow_start')

        self.logger.info("🚀 开始执行Schema工作流编排")
        # ... 现有日志 ...

        try:
            # 步骤1: 生成DDL和MD文件
            if self._should_execute_step(1):
                await self._execute_step_1_ddl_md_generation()

            # 步骤2: 生成Question-SQL对
            if self._should_execute_step(2):
                await self._execute_step_2_question_sql_generation()

            # 步骤3: 验证和修正SQL
            if self._should_execute_step(3):
                await self._execute_step_3_sql_validation()

            # 步骤4: 训练数据加载
            if self._should_execute_step(4):
                await self._execute_step_4_training_data_load()

            # 设置结束时间
            self.workflow_state["end_time"] = time.time()

            # 生成最终报告
            final_report = await self._generate_final_report()

            # 更新数据库状态为completed
            if self.db_logger:
                self.db_logger.update_task_status('completed', result=final_report)
                self.db_logger.add_log('INFO', '工作流执行完成', 'workflow_complete')

            self.logger.info("✅ Schema工作流编排完成")
            return final_report

        except Exception as e:
            self.workflow_state["end_time"] = time.time()

            # 更新数据库状态为failed
            if self.db_logger:
                self.db_logger.update_task_status('failed', error_message=str(e))
                self.db_logger.add_log('ERROR', f'工作流执行失败: {str(e)}', 'workflow_error')

            self.logger.exception(f"❌ 工作流程执行失败: {str(e)}")
            error_report = await self._generate_error_report(e)
            return error_report

    async def _execute_step_1_ddl_md_generation(self):
        """步骤1: 生成DDL和MD文件"""
        self.workflow_state["current_step"] = "ddl_md_generation"

        # 更新数据库进度
        if self.db_logger:
            self.db_logger.update_progress(10, 'ddl_md_generation')
            self.db_logger.add_log('INFO', 'DDL/MD生成开始', 'ddl_md_generation')

        # ... 现有执行代码 ...

        try:
            # ... DDL/MD生成逻辑 ...

            # 更新进度
            if self.db_logger:
                self.db_logger.update_progress(40, 'ddl_md_generation')
                self.db_logger.add_log('INFO', f'DDL/MD生成完成: 成功处理 {processed_tables} 个表', 'ddl_md_generation')

        except Exception as e:
            if self.db_logger:
                self.db_logger.add_log('ERROR', f'DDL/MD生成失败: {str(e)}', 'ddl_md_generation')
            raise

    # 类似地修改其他步骤方法...
```
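
按上述新签名,以单步模式直接调用编排器的示意如下(参数取值均为示例):

```python
import asyncio

orchestrator = SchemaWorkflowOrchestrator(
    db_connection="postgresql://user:pass@localhost:5432/mydb",
    table_list_file="tables.txt",
    business_context="业务描述",
    output_dir="./data_pipeline/training_data/task_20250627_143052",
    task_id="task_20250627_143052",
    execution_mode="single",
    single_step=3,  # 仅执行SQL验证
)
report = asyncio.run(orchestrator.execute_complete_workflow())
```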

### 3. 数据库进度记录器

```python
# data_pipeline/api/database_logger.py
class DatabaseProgressLogger:
    """数据库进度记录器"""

    def __init__(self, task_id: str, db_connection_string: str):
        self.task_id = task_id
        self.task_manager = DataPipelineTaskManager(db_connection_string)
        self.logger = get_data_pipeline_logger("DatabaseLogger")

    def update_task_status(self, status: str, current_step: str = None,
                           error_message: str = None, result: dict = None):
        """更新任务状态"""
        try:
            success = self.task_manager.update_task_status(
                self.task_id, status, current_step, error_message
            )
            if result and status == 'completed':
                self.task_manager.update_task_result(self.task_id, result)
            return success
        except Exception as e:
            self.logger.warning(f"更新任务状态失败: {e}")
            return False

    def update_progress(self, progress: int, current_step: str = None):
        """更新任务进度"""
        try:
            return self.task_manager.update_task_progress(
                self.task_id, progress, current_step
            )
        except Exception as e:
            self.logger.warning(f"更新任务进度失败: {e}")
            return False

    def add_log(self, level: str, message: str, step_name: str = None,
                extra_data: dict = None):
        """添加任务日志"""
        try:
            return self.task_manager.add_task_log(
                self.task_id, level, message, step_name, extra_data
            )
        except Exception as e:
            self.logger.warning(f"添加任务日志失败: {e}")
            return False
```
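
其中 `add_task_log` 对应的数据库写入可以很直接,下面是一个基于 psycopg2 的最小示意(假设 `dsn` 为任务库连接串;最终实现以 `DataPipelineTaskManager` 为准):

```python
import json

import psycopg2

def add_task_log_sketch(dsn: str, task_id: str, level: str, message: str,
                        step_name: str = None, extra_data: dict = None) -> bool:
    """向data_pipeline_task_logs插入一条日志记录(示意)"""
    try:
        with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO data_pipeline_task_logs "
                "(task_id, log_level, message, step_name, extra_data) "
                "VALUES (%s, %s, %s, %s, %s)",
                (task_id, level, message, step_name,
                 json.dumps(extra_data or {}, ensure_ascii=False)),
            )
        return True
    except Exception:
        # 与DatabaseProgressLogger的约定一致:日志写入失败不影响主流程
        return False
```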

## 日志系统集成设计

### 1. 日志路径管理

修改 `core/logging/log_manager.py` 以支持任务特定的日志目录:

```python
def _create_file_handler(self, file_config: dict, module: str) -> logging.Handler:
    """创建文件处理器"""

    # 对于data_pipeline模块,检查是否有任务特定的日志目录
    if module == 'data_pipeline' and 'DATA_PIPELINE_LOG_DIR' in os.environ:
        log_file = Path(os.environ['DATA_PIPELINE_LOG_DIR']) / 'data_pipeline.log'
        # 禁用轮转,因为每个任务的日志是独立的
        file_config = file_config.copy()
        file_config['enable_rotation'] = False
    else:
        log_file = self.base_log_dir / file_config.get('filename', f'{module}.log')

    # 确保日志目录存在
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # ... 其余代码保持不变 ...
```

### 2. 任务日志初始化

在 `schema_workflow.py` 的 `main()` 函数中:

```python
async def main():
    """命令行入口点"""
    parser = setup_argument_parser()
    args = parser.parse_args()

    # 初始化变量
    task_id = None
    db_logger = None

    # 如果不禁用数据库追踪
    if not args.no_db_tracking:
        # 如果没有task_id,自动生成
        if not args.task_id:
            from datetime import datetime
            args.task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            logger.info(f"📝 自动生成任务ID: {args.task_id}")

        task_id = args.task_id

        # 确定任务目录
        if task_id.startswith('task_'):
            # API调用的任务,输出目录已经是任务特定的
            task_dir = args.output_dir
        else:
            # 手动执行的任务,创建任务特定目录
            task_dir = os.path.join(args.output_dir, task_id)
            os.makedirs(task_dir, exist_ok=True)
            args.output_dir = task_dir

        # 设置环境变量,让日志系统知道当前的任务目录
        os.environ['DATA_PIPELINE_LOG_DIR'] = task_dir

        # 重新初始化日志系统
        from core.logging import initialize_logging
        initialize_logging()

        try:
            # 创建任务记录(如果是手动执行)
            if task_id.startswith('manual_'):
                task_manager = DataPipelineTaskManager(args.db_connection)
                task_manager.create_task(
                    task_id=task_id,
                    task_type='complete_workflow',
                    parameters={
                        'db_connection': args.db_connection,
                        'table_list': args.table_list,
                        'business_context': args.business_context,
                        'output_dir': args.output_dir,
                        # ... 其他参数
                    },
                    created_by='manual'
                )

            # 初始化数据库记录器
            db_logger = DatabaseProgressLogger(task_id, args.db_connection)
            logger.info(f"✅ 已启用数据库任务追踪: {task_id}")

        except Exception as e:
            logger.warning(f"⚠️ 无法初始化任务追踪: {e}")
            db_logger = None
    else:
        logger.info("ℹ️ 已禁用数据库任务追踪")

    # 参数验证:单步模式必须提供步骤号
    if args.execution_mode == 'single' and not args.single_step:
        logger.error("单步模式下必须提供 --single-step 参数")
        sys.exit(1)

    # 创建编排器,传入新参数
    orchestrator = SchemaWorkflowOrchestrator(
        db_connection=args.db_connection,
        table_list_file=args.table_list,
        business_context=args.business_context,
        output_dir=args.output_dir,
        enable_sql_validation=not args.skip_validation,
        enable_llm_repair=not args.disable_llm_repair,
        modify_original_file=not args.no_modify_file,
        enable_training_data_load=not args.skip_training_load,
        task_id=task_id,
        db_logger=db_logger,
        execution_mode=args.execution_mode,
        single_step=args.single_step
    )

    # 执行工作流
    report = await orchestrator.execute_complete_workflow()

    # ... 其余代码保持不变 ...
```

## 错误处理和监控

### 1. 僵尸任务检测

```python
# data_pipeline/api/task_monitor.py
from datetime import datetime, timedelta

class TaskMonitor:
    """任务监控器"""

    def __init__(self, db_connection_string: str):
        self.task_manager = DataPipelineTaskManager(db_connection_string)
        self.logger = get_data_pipeline_logger("TaskMonitor")

    def check_zombie_tasks(self, timeout_hours: int = 2):
        """检查僵尸任务"""
        try:
            cutoff_time = datetime.now() - timedelta(hours=timeout_hours)

            # 查找超时的运行中任务
            zombie_tasks = self.task_manager.get_zombie_tasks(cutoff_time)

            for task in zombie_tasks:
                task_id = task['id']
                self.logger.warning(f"发现僵尸任务: {task_id}")

                # 标记为失败
                self.task_manager.update_task_status(
                    task_id,
                    'failed',
                    error_message=f"任务超时(超过{timeout_hours}小时),可能已停止运行"
                )

                # 记录日志
                self.task_manager.add_task_log(
                    task_id,
                    'ERROR',
                    f"任务被标记为僵尸任务,执行时间超过{timeout_hours}小时",
                    'system_check'
                )

        except Exception as e:
            self.logger.error(f"检查僵尸任务失败: {e}")

# 在citu_app.py中添加定期检查
import threading
import time

def start_task_monitor():
    """启动任务监控器"""
    def monitor_loop():
        monitor = TaskMonitor(app_config.PGVECTOR_CONFIG)
        while True:
            try:
                monitor.check_zombie_tasks()
                time.sleep(300)  # 每5分钟检查一次
            except Exception as e:
                logger.error(f"任务监控异常: {e}")
                time.sleep(60)  # 出错时等待1分钟再重试

    monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
    monitor_thread.start()
    logger.info("任务监控器已启动")

# 在应用启动时调用
if __name__ == '__main__':
    start_task_monitor()
    app.run()
```
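
`get_zombie_tasks` 本质上是一条按状态和启动时间过滤的查询,示意如下(假设使用 psycopg2 的 RealDictCursor;状态值与上文任务表的约定一致):

```python
from datetime import datetime

import psycopg2
import psycopg2.extras

def get_zombie_tasks_sketch(dsn: str, cutoff_time: datetime) -> list:
    """查询启动时间早于cutoff_time且仍处于运行状态的任务(示意)"""
    with psycopg2.connect(dsn) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                "SELECT id, started_at FROM data_pipeline_tasks "
                "WHERE status = 'running' AND started_at < %s",
                (cutoff_time,),
            )
            return cur.fetchall()
```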

### 2. 文件输出管理

```python
# data_pipeline/api/file_manager.py
import shutil
from pathlib import Path

class TaskFileManager:
    """任务文件管理器"""

    def __init__(self, task_id: str, output_dir: str, db_connection_string: str):
        self.task_id = task_id
        self.output_dir = Path(output_dir)
        self.task_manager = DataPipelineTaskManager(db_connection_string)
        self.logger = get_data_pipeline_logger("FileManager")

    def scan_and_register_files(self):
        """扫描并注册输出文件"""
        try:
            if not self.output_dir.exists():
                return

            # 文件类型映射
            file_type_mapping = {
                '.ddl': 'ddl',
                '.md': 'md',
                '.json': 'json',
                '.log': 'log',
                '.txt': 'txt'
            }

            for file_path in self.output_dir.iterdir():
                if file_path.is_file():
                    file_ext = file_path.suffix.lower()
                    file_type = file_type_mapping.get(file_ext, 'other')
                    file_size = file_path.stat().st_size

                    # 判断是否为主要输出文件
                    is_primary = (
                        file_path.name.endswith('_pair.json') or
                        file_path.name == 'metadata.txt' or
                        file_path.name.endswith('_summary.log')
                    )

                    # 注册文件
                    self.task_manager.register_output_file(
                        task_id=self.task_id,
                        file_type=file_type,
                        file_name=file_path.name,
                        file_path=str(file_path.relative_to(self.output_dir)),
                        file_size=file_size,
                        is_primary=is_primary
                    )

        except Exception as e:
            self.logger.error(f"扫描文件失败: {e}")

    def cleanup_task_files(self):
        """清理任务文件"""
        try:
            if self.output_dir.exists():
                shutil.rmtree(self.output_dir)
                self.logger.info(f"已清理任务文件: {self.output_dir}")
        except Exception as e:
            self.logger.error(f"清理任务文件失败: {e}")
```
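
`content_hash` 字段可在 `scan_and_register_files` 中用标准库 hashlib 计算后一并注册,示意如下:

```python
import hashlib
from pathlib import Path

def compute_content_hash(file_path: Path) -> str:
    """计算文件内容的SHA-256摘要(64位十六进制,匹配content_hash VARCHAR(64))"""
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        # 分块读取,避免大文件一次性载入内存
        for chunk in iter(lambda: f.read(8192), b""):
            sha256.update(chunk)
    return sha256.hexdigest()
```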

## 部署和初始化

### 1. 数据库初始化脚本

```sql
-- data_pipeline/sql/init_tables.sql

-- 创建任务表
CREATE TABLE IF NOT EXISTS data_pipeline_tasks (
    id VARCHAR(32) PRIMARY KEY,
    task_type VARCHAR(50) NOT NULL DEFAULT 'complete_workflow',
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    parameters JSONB NOT NULL,
    result JSONB,
    error_message TEXT,
    error_step VARCHAR(100),
    progress INTEGER DEFAULT 0 CHECK (progress >= 0 AND progress <= 100),
    current_step VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    created_by VARCHAR(50) DEFAULT 'api',
    step_stats JSONB DEFAULT '{}',
    output_directory TEXT,
    db_name VARCHAR(100),
    business_context TEXT
);

-- 创建执行记录表(与"表结构设计"章节一致)
CREATE TABLE IF NOT EXISTS data_pipeline_task_executions (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_step VARCHAR(50) NOT NULL,
    status VARCHAR(20) NOT NULL,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    error_message TEXT,
    execution_result JSONB,
    execution_id VARCHAR(100) UNIQUE,
    force_executed BOOLEAN DEFAULT FALSE,
    files_cleaned BOOLEAN DEFAULT FALSE,
    duration_seconds INTEGER
);

-- 创建日志表
CREATE TABLE IF NOT EXISTS data_pipeline_task_logs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id),
    log_level VARCHAR(10) NOT NULL,
    message TEXT NOT NULL,
    step_name VARCHAR(100),
    module_name VARCHAR(100),
    function_name VARCHAR(100),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    extra_data JSONB DEFAULT '{}'
);

-- 创建输出文件表
CREATE TABLE IF NOT EXISTS data_pipeline_task_outputs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id),
    file_type VARCHAR(50) NOT NULL,
    file_name VARCHAR(255) NOT NULL,
    file_path TEXT NOT NULL,
    file_size BIGINT DEFAULT 0,
    content_hash VARCHAR(64),
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_primary BOOLEAN DEFAULT FALSE,
    is_downloadable BOOLEAN DEFAULT TRUE
);

-- 创建索引
CREATE INDEX IF NOT EXISTS idx_tasks_status ON data_pipeline_tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON data_pipeline_tasks(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_tasks_db_name ON data_pipeline_tasks(db_name);
CREATE INDEX IF NOT EXISTS idx_tasks_created_by ON data_pipeline_tasks(created_by);

CREATE INDEX IF NOT EXISTS idx_executions_task_id ON data_pipeline_task_executions(task_id);
CREATE INDEX IF NOT EXISTS idx_executions_step ON data_pipeline_task_executions(execution_step);
CREATE INDEX IF NOT EXISTS idx_executions_status ON data_pipeline_task_executions(status);
CREATE INDEX IF NOT EXISTS idx_executions_started_at ON data_pipeline_task_executions(started_at DESC);

CREATE INDEX IF NOT EXISTS idx_logs_task_id ON data_pipeline_task_logs(task_id);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON data_pipeline_task_logs(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_logs_level ON data_pipeline_task_logs(log_level);
CREATE INDEX IF NOT EXISTS idx_logs_step ON data_pipeline_task_logs(step_name);

CREATE INDEX IF NOT EXISTS idx_outputs_task_id ON data_pipeline_task_outputs(task_id);
CREATE INDEX IF NOT EXISTS idx_outputs_file_type ON data_pipeline_task_outputs(file_type);
CREATE INDEX IF NOT EXISTS idx_outputs_primary ON data_pipeline_task_outputs(is_primary) WHERE is_primary = TRUE;

-- 创建清理函数
CREATE OR REPLACE FUNCTION cleanup_old_data_pipeline_tasks(days_to_keep INTEGER DEFAULT 30)
RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
    cutoff_date TIMESTAMP;
BEGIN
    cutoff_date := NOW() - INTERVAL '1 day' * days_to_keep;

    -- 删除旧任务(级联删除相关日志和文件记录)
    DELETE FROM data_pipeline_tasks
    WHERE created_at < cutoff_date
    AND status IN ('completed', 'failed');

    GET DIAGNOSTICS deleted_count = ROW_COUNT;

    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
```
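
清理函数可手动调用 `SELECT cleanup_old_data_pipeline_tasks(30);`,也可由外部定时任务按下文的保留天数配置定期执行。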

### 2. 配置文件更新

需要在 `app_config.py` 中添加Data Pipeline相关配置:

```python
# Data Pipeline API配置
DATA_PIPELINE_CONFIG = {
    "max_concurrent_tasks": 1,             # 最大并发任务数
    "task_timeout_hours": 2,               # 任务超时时间(小时)
    "log_retention_days": 30,              # 日志保留天数
    "file_retention_days": 30,             # 文件保留天数
    "monitor_interval_seconds": 300,       # 监控检查间隔(秒)
    "enable_file_download": True,          # 是否允许文件下载
    "max_download_file_size": 100 * 1024 * 1024,  # 最大下载文件大小(字节)
}
```
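
其他模块读取该配置的方式示意如下(以TaskMonitor的超时阈值与轮询间隔为例):

```python
from app_config import DATA_PIPELINE_CONFIG

timeout_hours = DATA_PIPELINE_CONFIG["task_timeout_hours"]    # 僵尸任务超时阈值
interval = DATA_PIPELINE_CONFIG["monitor_interval_seconds"]   # 监控轮询间隔

# 在监控循环中使用配置值,替代前文示例中的硬编码数值
# monitor.check_zombie_tasks(timeout_hours=timeout_hours)
# time.sleep(interval)
```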

## 总结

本详细设计文档提供了Data Pipeline API系统的完整技术实现方案:

### 主要特点

1. **API与执行分离**:使用subprocess实现真正的后台执行,API不阻塞
2. **数据库驱动的状态管理**:所有任务状态、进度、日志都记录在PostgreSQL中
3. **灵活的步骤控制**:支持完整工作流执行,也支持仅执行指定的单个步骤
4. **统一的日志管理**:每个任务的日志都写入独立的任务目录
5. **完整的文件管理**:自动扫描、注册和管理任务输出文件
6. **健壮的错误处理**:包括僵尸任务检测、超时处理等

### 实现要点

1. **最小化代码修改**:主要修改集中在 `schema_workflow.py` 和 `citu_app.py`
2. **向后兼容**:手动执行方式仍然完全支持
3. **扩展性好**:易于添加新的任务类型和执行步骤
4. **监控友好**:提供完整的任务监控和清理机制

### 关键文件

1. `citu_app.py` - 添加API路由实现
2. `data_pipeline/schema_workflow.py` - 修改以支持API集成
3. `data_pipeline/task_executor.py` - 后台任务执行器(新建)
4. `data_pipeline/api/simple_db_manager.py` - 数据库操作封装(新建)
5. `data_pipeline/api/database_logger.py` - 进度记录器(新建)
6. `data_pipeline/sql/init_tables.sql` - 数据库初始化脚本(新建)

这个设计充分考虑了现有代码结构,提供了完整的API功能,同时保持了系统的简洁性和可维护性。