本文档详细说明了 Data Pipeline 自动化训练数据生成的完整工作流程和 API 调用方法。通过这些 API,可以自动化生成数据库表的 DDL 文档、Markdown 说明、Question-SQL 训练对,并自动加载到训练数据库中。
方法 | 端点 | 描述 |
---|---|---|
POST |
/api/v0/data_pipeline/tasks |
创建数据管道任务 |
POST |
/api/v0/data_pipeline/tasks/{task_id}/execute |
执行数据管道任务 |
GET |
/api/v0/data_pipeline/tasks/{task_id} |
获取任务状态 |
GET |
/api/v0/data_pipeline/tasks |
获取任务列表 |
DELETE |
/api/v0/data_pipeline/tasks |
删除任务(批量) |
方法 | 端点 | 描述 |
---|---|---|
POST |
/api/v0/database/tables |
查询数据库表列表 |
POST |
/api/v0/data_pipeline/tasks/{task_id}/table-list |
在线提交表名列表 |
POST |
/api/v0/data_pipeline/tasks/{task_id}/upload-table-list |
上传表清单文件 |
GET |
/api/v0/data_pipeline/tasks/{task_id}/table-list-info |
获取表清单文件信息 |
方法 | 端点 | 描述 |
---|---|---|
GET |
/api/v0/data_pipeline/tasks/{task_id}/files |
查看任务文件列表 |
GET |
/api/v0/data_pipeline/tasks/{task_id}/files/{file_name} |
下载任务文件 |
POST |
/api/v0/data_pipeline/tasks/{task_id}/files |
上传文件到任务目录 |
方法 | 端点 | 描述 |
---|---|---|
GET |
/api/v0/data_pipeline/tasks/{task_id}/logs |
获取任务日志 |
下面是完整的执行流程和API调用说明:
API: POST /api/v0/data_pipeline/tasks
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks
参数样例1:
{
"task_name": "服务区初始化数据加载",
"db_name": "highway_db",
"business_context": "高速公路服务区管理系统"
}
参数样例2:
{
"db_name": "highway_db",
"business_context": "高速公路服务区管理系统",
"enable_sql_validation": true,
"enable_llm_repair": true,
"modify_original_file": true,
"enable_training_data_load": true
}
table_list_file
(string): 表清单文件路径,如未提供则进入文件上传模式,目前这种方式已经废弃business_context
(string): 业务上下文描述,默认为"数据库管理系统",使用默认值会严重影响LLM对数据表业务主题判断的准确性db_name
(string): 数据库名称,如果不提供,从连接字符串中提取db_connection
(string): 完整的PostgreSQL连接字符串注意:目前所有的控制参数都不在WEB UI暴露给用户,它们的默认值都是true。
enable_sql_validation
(boolean, 默认true): 是否启用SQL验证enable_llm_repair
(boolean, 默认true): 是否启用LLM修复modify_original_file
(boolean, 默认true): 是否修改原始文件enable_training_data_load
(boolean, 默认true): 是否启用训练数据加载执行步骤流程:
1. DDL/MD生成 (必需)
↓
2. Question-SQL生成 (必需)
↓
3. SQL验证 (受enable_sql_validation控制)
├─ SQL验证失败 → LLM修复 (受enable_llm_repair控制)
└─ 文件修改 (受modify_original_file控制)
↓
4. 训练数据加载 (受enable_training_data_load控制)
对于前端UI,主要提供四个参数 business_context
、db_name
、db_connection
、task_name
,如果db_connection
连接串中填写了数据库的名字,那么db_name
可以忽略。
请求:
{
"task_name": "服务区初始化数据加载",
"db_name": "highway_db",
"business_context": "高速公路服务区管理系统"
}
响应(创建成功):
注意:请记录返回的
task_id
,后续的操作都需要使用这个task_id
。
{
"code": 200,
"data": {
"created_at": "2025-07-02T17:40:00.268100",
"file_upload_mode": true,
"next_step": "POST /api/v0/data_pipeline/tasks/task_20250702_174000/upload-table-list",
"response": "任务创建成功,请上传表清单文件后再执行任务",
"status": "pending",
"task_id": "task_20250702_174000",
"task_name": "服务区初始化数据加载"
},
"message": "操作成功",
"success": true
}
有两种方式提交表名列表,这些表是将来用NL2SQL查询的,我们需要基于这些表的定义和数据生成训练数据集。
重要:请注意上个步骤中返回的
task_id
,在接下来的步骤中都需要用到这个task_id
。
API: POST /api/v0/database/tables
请求地址: http://localhost:8084/api/v0/database/tables
参数(都是可选参数):db_connection / schema / table_name_pattern
如果要查询的数据库没有在app_config.py中配置,或者不是查询业务数据的表,那么需要提供db_connection字符串。
{
"db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db",
"schema": "public,ods,dw",
"table_name_pattern": "*_area*"
}
请求示例:
直接使用空参数{}
,会返回app_config.py中配置的业务数据库中所有public.*schema的表
预期返回结果:
{
"code": 200,
"data": {
"db_connection_info": {
"database": "highway_db"
},
"response": "获取表列表成功",
"schemas": [
"public",
"ods",
"dw"
],
"table_name_pattern": "*_area*",
"tables": [
"public.bss_section_route_area_link",
"public.bss_service_area",
"public.bss_service_area_mapper"
],
"total": 3
},
"message": "操作成功",
"success": true
}
API: POST /api/v0/data_pipeline/tasks/{task_id}/table-list
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_144901/table-list
参数:
只有一个必选参数 tables
,后面的表名使用逗号分隔,支持 schema.table
的格式。
{
"tables": "bss_car_day_count,bss_business_day_data,bss_company,bss_section_route,bss_section_route_area_link,bss_service_area,bss_service_area_mapper"
}
预期返回结果:
{
"code": 200,
"data": {
"created_time": "2025-07-02T18:07:15.596971",
"file_size": 220,
"file_size_formatted": "220.0 B",
"filename": "table_list.txt",
"original_count": 7,
"response": "表清单已成功创建,包含 7 个表",
"table_count": 7,
"task_id": "task_20250702_174000",
"unique_table_count": 7
},
"message": "操作成功",
"success": true
}
API: POST /api/v0/data_pipeline/tasks/{task_id}/upload-table-list
预期返回结果:
{
"code": 200,
"data": {
"file_size": 284,
"file_size_formatted": "284.0 B",
"filename": "table_list.txt",
"response": "表清单文件上传成功",
"task_id": "task_20250702_144901",
"upload_time": "2025-07-02T14:59:37.143754"
},
"message": "操作成功",
"success": true
}
主要用来排查问题的,目前前端UI不用关注这个API。
API: GET /api/v0/data_pipeline/tasks/{task_id}/table-list-info
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_174000/table-list-info
预期返回结果:
{
"code": 200,
"data": {
"created_at": "2025-07-02T18:07:15.596353",
"exists": true,
"file_name": "table_list.txt",
"file_path": "C:\\Projects\\cursor_projects\\Vanna-Chainlit-Chromadb\\data_pipeline\\training_data\\task_20250702_174000\\table_list.txt",
"file_size": 220,
"file_size_formatted": "220.0 B",
"has_file": true,
"is_readable": true,
"response": "获取表清单文件信息成功",
"table_count": 7,
"task_id": "task_20250702_174000",
"uploaded_at": "2025-07-02T18:07:15.596971"
},
"message": "操作成功",
"success": true
}
API: POST /api/v0/data_pipeline/tasks/{task_id}/execute
完整执行的参数:
{
"execution_mode": "complete"
}
预期返回结果:
该作业属于异步执行,提交后调度成功就可以返回。
{
"code": 200,
"data": {
"execution_mode": "complete",
"message": "任务正在后台执行,请通过状态接口查询进度",
"response": "任务执行已启动",
"step_name": null,
"task_id": "task_20250702_174000"
},
"message": "操作成功",
"success": true
}
API: GET /api/v0/data_pipeline/tasks/{task_id}
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_174000
正在执行第一步:"ddl_generation": "running"
{
"code": 200,
"data": {
"completed_at": null,
"created_at": "2025-07-02T17:40:00.268100",
"current_step": {
"execution_id": "task_20250702_174000_step_ddl_generation_exec_20250702_190410",
"started_at": "2025-07-02T19:04:09.933108",
"status": "running",
"step": "ddl_generation"
},
"error_message": null,
"parameters": {
"business_context": "高速公路服务区管理系统",
"db_connection": "postgresql://postgres:postgres@192.168.67.1:6432/highway_db",
"enable_llm_repair": true,
"enable_sql_validation": true,
"enable_training_data_load": true,
"file_upload_mode": true,
"modify_original_file": true,
"table_list_file": "{task_directory}/table_list.txt"
},
"response": "获取任务状态成功",
"result": null,
"started_at": "2025-07-02T19:04:09.925931",
"status": "in_progress",
"step_status": {
"ddl_generation": "running",
"qa_generation": "pending",
"sql_validation": "pending",
"training_load": "pending"
},
"steps": [
{
"completed_at": null,
"error_message": null,
"started_at": "2025-07-02T19:04:09.933108",
"step_name": "ddl_generation",
"step_status": "running"
},
{
"completed_at": null,
"error_message": null,
"started_at": null,
"step_name": "qa_generation",
"step_status": "pending"
},
{
"completed_at": null,
"error_message": null,
"started_at": null,
"step_name": "sql_validation",
"step_status": "pending"
},
{
"completed_at": null,
"error_message": null,
"started_at": null,
"step_name": "training_load",
"step_status": "pending"
}
],
"task_id": "task_20250702_174000",
"task_name": "服务区初始化数据加载",
"total_steps": 4
},
"message": "操作成功",
"success": true
}
四个步骤全部完成:
"status": "completed"
"step_status": { "ddl_generation": "completed", "qa_generation": "completed", "sql_validation": "completed", "training_load": "completed" }
{
"code": 200,
"data": {
"completed_at": "2025-07-02T19:21:03.007862",
"created_at": "2025-07-02T17:40:00.268100",
"current_step": null,
"error_message": null,
"parameters": {
"business_context": "高速公路服务区管理系统",
"db_connection": "postgresql://postgres:postgres@192.168.67.1:6432/highway_db",
"enable_llm_repair": true,
"enable_sql_validation": true,
"enable_training_data_load": true,
"file_upload_mode": true,
"modify_original_file": true,
"table_list_file": "{task_directory}/table_list.txt"
},
"response": "获取任务状态成功",
"result": null,
"started_at": "2025-07-02T19:04:09.925931",
"status": "completed",
"step_status": {
"ddl_generation": "completed",
"qa_generation": "completed",
"sql_validation": "completed",
"training_load": "completed"
},
"steps": [
{
"completed_at": "2025-07-02T19:10:18.599375",
"error_message": null,
"started_at": "2025-07-02T19:04:09.933108",
"step_name": "ddl_generation",
"step_status": "completed"
},
{
"completed_at": "2025-07-02T19:17:23.449415",
"error_message": null,
"started_at": "2025-07-02T19:10:18.602632",
"step_name": "qa_generation",
"step_status": "completed"
},
{
"completed_at": "2025-07-02T19:19:48.712247",
"error_message": null,
"started_at": "2025-07-02T19:17:23.453558",
"step_name": "sql_validation",
"step_status": "completed"
},
{
"completed_at": "2025-07-02T19:21:03.002708",
"error_message": null,
"started_at": "2025-07-02T19:19:48.715083",
"step_name": "training_load",
"step_status": "completed"
}
],
"task_id": "task_20250702_174000",
"task_name": "服务区初始化数据加载",
"total_steps": 4
},
"message": "操作成功",
"success": true
}
API: GET /api/v0/data_pipeline/tasks/{task_id}/logs
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_174000/logs
预期返回结果:
{
"code": 200,
"data": {
"log_file": "C:\\Projects\\cursor_projects\\Vanna-Chainlit-Chromadb\\data_pipeline\\training_data\\task_20250702_174000\\data_pipeline.log",
"logs": [
{
"level": "INFO",
"logger": "TaskDir_task_20250702_174000",
"message": "任务目录日志初始化完成 - 任务ID: task_20250702_174000",
"timestamp": "2025-07-02 19:04:10"
},
{
"level": "INFO",
"logger": "TaskDir_task_20250702_174000",
"message": "任务参数: {\"db_connection\": \"postgresql://postgres:postgres@192.168.67.1:6432/highway_db\", \"table_list_file\": \"{task_directory}/table_list.txt\", \"business_context\": \"高速公路服务区管理系统\", \"file_upload_mode\": true, \"enable_llm_repair\": true, \"modify_original_file\": true, \"enable_sql_validation\": true, \"enable_training_data_load\": true}",
"timestamp": "2025-07-02 19:04:10"
},
{
"level": "INFO",
"logger": "TaskDir_task_20250702_174000",
"message": "完整工作流任务开始执行",
"timestamp": "2025-07-02 19:04:10"
},
{
"level": "INFO",
"logger": "TaskDir_task_20250702_174000",
"message": "[ddl_generation] 开始执行步骤: ddl_generation",
"timestamp": "2025-07-02 19:04:10"
},
{
"level": "INFO",
"logger": "TaskDir_task_20250702_174000",
"message": "[ddl_generation] 开始执行DDL/MD生成步骤\n2025-07-02 19:04:10 [INFO] [data_pipeline.SchemaWorkflowOrchestrator] schema_workflow.py:167 - ============================================================",
"timestamp": "2025-07-02 19:04:10"
},
{
"level": "ERROR",
"logger": "[data_pipeline.SchemaTrainingDataAgent] training_data_agent.py:234 - ❌ 表 public.bss_car_day_count 处理失败,耗时",
"message": "55.71秒",
"timestamp": "2025-07-02 19:05:06"
}
],
"response": "获取任务日志成功",
"source": "file",
"task_id": "task_20250702_174000",
"total": 23
},
"message": "操作成功",
"success": true
}
API: GET /api/v0/data_pipeline/tasks/{task_id}/files
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_174000/files
预期返回结果:
{
"code": 200,
"data": {
"directory_info": {
"directory_path": "C:\\Projects\\cursor_projects\\Vanna-Chainlit-Chromadb\\data_pipeline\\training_data\\task_20250702_174000",
"exists": true,
"total_files": 26,
"total_size": 104982,
"total_size_formatted": "102.5 KB"
},
"files": [
{
"created_at": "2025-07-02T19:04:10.194958",
"file_name": "data_pipeline.log",
"file_size": 35951,
"file_size_formatted": "35.1 KB",
"file_type": "log",
"is_readable": true,
"modified_at": "2025-07-02T19:21:03.233582"
},
{
"created_at": "2025-07-02T19:21:03.230334",
"file_name": "task_result.json",
"file_size": 3601,
"file_size_formatted": "3.5 KB",
"file_type": "json",
"is_readable": true,
"modified_at": "2025-07-02T19:21:03.230878"
},
{
"created_at": "2025-07-02T19:19:48.483686",
"file_name": "sql_validation_20250702_191948_summary.log",
"file_size": 2839,
"file_size_formatted": "2.8 KB",
"file_type": "log",
"is_readable": true,
"modified_at": "2025-07-02T19:19:48.484199"
},
{
"created_at": "2025-07-02T18:07:15.596353",
"file_name": "table_list.txt",
"file_size": 220,
"file_size_formatted": "220.0 B",
"file_type": "text",
"is_readable": true,
"modified_at": "2025-07-02T18:07:15.596971"
}
],
"response": "获取任务文件列表成功",
"task_id": "task_20250702_174000"
},
"message": "操作成功",
"success": true
}
API: GET /api/v0/data_pipeline/tasks/{task_id}/files/{file_name}
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_174000/files/bss_company.ddl
返回文件的内容:
-- 中文名: 业务支撑系统公司信息表
-- 描述: 业务支撑系统公司信息表,存储服务区关联企业的基础信息及状态变更记录
create table public.bss_company (
id varchar(32) not null -- 主键ID,主键,
version integer not null -- 版本号,
create_ts timestamp -- 创建时间,
created_by varchar(50) -- 创建人ID,
update_ts timestamp -- 更新时间,
updated_by varchar(50) -- 更新人ID,
delete_ts timestamp -- 删除时间,
deleted_by varchar(50) -- 删除人ID,
company_name varchar(255) -- 公司名称,
company_no varchar(255) -- 公司编码,
primary key (id)
);
如果有需要,可以把自动生成的训练数据下载到本地,进行修改,然后上传。或者,直接上传本地准备好的训练数据集。
API: POST /api/v0/data_pipeline/tasks/{task_id}/files
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_213000/files
基本上传(无同名文件):
{
"code": 200,
"data": {
"response": "文件上传成功",
"success": true,
"task_id": "task_20250702_213000",
"uploaded_file": {
"filename": "tables.txt",
"overwrite_mode": "backup",
"size": 284,
"size_formatted": "284.0 B",
"uploaded_at": "2025-07-02T21:42:13.627532"
}
},
"message": "操作成功",
"success": true
}
备份模式下,有同名文件的返回结果:
{
"code": 200,
"data": {
"backup_info": {
"backup_created_at": "2025-07-02T21:43:06.048935",
"backup_filename": "tables.txt_bak1",
"backup_version": 1,
"had_existing_file": true
},
"response": "文件上传成功",
"success": true,
"task_id": "task_20250702_213000",
"uploaded_file": {
"filename": "tables.txt",
"overwrite_mode": "backup",
"size": 284,
"size_formatted": "284.0 B",
"uploaded_at": "2025-07-02T21:43:06.050035"
}
},
"message": "操作成功",
"success": true
}
file
(file, required): 要上传的文件overwrite_mode
(string, optional): 重名处理模式,可选值:backup
(默认)、replace
、skip
# 基本上传(默认备份模式)
curl -X POST \
http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_123456/files \
-F "file=@test.ddl"
# 上传DDL文件(备份模式)
curl -X POST \
http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_213000/files \
-F "file=@test.ddl" \
-F "overwrite_mode=backup"
# 上传文件(替换模式)
curl -X POST \
http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_213000/files \
-F "file=@config.json" \
-F "overwrite_mode=replace"
API: GET /api/v0/data_pipeline/tasks
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks
预期返回结果:
{
"code": 200,
"data": {
"limit": 50,
"offset": 0,
"response": "获取任务列表成功",
"tasks": [
{
"business_context": "数据库管理系统",
"completed_at": null,
"created_at": "2025-07-02T22:38:03.338248",
"created_by": "guest",
"db_name": "highway_db",
"directory_exists": false,
"started_at": null,
"status": "pending",
"step_status": "pending",
"task_id": "task_20250702_223802",
"task_name": "任务删除测试",
"updated_at": "2025-07-02T22:39:25.276811"
},
{
"business_context": "数据库管理系统",
"completed_at": null,
"created_at": "2025-07-02T22:37:51.797283",
"created_by": "guest",
"db_name": "highway_db",
"directory_exists": true,
"started_at": null,
"status": "pending",
"step_status": "pending",
"task_id": "task_20250702_223751",
"task_name": "任务删除测试",
"updated_at": null
}
],
"total": 29
},
"message": "操作成功",
"success": true
}
API: DELETE /api/v0/data_pipeline/tasks
请求地址: http://localhost:8084/api/v0/data_pipeline/tasks
参数名 | 类型 | 必需 | 默认值 | 说明 |
---|---|---|---|---|
task_ids |
Array[String] | ✅ | - | 要删除的任务ID列表,支持单个或多个 |
confirm |
Boolean | ✅ | - | 确认删除操作,必须为 true |
delete_database_records |
Boolean | ❌ | false |
是否删除数据库记录 |
continue_on_error |
Boolean | ❌ | true |
批量删除时遇到错误是否继续 |
删除单个任务:
curl -X DELETE \
http://localhost:8084/api/v0/data_pipeline/tasks \
-H "Content-Type: application/json" \
-d '{
"task_ids": ["task_20250702_223802"],
"confirm": true
}'
删除多个任务:
curl -X DELETE \
http://localhost:8084/api/v0/data_pipeline/tasks \
-H "Content-Type: application/json" \
-d '{
"task_ids": ["task_20250702_223802", "task_20250702_223751", "task_20250702_223705"],
"confirm": true,
"delete_database_records": false,
"continue_on_error": true
}'
请求参数示例:
{
"task_ids": ["manual_20250701_165444"],
"confirm": true
}
预期返回结果:
{
"code": 200,
"data": {
"deleted_at": "2025-07-02T23:15:26.423971",
"deleted_tasks": [
{
"database_records_deleted": false,
"deleted_at": "2025-07-02T23:15:26.423971",
"deleted_files_count": 1,
"deleted_size": "281 B",
"directory_deleted": true,
"success": true,
"task_id": "manual_20250701_165444"
}
],
"failed_tasks": [],
"response": "任务目录删除成功",
"summary": {
"failed": 0,
"successfully_deleted": 1,
"total_requested": 1
}
},
"message": "操作成功",
"success": true
}