Data Pipeline API is a simplified data pipeline scheduling and management system. It schedules and executes data pipeline tasks through a REST API and provides task management, progress monitoring, a dual logging system, and file management.
Core components:

- `SimpleTaskManager` - manages the task lifecycle and database operations
- `SimpleWorkflowExecutor` - executes the actual data pipeline tasks
- `task_executor.py` - runs a task in an independent process
- `SimpleFileManager` - manages task output files and downloads

The system uses 4 main database tables (deployed in the pgvector database):

- `data_pipeline_tasks` - task master table
- `data_pipeline_task_executions` - task execution records
- `data_pipeline_task_logs` - task logs
- `data_pipeline_task_outputs` - task output files

Execution flow:

API request → citu_app.py → subprocess → task_executor.py → SimpleWorkflowExecutor → SchemaWorkflowOrchestrator
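As an illustrative check (once the initialization script below has been run), recent tasks can be inspected directly in the database. A minimal sketch; the column names here are assumed from the API response examples in this document, not taken from the actual table definition:

```bash
# Hypothetical inspection query; task_id, status and created_at are assumed
# column names, based on the API response examples in this document.
psql -h host -p port -U username -d database_name \
  -c "SELECT task_id, status, created_at FROM data_pipeline_tasks ORDER BY created_at DESC LIMIT 5;"
```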
Each task's files are stored independently under the `./data_pipeline/training_data/{task_id}/` directory.

First, run the SQL initialization script to create the required database tables:
```bash
psql -h host -p port -U username -d database_name -f data_pipeline/sql/init_tables.sql
```
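To verify that the tables were created, you can list them with `psql` (a quick check, assuming all four tables share the `data_pipeline_` prefix listed above):

```bash
# \dt with a pattern lists matching tables
psql -h host -p port -U username -d database_name -c "\dt data_pipeline_*"
```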
Start the Flask application (which includes the Data Pipeline API):

```bash
python citu_app.py
```

The application starts at http://localhost:8084, and the Data Pipeline API endpoints are prefixed with /api/v0/data_pipeline/.
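As a quick smoke test once the application is running, you can call the statistics endpoint described later in this document:

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/stats
```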
#### 1. Create a task

```bash
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "table_list_file": "tables.txt",
    "business_context": "Highway service area management system",
    "db_name": "highway_db",
    "enable_sql_validation": true,
    "enable_llm_repair": true,
    "modify_original_file": true,
    "enable_training_data_load": true
  }'
```
Response example:

```json
{
  "success": true,
  "code": 201,
  "message": "Task created successfully",
  "data": {
    "task_id": "task_20250627_143052",
    "status": "pending",
    "created_at": "2025-06-27T14:30:52"
  }
}
```
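If `jq` is available, you can capture the returned task ID for the follow-up calls (a convenience sketch, not part of the API itself):

```bash
# Create a task and extract data.task_id from the JSON response
TASK_ID=$(curl -s -X POST http://localhost:8084/api/v0/data_pipeline/tasks \
  -H "Content-Type: application/json" \
  -d '{"table_list_file": "tables.txt", "business_context": "Highway service area management system"}' \
  | jq -r '.data.task_id')
echo "$TASK_ID"
```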
#### 2. Execute a task

Full workflow execution:

```bash
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "complete",
    "force_execution": false,
    "clean_existing_files": true
  }'
```
Single-step execution:

```bash
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "step",
    "step_name": "ddl_generation"
  }'
```
Available step names:

- `ddl_generation` - DDL and MD document generation
- `qa_generation` - Question-SQL pair generation
- `sql_validation` - SQL validation and repair
- `training_load` - load training data into Vanna

Response example:
```json
{
  "success": true,
  "code": 202,
  "message": "Task execution started",
  "data": {
    "task_id": "task_20250627_143052",
    "execution_mode": "step",
    "step_name": "ddl_generation",
    "status": "running"
  }
}
```
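Because execution is asynchronous (the API answers with 202 and the step runs in the background), running the steps one by one requires waiting for each step to finish before starting the next. A minimal polling sketch, assuming `jq` is installed and using the `step_status` field from the status endpoint below; the 10-second interval is an arbitrary choice:

```bash
#!/usr/bin/env bash
BASE=http://localhost:8084/api/v0/data_pipeline
TASK_ID=task_20250627_143052

for STEP in ddl_generation qa_generation sql_validation training_load; do
  # Kick off the step (returns 202 immediately)
  curl -s -X POST "$BASE/tasks/$TASK_ID/execute" \
    -H "Content-Type: application/json" \
    -d "{\"execution_mode\": \"step\", \"step_name\": \"$STEP\"}"
  # Poll the task status until this step completes or fails
  while true; do
    STATUS=$(curl -s "$BASE/tasks/$TASK_ID" | jq -r ".data.step_status.$STEP")
    [ "$STATUS" = "completed" ] && break
    if [ "$STATUS" = "failed" ]; then
      echo "Step $STEP failed" >&2
      exit 1
    fi
    sleep 10
  done
done
```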
#### 3. Query task status

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052
```

Response example:
```json
{
  "success": true,
  "data": {
    "task_id": "task_20250627_143052",
    "status": "in_progress",
    "step_status": {
      "ddl_generation": "completed",
      "qa_generation": "running",
      "sql_validation": "pending",
      "training_load": "pending"
    },
    "current_execution": {
      "execution_id": "task_20250627_143052_step_qa_generation_exec_20250627_143100",
      "step": "qa_generation",
      "status": "running",
      "started_at": "2025-06-27T14:31:00"
    }
  }
}
```
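To watch just the per-step progress, a small `jq` filter over the same endpoint works well (assuming `jq` and the `watch` utility are available):

```bash
# Refresh the step_status map every 5 seconds
watch -n 5 "curl -s http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052 | jq '.data.step_status'"
```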
#### 4. Get the task list

```bash
curl -X GET "http://localhost:8084/api/v0/data_pipeline/tasks?limit=10&status=completed"
```
#### 5. Query task logs

```bash
curl -X GET "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs?limit=50&level=ERROR"
```
Detailed logs produced during task execution are written to the data_pipeline.log file in the task directory:

File location: `./data_pipeline/training_data/{task_id}/data_pipeline.log`

Example log content:

```
2025-07-01 14:30:52 [INFO] TaskDir_task_20250701_143052: Task directory logging initialized - task ID: task_20250701_143052
2025-07-01 14:30:53 [INFO] TaskDir_task_20250701_143052: [complete] Starting step: complete
2025-07-01 14:30:53 [INFO] DataPipelineOrchestrator: Starting full workflow
2025-07-01 14:30:54 [INFO] DDLMDGenerator: Processing table: bss_business_day_data
```
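To follow a running task in real time, you can tail this file directly:

```bash
tail -f ./data_pipeline/training_data/task_20250627_143052/data_pipeline.log
```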
List task files:

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files
```

Download a specific file:

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files/qs_highway_db_20250627_143052_pair.json \
  -o downloaded_file.json
```

Create an archive of the task files:

```bash
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files/archive \
  -H "Content-Type: application/json" \
  -d '{"archive_format": "zip"}'
```

Check file integrity:

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files/integrity
```

Clean up old files:

```bash
curl -X POST http://localhost:8084/api/v0/data_pipeline/files/cleanup \
  -H "Content-Type: application/json" \
  -d '{"days_to_keep": 30}'
```
Get overall system monitoring status:

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/monitor/status
```

Get monitoring details for a single task:

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/monitor/tasks/task_20250627_143052
```

Get metrics history (here, the last 120 minutes):

```bash
curl -X GET "http://localhost:8084/api/v0/data_pipeline/monitor/metrics/history?minutes=120"
```

Get detected anomalies (here, the last 24 hours):

```bash
curl -X GET "http://localhost:8084/api/v0/data_pipeline/monitor/anomalies?hours=24"
```

Get overall statistics:

```bash
curl -X GET http://localhost:8084/api/v0/data_pipeline/stats
```
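For lightweight alerting, these endpoints can be combined into a cron-friendly health check. A sketch that keys only off the `success` field of the uniform response envelope documented in the error-handling section below:

```bash
#!/usr/bin/env bash
# Fail loudly if the monitoring endpoint is unreachable or reports failure.
RESP=$(curl -s http://localhost:8084/api/v0/data_pipeline/monitor/status)
if [ "$(echo "$RESP" | jq -r '.success')" != "true" ]; then
  echo "data_pipeline monitor reports a problem: $RESP" >&2
  exit 1
fi
```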
The workflow consists of the following steps:

1. DDL generation (`ddl_generation`) - produces `.ddl` files and `_detail.md` documents, plus `metadata.txt` and `filename_mapping.txt`
2. Question-SQL generation (`qa_generation`) - produces `qs_*_pair.json` files
3. SQL validation (`sql_validation`) - optional
4. Training data load (`training_load`) - optional
Task statuses:

- `pending` - task created, waiting to run
- `in_progress` - task is running
- `partial_completed` - some steps completed
- `completed` - task fully completed
- `failed` - task execution failed

Step statuses:

- `pending` - step waiting to run
- `running` - step is running
- `completed` - step finished
- `failed` - step failed

Each task gets its own directory under `./data_pipeline/training_data/`:
```
./data_pipeline/training_data/
├── task_20250627_143052/              # task ID as directory name
│   ├── task_config.json               # task configuration parameters
│   ├── task_result.json               # final execution result
│   ├── ddl_generation_result.json     # DDL generation step result
│   ├── qa_generation_result.json      # QA generation step result
│   ├── sql_validation_result.json     # SQL validation step result
│   ├── training_load_result.json      # training load step result
│   ├── bss_*.ddl                      # generated DDL files
│   ├── bss_*_detail.md                # generated MD documents
│   ├── qs_*.json                      # Question-SQL pairs
│   ├── metadata.txt                   # metadata file
│   ├── filename_mapping.txt           # filename mapping
│   ├── sql_validation_*_summary.log   # SQL validation summary
│   └── sql_validation_*_report.json   # SQL validation detailed report
└── task_20250627_150123/
    └── ...
```
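Since every task keeps its full output here, disk usage grows with each run. A quick way to see which task directories are largest (assumes GNU coreutils for `sort -h`):

```bash
du -sh ./data_pipeline/training_data/*/ | sort -rh | head
```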
Common issues:

- Task creation failure
- Execution timeout
- File access errors
- Dependency check failure - setting `force_execution: true` skips the dependency check (see the sketch below)
Diagnostic commands:

```bash
# View task error logs
curl -X GET "http://localhost:8084/api/v0/data_pipeline/tasks/TASK_ID/logs?level=ERROR"

# View system anomalies
curl -X GET "http://localhost:8084/api/v0/data_pipeline/monitor/anomalies"

# Get full system status
curl -X GET http://localhost:8084/api/v0/data_pipeline/monitor/status

# Clean up zombie tasks (via the database manager)

# Clean up old files
curl -X POST http://localhost:8084/api/v0/data_pipeline/files/cleanup \
  -H "Content-Type: application/json" \
  -d '{"days_to_keep": 7}'
```
The system supports anomaly detection and alerting; custom alerting logic can be added by modifying the `TaskAnomalyDetector` class.

The system automatically collects performance metrics and supports viewing historical data and trend analysis.

File management supports integrity verification, archive creation, and batch download.
The individual endpoints are described in detail below.

Create a task:

```
POST /api/v0/data_pipeline/tasks
Content-Type: application/json

{
  "table_list_file": "tables.txt",
  "business_context": "Business description",
  "db_name": "highway_db",
  "enable_sql_validation": true,
  "enable_llm_repair": true,
  "modify_original_file": true,
  "enable_training_data_load": true
}
```
Parameters:

- `table_list_file` (required): path to the table list file
- `business_context` (required): business context description
- `db_name` (optional): business database name; if omitted, the default from app_config is used

Note: database connection details are taken automatically from `APP_DB_CONFIG` in `app_config.py` and do not need to be supplied in the API request.
Expected response:

```json
{
  "success": true,
  "code": 201,
  "message": "Task created successfully",
  "data": {
    "task_id": "task_20250701_143052",
    "status": "pending",
    "created_at": "2025-07-01T14:30:52"
  }
}
```
Execute a task:

```
POST /api/v0/data_pipeline/tasks/{task_id}/execute
Content-Type: application/json

# Full workflow
{"execution_mode": "complete"}

# Single step
{"execution_mode": "step", "step_name": "ddl_generation"}
```
Expected response:

```json
{
  "success": true,
  "code": 202,
  "message": "Task execution started",
  "data": {
    "task_id": "task_20250701_143052",
    "execution_mode": "complete",
    "status": "running"
  }
}
```
Query task status:

```
GET /api/v0/data_pipeline/tasks/{task_id}
```

Expected response:
```json
{
  "success": true,
  "data": {
    "task_id": "task_20250701_143052",
    "status": "in_progress",
    "step_status": {
      "ddl_generation": "completed",
      "qa_generation": "running",
      "sql_validation": "pending",
      "training_load": "pending"
    },
    "created_at": "2025-07-01T14:30:52",
    "started_at": "2025-07-01T14:30:53"
  }
}
```
Get the task list:

```
GET /api/v0/data_pipeline/tasks?limit=10&status=completed
```

Expected response:
```json
{
  "success": true,
  "data": {
    "tasks": [
      {
        "task_id": "task_20250701_143052",
        "status": "completed",
        "created_at": "2025-07-01T14:30:52"
      }
    ]
  }
}
```
Query task logs:

```
GET /api/v0/data_pipeline/tasks/{task_id}/logs?limit=50&level=ERROR
```

Expected response:
```json
{
  "success": true,
  "data": {
    "logs": [
      {
        "id": 123,
        "timestamp": "2025-07-01T14:30:54",
        "level": "INFO",
        "message": "Starting step: ddl_generation",
        "step_name": "ddl_generation"
      }
    ]
  }
}
```
Get the task file list:

```
GET /api/v0/data_pipeline/tasks/{task_id}/files
```

Expected response:
```json
{
  "success": true,
  "data": {
    "files": [
      {
        "file_name": "data_pipeline.log",
        "file_type": "log",
        "file_size": 1024,
        "download_url": "/api/v0/data_pipeline/tasks/{task_id}/files/download/data_pipeline.log"
      },
      {
        "file_name": "qs_highway_db_20250701_143052_pair.json",
        "file_type": "json",
        "file_size": 10240,
        "download_url": "/api/v0/data_pipeline/tasks/{task_id}/files/download/qs_highway_db_20250701_143052_pair.json"
      }
    ]
  }
}
```
Download a task file:

```
GET /api/v0/data_pipeline/tasks/{task_id}/files/download/{filename}
```

Expected response: the file's binary content, with Content-Type set according to the file type.
Query execution records:

```
GET /api/v0/data_pipeline/tasks/{task_id}/executions
```

Expected response:
```json
{
  "success": true,
  "data": {
    "executions": [
      {
        "execution_id": "task_20250701_143052_step_ddl_generation_exec_20250701143053",
        "execution_step": "ddl_generation",
        "status": "completed",
        "started_at": "2025-07-01T14:30:53",
        "completed_at": "2025-07-01T14:35:20"
      }
    ]
  }
}
```
All endpoints return a uniform error format on failure:
```json
{
  "success": false,
  "code": 400,
  "message": "Error description",
  "error_type": "validation_error",
  "details": {}
}
```
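Client scripts can key off the `success` field of this envelope. A small sketch using `jq`:

```bash
# Report the API-provided message whenever a request fails
RESP=$(curl -s http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052)
if [ "$(echo "$RESP" | jq -r '.success')" != "true" ]; then
  echo "Request failed: $(echo "$RESP" | jq -r '.message')" >&2
fi
```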
Common error codes:

- `400` - invalid request parameters
- `404` - task not found
- `409` - task conflict (another task is already running)
- `500` - internal server error
- `503` - service temporarily unavailable

If you run into problems, check:

`./data_pipeline/training_data/{task_id}/data_pipeline.log`

The monitoring API provides detailed system status and error information, which helps locate and resolve problems quickly.