To let the Web UI invoke the Data Pipeline's training-data generation features, with background task execution, progress tracking, and log viewing, we need an API system that supports these requirements.
Task IDs use the format task_YYYYMMDD_HHMMSS. For example, task_20250627_143052 denotes a task created at 14:30:52 on June 27, 2025. Each task gets its own timestamped directory under ./data_pipeline/training_data/:
./data_pipeline/training_data/
├── task_20250627_143052/              # timestamp doubles as the task ID
│   ├── data_pipeline.log              # unified log for all data_pipeline modules
│   ├── task_config.json               # task configuration parameters
│   ├── task_result.json               # final execution result
│   ├── bss_*.ddl                      # generated DDL files
│   ├── bss_*_detail.md                # generated MD documentation
│   ├── qs_*.json                      # Question-SQL pairs
│   ├── metadata.txt                   # metadata file
│   ├── sql_validation_*_summary.log   # SQL validation summary report
│   ├── sql_validation_*_report.json   # SQL validation detailed report (optional)
│   └── file_modifications_*.log       # file modification log (if modification is enabled)
└── task_20250627_150123/
    └── ...
Directory creation details: all task directories are created under ./data_pipeline/training_data/.

Progress is tracked at the step level; table-level details are not tracked:
CREATE TABLE data_pipeline_tasks (
    id VARCHAR(32) PRIMARY KEY,              -- task ID (timestamp format)
    task_type VARCHAR(50) NOT NULL,          -- task type
    status VARCHAR(20) NOT NULL,             -- task status: pending/in_progress/partial_completed/completed/failed
    parameters JSONB NOT NULL,               -- task parameters
    result JSONB,                            -- task result
    error_message TEXT,                      -- error message
    step_status JSONB DEFAULT '{
        "ddl_generation": "pending",
        "qa_generation": "pending",
        "sql_validation": "pending",
        "training_load": "pending"
    }',                                      -- per-step status tracking
    output_directory TEXT,                   -- task output directory
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    created_by VARCHAR(50),
    db_name VARCHAR(100),                    -- database name
    business_context TEXT                    -- business context
);
CREATE TABLE data_pipeline_task_executions (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_step VARCHAR(50) NOT NULL,     -- 'ddl_generation', 'qa_generation', 'sql_validation', 'training_load'
    status VARCHAR(20) NOT NULL,             -- 'running', 'completed', 'failed'
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    error_message TEXT,
    execution_result JSONB,                  -- step execution result
    execution_id VARCHAR(100) UNIQUE,        -- {task_id}_step_{step_name}_exec_{timestamp}
    force_executed BOOLEAN DEFAULT FALSE,    -- whether the step was force-executed
    files_cleaned BOOLEAN DEFAULT FALSE      -- whether old files were cleaned up
);
CREATE TABLE data_pipeline_task_logs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id),
    log_level VARCHAR(10) NOT NULL,          -- 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
    message TEXT NOT NULL,
    step_name VARCHAR(50),                   -- execution step name
    module_name VARCHAR(100),                -- module name
    function_name VARCHAR(100),              -- function name
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    extra_data JSONB DEFAULT '{}'            -- extra structured information
);
CREATE TABLE data_pipeline_task_outputs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id),
    file_type VARCHAR(50) NOT NULL,          -- 'ddl', 'md', 'json', 'log', 'report'
    file_name VARCHAR(255) NOT NULL,         -- file name
    file_path TEXT NOT NULL,                 -- relative path
    file_size BIGINT DEFAULT 0,              -- file size in bytes
    content_hash VARCHAR(64),                -- file content hash
    description TEXT,                        -- file description
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_primary BOOLEAN DEFAULT FALSE,        -- whether this is a primary output file
    is_downloadable BOOLEAN DEFAULT TRUE     -- whether the file is downloadable
);
-- Task table indexes
CREATE INDEX idx_tasks_status ON data_pipeline_tasks(status);
CREATE INDEX idx_tasks_created_at ON data_pipeline_tasks(created_at DESC);
CREATE INDEX idx_tasks_db_name ON data_pipeline_tasks(db_name);
CREATE INDEX idx_tasks_created_by ON data_pipeline_tasks(created_by);
-- Execution record table indexes
CREATE INDEX idx_executions_task_id ON data_pipeline_task_executions(task_id);
CREATE INDEX idx_executions_step ON data_pipeline_task_executions(execution_step);
CREATE INDEX idx_executions_status ON data_pipeline_task_executions(status);
CREATE INDEX idx_executions_started_at ON data_pipeline_task_executions(started_at DESC);
-- Log table indexes
CREATE INDEX idx_logs_task_id ON data_pipeline_task_logs(task_id);
CREATE INDEX idx_logs_execution_id ON data_pipeline_task_logs(execution_id);
CREATE INDEX idx_logs_timestamp ON data_pipeline_task_logs(timestamp DESC);
CREATE INDEX idx_logs_level ON data_pipeline_task_logs(log_level);
CREATE INDEX idx_logs_step ON data_pipeline_task_logs(step_name);
-- File output table indexes
CREATE INDEX idx_outputs_task_id ON data_pipeline_task_outputs(task_id);
CREATE INDEX idx_outputs_execution_id ON data_pipeline_task_outputs(execution_id);
CREATE INDEX idx_outputs_file_type ON data_pipeline_task_outputs(file_type);
CREATE INDEX idx_outputs_primary ON data_pipeline_task_outputs(is_primary) WHERE is_primary = TRUE;
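For reference, per-step status in the step_status JSONB column can be updated atomically with PostgreSQL's jsonb_set. This is a minimal sketch assuming psycopg2; the helper name is illustrative and not part of the schema above:

import psycopg2

def update_step_status(conn, task_id: str, step: str, status: str) -> None:
    """Atomically set step_status[step] = status for one task."""
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE data_pipeline_tasks
            SET step_status = jsonb_set(step_status, %s, to_jsonb(%s::text))
            WHERE id = %s
            """,
            ([step], status, task_id),  # the Python list adapts to a text[] path
        )
    conn.commit()

# Example: update_step_status(conn, "task_20250627_143052", "ddl_generation", "completed")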
Implementation location: all API endpoints are implemented in citu_app.py, as an extension of the existing Flask application.
POST /api/v0/data_pipeline/tasks
Request parameters:
{
"task_type": "data_workflow",
"table_list_file": "tables.txt",
"business_context": "高速公路服务区管理系统",
"db_name": "highway_db",
"enable_sql_validation": true,
"enable_llm_repair": true,
"modify_original_file": true,
"enable_training_data_load": true
}
Note: database connection info is read automatically from app_config.py:
- APP_DB_CONFIG
- PGVECTOR_CONFIG (vector database)

Response:
{
"success": true,
"message": "任务创建成功",
"data": {
"task_id": "task_20250627_143052",
"status": "pending",
"output_directory": "./data_pipeline/training_data/task_20250627_143052",
"step_status": {
"ddl_generation": "pending",
"qa_generation": "pending",
"sql_validation": "pending",
"training_load": "pending"
},
"created_at": "2025-06-27T14:30:52"
}
}
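A minimal Flask sketch of this endpoint; task_manager and its create_task helper are illustrative assumptions, while the real implementation lives in citu_app.py:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
    payload = request.get_json(force=True) or {}
    try:
        # create_task (assumed) inserts the DB row, creates the task
        # directory, and returns the initial task record (see workflow below)
        task = task_manager.create_task(payload)
        return jsonify({"success": True, "message": "任务创建成功", "data": task})
    except ConcurrentTaskError:
        return jsonify({"success": False, "error": "已有任务正在执行"}), 409
    except Exception as e:
        app.logger.error(f"任务创建失败: {e}")
        return jsonify({"success": False, "error": "任务创建失败"}), 500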
POST /api/v0/data_pipeline/tasks/{task_id}/execute
Request parameters:
{
"step": "ddl_generation",
"force_execute": false,
"clean_previous": true
}
Response:
{
"success": true,
"message": "步骤执行已启动",
"data": {
"execution_id": "task_20250627_143052_step_ddl_generation_exec_20250627143055",
"task_id": "task_20250627_143052",
"step": "ddl_generation",
"status": "running",
"started_at": "2025-06-27T14:30:55"
}
}
POST /api/v0/data_pipeline/tasks/execute-complete
Request parameters:
{
"task_type": "complete_workflow",
"table_list_file": "tables.txt",
"business_context": "高速公路服务区管理系统",
"db_name": "highway_db"
}
Response:
{
"success": true,
"message": "完整工作流执行已启动",
"data": {
"task_id": "task_20250627_143052",
"execution_id": "task_20250627_143052_step_complete_exec_20250627143055",
"status": "running",
"started_at": "2025-06-27T14:30:55"
}
}
GET /api/v0/data_pipeline/tasks
Response:
{
"success": true,
"data": {
"tasks": [
{
"task_id": "task_20250627_143052",
"task_type": "complete_workflow",
"status": "running",
"progress": 45,
"created_at": "2025-06-27T14:30:52"
}
]
}
}
GET /api/v0/data_pipeline/tasks/{task_id}
Response:
{
"success": true,
"data": {
"task_id": "task_20250627_143052",
"task_type": "data_workflow",
"status": "in_progress",
"parameters": { ... },
"step_status": {
"ddl_generation": "completed",
"qa_generation": "running",
"sql_validation": "pending",
"training_load": "pending"
},
"output_directory": "./data_pipeline/training_data/task_20250627_143052",
"created_at": "2025-06-27T14:30:52",
"started_at": "2025-06-27T14:30:53",
"completed_at": null,
"current_execution": {
"execution_id": "task_20250627_143052_step_qa_generation_exec_20250627143521",
"step": "qa_generation",
"status": "running",
"started_at": "2025-06-27T14:35:21"
}
}
}
GET /api/v0/data_pipeline/tasks/{task_id}/executions
Response:
{
"success": true,
"data": {
"executions": [
{
"execution_id": "task_20250627_143052_step_ddl_generation_exec_20250627143053",
"step": "ddl_generation",
"status": "completed",
"started_at": "2025-06-27T14:30:53",
"completed_at": "2025-06-27T14:35:20",
"duration": 267,
"force_executed": false,
"files_cleaned": true
},
{
"execution_id": "task_20250627_143052_step_qa_generation_exec_20250627143521",
"step": "qa_generation",
"status": "running",
"started_at": "2025-06-27T14:35:21",
"completed_at": null,
"force_executed": false,
"files_cleaned": false
}
]
}
}
GET /api/v0/data_pipeline/tasks/active
Response: returns the most recent running task; if there is none, returns the most recently completed task.
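A sketch of the underlying lookup. The helper name, and which status values count as "running" (the document uses both 'running' and 'in_progress'), are assumptions:

def get_active_task(conn):
    """Most recent running task, falling back to the most recently completed one."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, task_type, status, created_at
            FROM data_pipeline_tasks
            WHERE status IN ('running', 'in_progress')
            ORDER BY started_at DESC NULLS LAST
            LIMIT 1
        """)
        row = cur.fetchone()
        if row is None:
            cur.execute("""
                SELECT id, task_type, status, created_at
                FROM data_pipeline_tasks
                WHERE status = 'completed'
                ORDER BY completed_at DESC NULLS LAST
                LIMIT 1
            """)
            row = cur.fetchone()
    return row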
GET /api/v0/data_pipeline/tasks/{task_id}/logs?limit=100&level=INFO
Response:
{
"success": true,
"data": {
"logs": [
{
"timestamp": "2025-06-27T14:30:53",
"level": "INFO",
"step_name": "ddl_md_generation",
"message": "开始处理表: bss_business_day_data"
}
]
}
}
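A sketch of the corresponding query for the limit and level parameters. Whether level should be an exact match (as here) or a minimum severity is a design choice the endpoint spec leaves open; the helper name is illustrative:

def fetch_task_logs(conn, task_id: str, limit: int = 100, level: str = None):
    """Newest-first log rows for one task, optionally filtered by exact log level."""
    query = """
        SELECT timestamp, log_level, step_name, message
        FROM data_pipeline_task_logs
        WHERE task_id = %s
    """
    params = [task_id]
    if level:
        query += " AND log_level = %s"
        params.append(level)
    query += " ORDER BY timestamp DESC LIMIT %s"
    params.append(limit)
    with conn.cursor() as cur:
        cur.execute(query, params)
        return cur.fetchall()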
GET /api/v0/data_pipeline/tasks/{task_id}/files
Response:
{
"success": true,
"data": {
"files": [
{
"file_name": "qs_highway_db_20250627_143052_pair.json",
"file_type": "json",
"file_size": 102400,
"download_url": "/api/v0/data_pipeline/tasks/task_20250627_143052/files/download/qs_highway_db_20250627_143052_pair.json"
}
]
}
}
GET /api/v0/data_pipeline/tasks/{task_id}/files/download/{filename}
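Downloads should guard against path traversal in {filename}. A minimal sketch using Flask's send_from_directory, which safely joins the directory and filename (the route wiring is illustrative):

import os
from flask import send_from_directory, abort

TRAINING_DATA_ROOT = './data_pipeline/training_data'

@app.route('/api/v0/data_pipeline/tasks/<task_id>/files/download/<path:filename>')
def download_task_file(task_id, filename):
    task_dir = os.path.join(TRAINING_DATA_ROOT, task_id)
    if not os.path.isdir(task_dir):
        abort(404)
    # send_from_directory rejects paths that escape task_dir (e.g. "../")
    return send_from_directory(task_dir, filename, as_attachment=True)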
Task: a complete unit of data-processing work consisting of four steps, with a unique task ID and output directory.
Execution: one run of a specific step within a task; steps can be re-run and run individually.
Step identifiers use descriptive names: ddl_generation, qa_generation, sql_validation, training_load.
Two execution modes are supported: complete-workflow mode and step-by-step mode.
Same-task-directory principle: every step of a task reads and writes the same ./data_pipeline/training_data/{task_id}/ directory.
Step-to-file mapping (used for cleanup, as sketched below):
- ddl_generation: *.ddl, *_detail.md, metadata.txt
- qa_generation: qs_*.json, qs_*.json.backup
- sql_validation: sql_validation_*_summary.log, sql_validation_*_report.json
- training_load: training_load_*.log
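When clean_previous=true, this mapping can drive the cleanup of a step's old outputs. A minimal sketch; the helper name is illustrative:

import glob
import os

STEP_FILE_PATTERNS = {
    'ddl_generation': ['*.ddl', '*_detail.md', 'metadata.txt'],
    'qa_generation': ['qs_*.json', 'qs_*.json.backup'],
    'sql_validation': ['sql_validation_*_summary.log', 'sql_validation_*_report.json'],
    'training_load': ['training_load_*.log'],
}

def clean_step_outputs(task_dir: str, step: str) -> list:
    """Delete one step's previous output files; return the removed paths."""
    removed = []
    for pattern in STEP_FILE_PATTERNS.get(step, []):
        for path in glob.glob(os.path.join(task_dir, pattern)):
            os.remove(path)
            removed.append(path)
    return removed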
Steps within a single task run serially.

Task creation flow:
1. The frontend sends a POST request to create a task
2. The API generates a task_id (format: task_YYYYMMDD_HHMMSS)
3. A task record is created in the database with status 'pending'
4. The corresponding timestamped directory is created
5. All step statuses are initialized to 'pending'
6. The task_id is returned to the frontend immediately
7. Task creation is complete; the task waits for step-execution requests
Step execution flow:
1. The frontend sends a POST request to execute a specific step
2. Check that the task exists
3. Check step dependencies, unless force_execute=true (see the sketch after this list)
4. Check whether a step is already running (concurrency control)
5. Generate an execution_id
6. Create the execution record with status 'running'
7. If clean_previous=true, clean up the step's old output files
8. Launch the standalone task-executor process:
   cmd = [
       sys.executable,
       './data_pipeline/task_executor.py',
       '--task-id', task_id,
       '--execution-mode', execution_mode,  # 'complete' or 'step'
   ]
   if execution_mode == 'step':
       cmd.extend(['--step-name', step_name])  # Popen arguments must be strings, never None
   subprocess.Popen(cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    cwd=project_root)
9. The execution_id is returned to the frontend immediately
10. The API request ends; the task_executor.py script keeps running in the background
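The dependency check in step 3 might look like the following sketch; the step order comes from the schema above, while the helper itself is an illustrative assumption:

STEP_ORDER = ['ddl_generation', 'qa_generation', 'sql_validation', 'training_load']

def check_step_dependencies(step_status: dict, step: str, force_execute: bool = False):
    """Raise if any earlier step has not completed, unless force_execute is set."""
    if force_execute:
        return
    for prior in STEP_ORDER[:STEP_ORDER.index(step)]:
        if step_status.get(prior) != 'completed':
            raise ValueError(f"Step '{step}' requires '{prior}' to complete first")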
Detailed implementation steps:
# Generate the task ID
from datetime import datetime
task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
-- Concurrency check: only one task may run at a time
SELECT COUNT(*) FROM data_pipeline_tasks WHERE status = 'running';
-- If the count > 0, return the error "已有任务正在执行,请稍后再试"
-- Create the task record with status 'pending'
INSERT INTO data_pipeline_tasks (id, task_type, status, parameters, created_by)
VALUES (?, ?, 'pending', ?::jsonb, ?);
# Create the task's timestamped directory
task_dir = os.path.join('./data_pipeline/training_data/', task_id)
os.makedirs(task_dir, mode=0o755, exist_ok=False)  # exist_ok=False guarantees the directory is unique
# Write task_config.json into the task directory
config_path = os.path.join(task_dir, 'task_config.json')
with open(config_path, 'w', encoding='utf-8') as f:
    json.dump({
        'task_id': task_id,
        'task_type': task_type,
        'parameters': parameters,
        'created_at': datetime.now().isoformat()
    }, f, indent=2, ensure_ascii=False)
# Launch the standalone task-executor process with subprocess.Popen
cmd = [
    sys.executable,
    './data_pipeline/task_executor.py',
    '--task-id', task_id,
    '--execution-mode', execution_mode,  # 'complete' or 'step'
]
if execution_mode == 'step':
    cmd.extend(['--step-name', step_name])  # Popen arguments must be strings, never None
process = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
    cwd=project_root  # project root directory
)
1. task_executor.py starts and receives the task_id and execution-mode arguments
2. It initializes the logging system and creates a SimpleWorkflowExecutor instance
3. It ensures the task directory exists and sets up the task-directory logger
4. It updates the database status to 'running' and sets the started_at timestamp
5. It creates a SchemaWorkflowOrchestrator and redirects its logs to the task directory
6. It runs the workflow (complete or single-step), writing detailed logs to data_pipeline.log
7. All generated files are saved in the corresponding timestamped directory
8. On finish, it updates the database status to 'completed' or 'failed'
9. It cleans up resources, and the script exits
Task execution architecture (based on the standalone task_executor.py):
# data_pipeline/task_executor.py command-line arguments
parser.add_argument('--task-id', required=True, help='任务ID')
parser.add_argument('--execution-mode', default='complete',
                    choices=['complete', 'step'], help='执行模式')
parser.add_argument('--step-name', help='步骤名称(当execution-mode=step时必需)')
async def execute_task(task_id: str, execution_mode: str, step_name: str = None):
    """Async entry point for running a task."""
    executor = None
    try:
        # Create the SimpleWorkflowExecutor instance
        executor = SimpleWorkflowExecutor(task_id)
        if execution_mode == "complete":
            # Run the complete workflow
            return await executor.execute_complete_workflow()
        elif execution_mode == "step":
            # Run a single step
            return await executor.execute_single_step(step_name)
        else:
            raise ValueError(f"不支持的执行模式: {execution_mode}")
    finally:
        if executor:
            executor.cleanup()
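For completeness, a sketch of the script's entry point that wires the parsed arguments into execute_task (a minimal sketch; the real script may also handle signals and exit codes):

import asyncio

def main():
    args = parser.parse_args()  # parser as defined above
    if args.execution_mode == 'step' and not args.step_name:
        parser.error('--step-name is required when --execution-mode=step')
    asyncio.run(execute_task(args.task_id, args.execution_mode, args.step_name))

if __name__ == '__main__':
    main()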
class SimpleWorkflowExecutor:
    def __init__(self, task_id: str):
        self.task_id = task_id
        self.task_manager = SimpleTaskManager()  # database management
        self.file_manager = SimpleFileManager()  # file management
        self.task_dir_logger = None              # task-directory logger
        self._load_task_info()                   # load task info
    def _setup_task_directory_logger(self):
        """Set up the task-directory logger."""
        task_dir = self.file_manager.get_task_directory(self.task_id)
        log_file = task_dir / "data_pipeline.log"
        # Create a dedicated logger for the task directory
        self.task_dir_logger = logging.getLogger(f"TaskDir_{self.task_id}")
        self.task_dir_logger.setLevel(logging.DEBUG)
        self.task_dir_logger.handlers.clear()
        self.task_dir_logger.propagate = False
        # Create the file handler
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        formatter = logging.Formatter(
            '%(asctime)s [%(levelname)s] %(name)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        file_handler.setFormatter(formatter)
        self.task_dir_logger.addHandler(file_handler)
    def _redirect_orchestrator_logs(self, orchestrator):
        """Redirect SchemaWorkflowOrchestrator logs to the task directory."""
        if self.task_dir_logger and hasattr(orchestrator, 'logger'):
            for handler in self.task_dir_logger.handlers:
                if isinstance(handler, logging.FileHandler):
                    orchestrator.logger.addHandler(handler)
                    break
Log destinations:
- ./data_pipeline/training_data/{task_id}/data_pipeline.log: detailed execution log
- data_pipeline_task_logs table: structured querying and display
- ./logs/ directory: keeps the system-level logs (app.log, agent.log, vanna.log)

Logger components:
- Task-directory logger: TaskDir_{task_id}, writing to the data_pipeline.log file
- Database logger: SimpleTaskManager.record_log(), recording key events
- SchemaWorkflowOrchestrator log redirection (example output below):
# Example contents of the task-directory log file
2025-07-01 14:30:52 [INFO] TaskDir_task_20250701_143052: 任务目录日志初始化完成 - 任务ID: task_20250701_143052
2025-07-01 14:30:52 [INFO] TaskDir_task_20250701_143052: 任务参数: {"db_connection": "...", "business_context": "..."}
2025-07-01 14:30:53 [INFO] TaskDir_task_20250701_143052: [complete] 开始执行步骤: complete
2025-07-01 14:30:53 [INFO] DataPipelineOrchestrator: 开始执行完整工作流
2025-07-01 14:30:54 [INFO] DDLMDGenerator: 开始处理表: bss_business_day_data
# 1. API call (complete workflow)
python data_pipeline/task_executor.py \
  --task-id "task_20250627_143052" \
  --execution-mode complete

# 2. API call (single step: DDL generation)
python data_pipeline/task_executor.py \
  --task-id "task_20250627_143052" \
  --execution-mode step \
  --step-name ddl_generation

# 3. API call (single step: Q&A generation)
python data_pipeline/task_executor.py \
  --task-id "task_20250627_143052" \
  --execution-mode step \
  --step-name qa_generation

# 4. API call (single step: SQL validation)
python data_pipeline/task_executor.py \
  --task-id "task_20250627_143052" \
  --execution-mode step \
  --step-name sql_validation

# 5. API call (single step: training-data load)
python data_pipeline/task_executor.py \
  --task-id "task_20250627_143052" \
  --execution-mode step \
  --step-name training_load
# 1. Create the task
curl -X POST /api/v0/data_pipeline/tasks \
  -d '{"task_type": "data_workflow", "parameters": {...}}'
# Returns: {"task_id": "task_20250627_143052"}

# 2. Run DDL generation
curl -X POST /api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -d '{"step": "ddl_generation"}'
# Wait for completion and inspect the result

# 3. Once the DDL output looks good, run Q&A generation
curl -X POST /api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -d '{"step": "qa_generation"}'

# 4. If the Q&A result is unsatisfactory, re-run the step
curl -X POST /api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -d '{"step": "qa_generation", "clean_previous": true}'

# 5. Continue with the remaining steps
curl -X POST /api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -d '{"step": "sql_validation"}'
curl -X POST /api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -d '{"step": "training_load"}'

# Create a task and immediately run the complete workflow
curl -X POST /api/v0/data_pipeline/tasks/execute-complete \
  -d '{"task_type": "complete_workflow", "parameters": {...}}'
// Polling during step-by-step execution
async function pollExecutionStatus(taskId, executionId) {
  const pollInterval = setInterval(async () => {
    const response = await fetch(`/api/v0/data_pipeline/tasks/${taskId}/executions`);
    const data = await response.json();
    const currentExecution = data.data.executions.find(e => e.execution_id === executionId);
    if (!currentExecution) return;  // execution record not visible yet
    // Update the UI
    updateStepStatus(currentExecution.step, currentExecution.status);
    // Stop polling once the step finishes
    if (currentExecution.status === 'completed' || currentExecution.status === 'failed') {
      clearInterval(pollInterval);
      handleStepComplete(currentExecution);
    }
  }, 5000);
}
// Task-level status polling
async function pollTaskStatus(taskId) {
  const pollInterval = setInterval(async () => {
    const response = await fetch(`/api/v0/data_pipeline/tasks/${taskId}`);
    const data = await response.json();
    // Update each step's status
    updateAllStepsStatus(data.data.step_status);
    // Update the current execution info
    if (data.data.current_execution) {
      updateCurrentExecution(data.data.current_execution);
    }
    // Stop polling once the whole task finishes
    if (data.data.status === 'completed' || data.data.status === 'failed') {
      clearInterval(pollInterval);
      handleTaskComplete(data.data);
    }
  }, 5000);
}
{
"task_id": "task_20250627_143052",
"task_type": "complete_workflow",
"created_at": "2025-06-27T14:30:52",
"parameters": {
"db_connection": {
"host": "localhost",
"port": 5432,
"database": "highway_db",
"user": "postgres",
"password": "******"
},
"table_list": ["bss_business_day_data", "bss_car_day_count", ...],
"business_context": "高速公路服务区管理系统",
"output_dir": "./data_pipeline/training_data/task_20250627_143052",
"execution_mode": "complete",
"single_step": null,
"llm_config": {
"model": "qianwen",
"temperature": 0.7
}
}
}
API-level error handling:

try:
    task_id = create_task(request_data)
    return {"success": True, "task_id": task_id}
except ConcurrentTaskError:
    return {"success": False, "error": "已有任务正在执行"}, 409
except Exception as e:
    logger.error(f"任务创建失败: {str(e)}")
    return {"success": False, "error": "任务创建失败"}, 500
Executor-level error handling:

try:
    # Run the workflow
    report = await orchestrator.execute_complete_workflow()
    if self.db_logger:
        self.db_logger.update_status('completed')
except Exception as e:
    # Record the error in both the log file and the database
    self.logger.error(f"任务执行失败: {str(e)}", exc_info=True)
    if self.db_logger:
        self.db_logger.log('ERROR', str(e))
        self.db_logger.update_status('failed', error_message=str(e))
    raise
# Checked at API startup
def check_zombie_tasks():
    # Mark tasks that have been running for more than 2 hours as failed
    query = """
        UPDATE data_pipeline_tasks
        SET status = 'failed',
            error_message = '任务超时,可能已停止运行'
        WHERE status = 'running'
          AND started_at < NOW() - INTERVAL '2 hours'
    """
    db.execute(query)
# Use a database transaction to guarantee atomicity
def acquire_task_lock(task_id):
    with db.transaction():
        # Check for a running task
        running_count = db.query(
            "SELECT COUNT(*) FROM data_pipeline_tasks WHERE status = 'running'"
        ).scalar()
        if running_count > 0:
            raise ConcurrentTaskError("已有任务正在执行")
        # Acquire the lock: set the task's status to running
        db.execute(
            "UPDATE data_pipeline_tasks SET status = 'running', started_at = NOW() WHERE id = %s",
            [task_id]
        )
Because schema_workflow.py is invoked directly, no extra worker.py is needed. The main changes are adding the --task-id and --no-db-tracking arguments, and adding progress updates in each step-execution method:
# _execute_step_1_ddl_md_generation
if self.db_logger:
    self.db_logger.update_progress(10, 'ddl_md_generation')
    self.db_logger.log('INFO', 'DDL/MD生成开始', 'ddl_md_generation')
# ... do the actual work
self.db_logger.update_progress(40, 'ddl_md_generation')

# _execute_step_2_question_sql_generation
if self.db_logger:
    self.db_logger.update_progress(40, 'question_sql_generation')
# ... do the actual work
self.db_logger.update_progress(70, 'question_sql_generation')

# _execute_step_3_sql_validation
if self.db_logger:
    self.db_logger.update_progress(70, 'sql_validation')
# ... do the actual work
self.db_logger.update_progress(90, 'sql_validation')

# _execute_step_4_training_data_load
if self.db_logger:
    self.db_logger.update_progress(90, 'training_data_load')
# ... do the actual work
self.db_logger.update_progress(100, 'training_data_load')
Manage task status in the main execution flow:
async def execute_complete_workflow(self):
    # Mark the task as running at the start
    if self.db_logger:
        self.db_logger.update_status('running')
    try:
        # Run each step...
        report = await self._generate_final_report()
        # Completed successfully
        if self.db_logger:
            self.db_logger.update_status('completed')
    except Exception as e:
        # Failure handling
        if self.db_logger:
            self.db_logger.update_status('failed', str(e))
        raise
When invoked through the API, output_dir is set to the task-specific timestamped directory, so that all output files are stored in one place.
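A sketch of that wiring; the SchemaWorkflowOrchestrator keyword arguments shown here are assumptions based on the parameters described in this document, not its confirmed signature:

import os

def build_orchestrator(task_id: str, params: dict):
    # Redirect all outputs into the task's timestamped directory
    output_dir = os.path.join('./data_pipeline/training_data', task_id)
    return SchemaWorkflowOrchestrator(
        db_connection=params['db_connection'],    # taken from app_config.py in API mode
        table_list=params['table_list'],
        business_context=params['business_context'],
        output_dir=output_dir,
    )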
Request parameter validation:

def validate_task_request(request_data):
    # Validate required fields
    required_fields = ['task_type', 'parameters']
    for field in required_fields:
        if field not in request_data:
            raise ValueError(f"缺少必填字段: {field}")
    # Validate the database connection parameters
    db_params = request_data['parameters'].get('db_connection', {})
    if not all(k in db_params for k in ['host', 'port', 'database']):
        raise ValueError("数据库连接参数不完整")
    # Validate the table list
    table_list = request_data['parameters'].get('table_list', [])
    if not table_list:
        raise ValueError("表列表不能为空")
# Periodic job: clean up old task data
def cleanup_old_tasks():
    # Remove tasks older than 30 days
    cutoff_date = datetime.now() - timedelta(days=30)
    # Find the tasks to clean up
    old_tasks = db.query("""
        SELECT id FROM data_pipeline_tasks
        WHERE created_at < %s AND status IN ('completed', 'failed')
    """, [cutoff_date])
    for task in old_tasks:
        # Delete the file directory
        task_dir = os.path.join('./data_pipeline/training_data/', task.id)
        if os.path.exists(task_dir):
            shutil.rmtree(task_dir)
        # Delete the database record (child rows cascade)
        db.execute("DELETE FROM data_pipeline_tasks WHERE id = %s", [task.id])
DELETE /api/v0/data_pipeline/tasks/{task_id}
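A minimal sketch of the corresponding Flask route, reusing the same directory-plus-database cleanup as above (the db helper follows this document's pseudo-interface; the route wiring is illustrative):

import os
import shutil
from flask import jsonify

@app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['DELETE'])
def delete_task(task_id):
    status = db.query(
        "SELECT status FROM data_pipeline_tasks WHERE id = %s", [task_id]
    ).scalar()
    if status is None:
        return jsonify({"success": False, "error": "任务不存在"}), 404
    if status == 'running':
        # Refuse to delete a task that is still running
        return jsonify({"success": False, "error": "任务正在执行,无法删除"}), 409
    task_dir = os.path.join('./data_pipeline/training_data/', task_id)
    if os.path.exists(task_dir):
        shutil.rmtree(task_dir)
    # Child rows in executions/logs/outputs are removed via ON DELETE CASCADE
    db.execute("DELETE FROM data_pipeline_tasks WHERE id = %s", [task_id])
    return jsonify({"success": True, "message": "任务已删除"})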
-- Create the required database tables
CREATE TABLE IF NOT EXISTS data_pipeline_tasks (...);
CREATE TABLE IF NOT EXISTS data_pipeline_task_executions (...);
CREATE TABLE IF NOT EXISTS data_pipeline_task_logs (...);
CREATE TABLE IF NOT EXISTS data_pipeline_task_outputs (...);

-- Create indexes
CREATE INDEX IF NOT EXISTS idx_tasks_status ON data_pipeline_tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON data_pipeline_tasks(created_at);
CREATE INDEX IF NOT EXISTS idx_logs_task_id ON data_pipeline_task_logs(task_id);

-- Create the cleanup function
CREATE OR REPLACE FUNCTION cleanup_old_tasks()...
This design decouples task execution from the API, using the database as the communication bridge, which enables long-running background execution with real-time progress tracking. It is simple and practical, reuses the existing code and infrastructure, and covers the Web UI's needs for invoking the Data Pipeline.
This design document has described the complete implementation plan for the Data Pipeline API, covering its core design features, technical highlights, practicality considerations, Web UI-friendly design, and key implementation changes. While staying simple and practical, the plan provides complete functional support and satisfies the needs of Data Pipeline Web UI integration.