data_pipeline_API调用指南.md 53 KB

Data Pipeline API调用指南

概述

本文档详细介绍data_pipeline模块的API调用方式、支持的参数列表以及API调用时的日志配置方式。data_pipeline API系统基于Flask框架构建,提供RESTful接口支持后台任务执行、进度追踪和结果管理。

目录

  1. API架构概览
  2. 核心API端点
  3. 任务管理API
  4. 文件管理API
  5. 日志管理API
  6. 数据库工具API
  7. 日志配置
  8. 使用示例
  9. 错误处理
  10. 最佳实践

1. API架构概览

1.1 系统架构

unified_api.py (Flask应用)
├── 任务管理API
│   ├── POST /api/v0/data_pipeline/tasks                    # 创建任务
│   ├── POST /api/v0/data_pipeline/tasks/{task_id}/execute  # 执行任务
│   ├── GET  /api/v0/data_pipeline/tasks/{task_id}          # 获取任务状态
│   └── GET  /api/v0/data_pipeline/tasks                    # 获取任务列表
├── 文件管理API
│   ├── GET  /api/v0/data_pipeline/tasks/{task_id}/files    # 获取文件列表
│   ├── GET  /api/v0/data_pipeline/tasks/{task_id}/files/{filename} # 下载文件
│   └── POST /api/v0/data_pipeline/tasks/{task_id}/upload-table-list # 上传表清单
├── 日志管理API
│   ├── GET  /api/v0/data_pipeline/tasks/{task_id}/logs     # 获取任务日志
│   └── POST /api/v0/data_pipeline/tasks/{task_id}/logs/query # 高级日志查询
└── 数据库工具API
    ├── POST /api/v0/database/tables                        # 获取数据库表列表
    └── POST /api/v0/database/table/ddl                     # 获取表DDL/MD文档

1.2 核心组件

组件 功能 实现位置
SimpleWorkflowManager 任务管理器 data_pipeline.api.simple_workflow
SimpleFileManager 文件管理器 data_pipeline.api.simple_file_manager
SimpleTaskManager 数据库管理器 data_pipeline.api.simple_db_manager
TableInspectorAPI 数据库检查器 data_pipeline.api.table_inspector_api
task_executor.py 独立任务执行器 data_pipeline.task_executor

1.3 任务执行模式

模式 描述 使用场景
完整工作流 一次性执行4个步骤 生产环境,自动化处理
分步执行 逐步执行各个步骤 调试,质量控制
后台执行 使用subprocess独立进程 长时间任务,不阻塞API
Vector表管理 备份和清空vector表数据 重新训练前清理旧数据

2. 核心API端点

2.1 基础信息

  • 基础URL: http://localhost:8084
  • API版本: v0
  • 内容类型: application/json
  • 字符编码: UTF-8

2.2 通用响应格式

成功响应

{
    "success": true,
    "code": 200,
    "message": "操作成功",
    "data": {
        // 具体数据
    }
}

错误响应

{
    "success": false,
    "code": 400,
    "message": "请求参数错误",
    "error": {
        "type": "ValidationError",
        "details": "缺少必需参数: table_list_file"
    }
}

2.3 HTTP状态码

状态码 说明 使用场景
200 成功 获取数据成功
201 创建成功 任务创建成功
202 已接受 任务执行已启动
400 请求错误 参数验证失败
404 未找到 任务不存在
409 冲突 任务已在执行
500 服务器错误 内部错误

2.4 Vector表管理功能

data_pipeline API现在支持Vector表管理功能,用于备份和清空向量数据:

关键参数

  • backup_vector_tables: 备份vector表数据到任务目录
  • truncate_vector_tables: 清空vector表数据(自动启用备份)

参数依赖关系

  • ✅ 可以单独使用 backup_vector_tables
  • ❌ 不能单独使用 truncate_vector_tables
  • 🔄 使用 truncate_vector_tables 时自动启用 backup_vector_tables

影响的表

  • langchain_pg_collection: 只备份,不清空
  • langchain_pg_embedding: 备份并清空

应用场景

  • 重新训练: 在加载新训练数据前清空旧的embedding数据
  • 数据迁移: 备份vector数据用于环境迁移
  • 版本管理: 保留不同版本的vector数据备份

3. 任务管理API

3.1 创建任务

端点: POST /api/v0/data_pipeline/tasks

请求参数

参数 必需 类型 默认值 说明
table_list_file string - 表清单文件路径,不提供则需要后续上传
business_context string - 业务上下文描述
db_name string - 数据库名称
db_connection string - 数据库连接字符串,不提供则使用默认配置
task_name string - 任务名称
enable_sql_validation boolean true 是否启用SQL验证
enable_llm_repair boolean true 是否启用LLM修复
modify_original_file boolean true 是否修改原始文件
enable_training_data_load boolean true 是否启用训练数据加载
backup_vector_tables boolean false 是否备份vector表数据
truncate_vector_tables boolean false 是否清空vector表数据(自动启用备份)

请求示例

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "table_list_file": "tables.txt",
    "business_context": "高速公路服务区管理系统",
    "db_name": "highway_db",
    "task_name": "高速公路数据处理",
    "enable_sql_validation": true,
    "enable_llm_repair": true,
    "modify_original_file": true,
    "enable_training_data_load": true,
    "backup_vector_tables": false,
    "truncate_vector_tables": false
  }'

响应示例

{
    "success": true,
    "code": 201,
    "message": "任务创建成功",
    "data": {
        "task_id": "task_20250627_143052",
        "task_name": "高速公路数据处理",
        "status": "pending",
        "created_at": "2025-06-27T14:30:52"
    }
}

3.2 执行任务

端点: POST /api/v0/data_pipeline/tasks/{task_id}/execute

请求参数

参数 必需 类型 默认值 说明
execution_mode enum complete 执行模式:complete/step
step_name string - 步骤名称,步骤模式时必需
backup_vector_tables boolean false 是否备份vector表数据
truncate_vector_tables boolean false 是否清空vector表数据(自动启用备份)

有效步骤名称

  • ddl_generation - DDL/MD文档生成
  • qa_generation - Question-SQL对生成
  • sql_validation - SQL验证和修复
  • training_load - 训练数据加载

请求示例

# 执行完整工作流
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "complete",
    "backup_vector_tables": false,
    "truncate_vector_tables": false
  }'

# 执行完整工作流并清空vector表
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "complete",
    "truncate_vector_tables": true
  }'

# 执行单个步骤
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "step",
    "step_name": "ddl_generation",
    "backup_vector_tables": false,
    "truncate_vector_tables": false
  }'

响应示例

{
    "success": true,
    "code": 202,
    "message": "任务执行已启动",
    "data": {
        "task_id": "task_20250627_143052",
        "execution_mode": "complete",
        "step_name": null,
        "message": "任务正在后台执行,请通过状态接口查询进度"
    }
}

3.3 获取任务状态

端点: GET /api/v0/data_pipeline/tasks/{task_id}

响应示例

{
    "success": true,
    "code": 200,
    "message": "获取任务状态成功",
    "data": {
        "task_id": "task_20250627_143052",
        "task_name": "高速公路数据处理",
        "status": "in_progress",
        "step_status": {
            "ddl_generation": "completed",
            "qa_generation": "running",
            "sql_validation": "pending",
            "training_load": "pending"
        },
        "created_at": "2025-06-27T14:30:52",
        "started_at": "2025-06-27T14:31:00",
        "completed_at": null,
        "parameters": {
            "business_context": "高速公路服务区管理系统",
            "enable_sql_validation": true,
            "backup_vector_tables": false,
            "truncate_vector_tables": false
        },
        "current_step": {
            "execution_id": "task_20250627_143052_step_qa_generation_exec_20250627143521",
            "step": "qa_generation",
            "status": "running",
            "started_at": "2025-06-27T14:35:21"
        },
        "total_steps": 4,
        "steps": [
            {
                "step_name": "ddl_generation",
                "step_status": "completed",
                "started_at": "2025-06-27T14:30:53",
                "completed_at": "2025-06-27T14:35:20",
                "error_message": null
            }
        ]
    }
}

任务状态说明

状态 说明
pending 待执行
in_progress 执行中
completed 已完成
failed 执行失败
cancelled 已取消

3.4 获取任务列表

端点: GET /api/v0/data_pipeline/tasks

查询参数

参数 类型 默认值 说明
limit int 50 返回数量限制
offset int 0 偏移量
status string - 状态过滤

请求示例

curl "http://localhost:8084/api/v0/data_pipeline/tasks?limit=20&status=completed"

响应示例

{
    "success": true,
    "code": 200,
    "message": "获取任务列表成功",
    "data": {
        "tasks": [
            {
                "task_id": "task_20250627_143052",
                "task_name": "高速公路数据处理",
                "status": "completed",
                "step_status": {
                    "ddl_generation": "completed",
                    "qa_generation": "completed",
                    "sql_validation": "completed",
                    "training_load": "completed"
                },
                "created_at": "2025-06-27T14:30:52",
                "started_at": "2025-06-27T14:31:00",
                "completed_at": "2025-06-27T14:45:30",
                "created_by": "user123",
                "db_name": "highway_db",
                "business_context": "高速公路服务区管理系统",
                "directory_exists": true,
                "updated_at": "2025-06-27T14:45:30"
            }
        ],
        "total": 1,
        "limit": 20,
        "offset": 0
    }
}

3.5 高级任务查询

端点: POST /api/v0/data_pipeline/tasks/query

请求参数

参数 类型 说明
page int 页码,默认1
page_size int 每页大小,默认20
status string 任务状态筛选
task_name string 任务名称模糊搜索
created_by string 创建者精确匹配
db_name string 数据库名称精确匹配
created_time_start string 创建时间范围开始
created_time_end string 创建时间范围结束

请求示例

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/query \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "page_size": 10,
    "status": "completed",
    "task_name": "highway",
    "created_time_start": "2025-06-01T00:00:00",
    "created_time_end": "2025-06-30T23:59:59"
  }'

4. 文件管理API

4.1 获取任务文件列表

端点: GET /api/v0/data_pipeline/tasks/{task_id}/files

响应示例

{
    "success": true,
    "code": 200,
    "message": "获取文件列表成功",
    "data": {
        "task_id": "task_20250627_143052",
        "files": [
            {
                "file_name": "qs_highway_db_20250627_143052_pair.json",
                "file_type": "json",
                "file_size": 102400,
                "file_size_human": "100.0 KB",
                "created_at": "2025-06-27T14:40:30",
                "is_primary": true,
                "description": "Question-SQL训练数据对",
                "download_url": "/api/v0/data_pipeline/tasks/task_20250627_143052/files/qs_highway_db_20250627_143052_pair.json"
            },
            {
                "file_name": "data_pipeline.log",
                "file_type": "log",
                "file_size": 51200,
                "file_size_human": "50.0 KB",
                "created_at": "2025-06-27T14:30:52",
                "is_primary": false,
                "description": "任务执行日志",
                "download_url": "/api/v0/data_pipeline/tasks/task_20250627_143052/files/data_pipeline.log"
            }
        ],
        "total_files": 2,
        "total_size": 153600,
        "total_size_human": "150.0 KB"
    }
}

4.2 下载文件

端点: GET /api/v0/data_pipeline/tasks/{task_id}/files/{filename}

请求示例

curl -O http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files/qs_highway_db_20250627_143052_pair.json

响应

  • 成功时返回文件内容,设置适当的Content-Type
  • 失败时返回JSON错误信息

4.3 上传表清单文件

端点: POST /api/v0/data_pipeline/tasks/{task_id}/upload-table-list

请求参数

  • 使用multipart/form-data格式
  • 文件字段名: file
  • 支持的文件类型: .txt

请求示例

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/upload-table-list \
  -F "file=@tables.txt"

响应示例

{
    "success": true,
    "code": 200,
    "message": "表清单文件上传成功",
    "data": {
        "task_id": "task_20250627_143052",
        "file_name": "table_list.txt",
        "file_size": 1024,
        "file_path": "./data_pipeline/training_data/task_20250627_143052/table_list.txt",
        "table_count": 8,
        "tables": [
            "bss_business_day_data",
            "bss_car_day_count",
            "bss_company"
        ]
    }
}

4.4 获取表清单信息

端点: GET /api/v0/data_pipeline/tasks/{task_id}/table-list-info

响应示例

{
    "success": true,
    "code": 200,
    "message": "获取表清单信息成功",
    "data": {
        "task_id": "task_20250627_143052",
        "has_table_list": true,
        "file_name": "table_list.txt",
        "file_size": 1024,
        "file_path": "./data_pipeline/training_data/task_20250627_143052/table_list.txt",
        "table_count": 8,
        "tables": [
            "bss_business_day_data",
            "bss_car_day_count"
        ],
        "uploaded_at": "2025-06-27T14:32:15"
    }
}

4.5 直接创建表清单

端点: POST /api/v0/data_pipeline/tasks/{task_id}/table-list

请求参数

参数 必需 类型 说明
table_names array 表名列表
business_context string 业务上下文描述

请求示例

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/table-list \
  -H "Content-Type: application/json" \
  -d '{
    "table_names": [
      "bss_business_day_data",
      "bss_car_day_count",
      "bss_company"
    ],
    "business_context": "高速公路服务区管理系统"
  }'

5. 日志管理API

5.1 获取任务日志

端点: GET /api/v0/data_pipeline/tasks/{task_id}/logs

查询参数

参数 类型 默认值 说明
limit int 100 日志行数限制
level string - 日志级别过滤

请求示例

curl "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs?limit=50&level=ERROR"

响应示例

{
    "success": true,
    "code": 200,
    "message": "获取任务日志成功",
    "data": {
        "task_id": "task_20250627_143052",
        "logs": [
            {
                "timestamp": "2025-06-27 14:30:52",
                "level": "INFO",
                "logger": "SchemaWorkflowOrchestrator",
                "message": "🚀 开始执行Schema工作流编排"
            },
            {
                "timestamp": "2025-06-27 14:30:53",
                "level": "INFO",
                "logger": "DDLMDGenerator",
                "message": "开始处理表: bss_business_day_data"
            },
            {
                "timestamp": "2025-06-27 14:31:25",
                "level": "ERROR",
                "logger": "SQLValidator",
                "message": "SQL验证失败,尝试LLM修复"
            }
        ],
        "total": 3,
        "source": "file",
        "log_file": "/path/to/data_pipeline/training_data/task_20250627_143052/data_pipeline.log"
    }
}

5.2 高级日志查询

端点: POST /api/v0/data_pipeline/tasks/{task_id}/logs/query

请求参数

参数 类型 默认值 说明
page int 1 页码
page_size int 50 每页大小
level string - 日志级别过滤
start_time string - 开始时间
end_time string - 结束时间
keyword string - 关键词搜索
logger_name string - 记录器名称过滤
step_name string - 步骤名称过滤
sort_by string timestamp 排序字段
sort_order string desc 排序顺序

请求示例

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs/query \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "page_size": 20,
    "level": "ERROR",
    "keyword": "SQL验证",
    "start_time": "2025-06-27T14:30:00",
    "end_time": "2025-06-27T14:45:00"
  }'

响应示例

{
    "success": true,
    "code": 200,
    "message": "高级日志查询成功",
    "data": {
        "logs": [
            {
                "timestamp": "2025-06-27 14:31:25",
                "level": "ERROR",
                "logger": "SQLValidator",
                "step": "sql_validation",
                "message": "SQL验证失败,尝试LLM修复",
                "line_number": 125
            }
        ],
        "pagination": {
            "page": 1,
            "page_size": 20,
            "total": 1,
            "total_pages": 1,
            "has_next": false,
            "has_prev": false
        },
        "log_file_info": {
            "exists": true,
            "file_path": "/path/to/data_pipeline/training_data/task_20250627_143052/data_pipeline.log",
            "file_size": 51200,
            "last_modified": "2025-06-27T14:45:30"
        },
        "query_time": "0.023s"
    }
}

6. 数据库工具API

6.1 获取数据库表列表

端点: POST /api/v0/database/tables

请求参数

参数 必需 类型 说明
db_connection string 数据库连接字符串,不传则使用默认配置
schema string Schema名称,支持多个用逗号分隔
table_name_pattern string 表名模式匹配,支持通配符

请求示例

curl -X POST http://localhost:8084/api/v0/database/tables \
  -H "Content-Type: application/json" \
  -d '{
    "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db",
    "schema": "public,ods",
    "table_name_pattern": "bss_*"
  }'

响应示例

{
    "success": true,
    "code": 200,
    "message": "获取表列表成功",
    "data": {
        "tables": [
            "public.bss_business_day_data",
            "public.bss_car_day_count",
            "ods.bss_company"
        ],
        "total": 3,
        "schemas": ["public", "ods"],
        "table_name_pattern": "bss_*",
        "db_connection_info": {
            "database": "highway_db"
        }
    }
}

6.2 获取表DDL/MD文档

端点: POST /api/v0/database/table/ddl

请求参数

参数 必需 类型 说明
table string 表名(可包含schema)
db_connection string 数据库连接字符串
business_context string 业务上下文描述
type enum 输出类型:ddl/md/both

请求示例

curl -X POST http://localhost:8084/api/v0/database/table/ddl \
  -H "Content-Type: application/json" \
  -d '{
    "table": "public.bss_company",
    "business_context": "高速公路服务区管理系统",
    "type": "both"
  }'

响应示例

{
    "success": true,
    "code": 200,
    "message": "获取表DDL成功",
    "data": {
        "ddl": "-- 中文名: 存储高速公路管理公司信息\ncreate table public.bss_company (\n  id varchar(32) not null,\n  company_name varchar(255)\n);",
        "md": "## bss_company(存储高速公路管理公司信息)\n\n字段列表:\n- id (varchar(32)) - 主键ID\n- company_name (varchar(255)) - 公司名称",
        "table_info": {
            "table_name": "bss_company",
            "schema_name": "public",
            "full_name": "public.bss_company",
            "comment": "存储高速公路管理公司信息",
            "field_count": 2,
            "row_count": 15
        },
        "fields": [
            {
                "column_name": "id",
                "data_type": "varchar",
                "max_length": 32,
                "is_nullable": false,
                "is_primary_key": true,
                "comment": "主键ID"
            }
        ],
        "generation_info": {
            "business_context": "高速公路服务区管理系统",
            "output_type": "both",
            "has_llm_comments": true,
            "database": "highway_db"
        }
    }
}

7. 日志配置

7.1 API日志架构

API模式下的日志系统采用双日志架构:

  1. 任务目录日志: 每个任务在独立目录下生成详细日志
  2. API系统日志: 记录API调用和系统事件

7.2 日志文件位置

任务日志

data_pipeline/training_data/task_20250627_143052/
└── data_pipeline.log                    # 任务详细执行日志

API系统日志

logs/
├── app.log                              # Flask应用日志
├── agent.log                            # Agent相关日志
├── vanna.log                            # Vanna系统日志
└── data_pipeline.log                    # data_pipeline模块日志(已弃用)

7.3 日志级别配置

7.3.1 环境变量配置

# 设置日志级别
export DATA_PIPELINE_LOG_LEVEL=DEBUG
export FLASK_LOG_LEVEL=INFO

# 设置日志目录
export DATA_PIPELINE_LOG_DIR=./logs/data_pipeline/

7.3.2 任务日志配置

任务执行时自动配置:

# 在SimpleWorkflowExecutor中自动设置
def _setup_task_directory_logger(self):
    task_dir = self.file_manager.get_task_directory(self.task_id)
    log_file = task_dir / "data_pipeline.log"
    
    # 创建任务专用logger
    self.task_dir_logger = logging.getLogger(f"TaskDir_{self.task_id}")
    # ... 配置详细格式和处理器

7.4 日志格式

任务日志格式

2025-07-01 14:30:52 [INFO] TaskDir_task_20250627_143052: 任务目录日志初始化完成
2025-07-01 14:30:53 [INFO] SchemaWorkflowOrchestrator: 🚀 开始执行Schema工作流编排
2025-07-01 14:30:54 [INFO] DDLMDGenerator: 开始处理表: bss_business_day_data

API日志格式

2025-07-01 14:30:50 [INFO] unified_api: POST /api/v0/data_pipeline/tasks - 任务创建成功
2025-07-01 14:30:51 [INFO] SimpleWorkflowManager: 创建任务: task_20250627_143052
2025-07-01 14:30:52 [INFO] unified_api: POST /api/v0/data_pipeline/tasks/task_20250627_143052/execute - 任务执行启动

7.5 日志监控和查询

通过API查询日志

# 获取最新日志
curl "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs?limit=100"

# 查询错误日志
curl "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs?level=ERROR"

# 高级日志查询
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs/query \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "page_size": 50,
    "level": "ERROR",
    "keyword": "SQL验证"
  }'

直接查看日志文件

# 查看最新任务日志
tail -f data_pipeline/training_data/task_*/data_pipeline.log

# 搜索错误日志
grep -i "error" data_pipeline/training_data/task_*/data_pipeline.log

8. 使用示例

8.1 完整工作流示例

步骤1: 创建任务

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "table_list_file": "tables.txt",
    "business_context": "高速公路服务区管理系统",
    "db_name": "highway_db",
    "task_name": "高速公路数据处理",
    "backup_vector_tables": false,
    "truncate_vector_tables": false
  }'

步骤2: 执行完整工作流

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "complete",
    "backup_vector_tables": false,
    "truncate_vector_tables": false
  }'

步骤3: 轮询任务状态

#!/bin/bash
TASK_ID="task_20250627_143052"

while true; do
    STATUS=$(curl -s "http://localhost:8084/api/v0/data_pipeline/tasks/$TASK_ID" | jq -r '.data.status')
    echo "任务状态: $STATUS"
    
    if [[ "$STATUS" == "completed" || "$STATUS" == "failed" ]]; then
        break
    fi
    
    sleep 10
done

步骤4: 获取结果文件

# 获取文件列表
curl "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files"

# 下载主要输出文件
curl -O "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files/qs_highway_db_20250627_143052_pair.json"

8.2 Vector表管理示例

带Vector表管理的完整工作流

# 创建任务并启用vector表清空
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "table_list_file": "tables.txt",
    "business_context": "高速公路服务区管理系统",
    "db_name": "highway_db",
    "task_name": "高速公路数据处理_清空vector",
    "truncate_vector_tables": true
  }'

# 执行工作流(truncate_vector_tables会自动启用backup_vector_tables)
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "complete",
    "truncate_vector_tables": true
  }'

# 检查vector表管理结果
curl "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052"

# 下载备份文件(如果有)
curl "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/files" | \
  jq -r '.data.files[] | select(.file_name | contains("langchain_")) | .download_url'

仅备份Vector表

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "complete",
    "backup_vector_tables": true,
    "truncate_vector_tables": false
  }'

8.3 分步执行示例

步骤1: 创建任务(无表清单)

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "business_context": "测试系统",
    "db_name": "test_db",
    "task_name": "测试任务"
  }'

步骤2: 上传表清单

curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/upload-table-list \
  -F "file=@tables.txt"

步骤3: 分步执行

# 执行DDL生成
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "step",
    "step_name": "ddl_generation"
  }'

# 等待完成后执行Q&A生成
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "step",
    "step_name": "qa_generation"
  }'

8.4 数据库工具使用示例

获取数据库表列表

curl -X POST http://localhost:8084/api/v0/database/tables \
  -H "Content-Type: application/json" \
  -d '{
    "schema": "public",
    "table_name_pattern": "bss_*"
  }'

获取单表DDL

curl -X POST http://localhost:8084/api/v0/database/table/ddl \
  -H "Content-Type: application/json" \
  -d '{
    "table": "public.bss_company",
    "business_context": "高速公路管理系统",
    "type": "ddl"
  }'

8.5 JavaScript客户端示例

class DataPipelineAPI {
    constructor(baseUrl = 'http://localhost:8084') {
        this.baseUrl = baseUrl;
    }

    async createTask(params) {
        const response = await fetch(`${this.baseUrl}/api/v0/data_pipeline/tasks`, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(params)
        });
        return await response.json();
    }

    async executeTask(taskId, executionMode = 'complete', stepName = null) {
        const params = { execution_mode: executionMode };
        if (stepName) params.step_name = stepName;

        const response = await fetch(`${this.baseUrl}/api/v0/data_pipeline/tasks/${taskId}/execute`, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(params)
        });
        return await response.json();
    }

    async getTaskStatus(taskId) {
        const response = await fetch(`${this.baseUrl}/api/v0/data_pipeline/tasks/${taskId}`);
        return await response.json();
    }

    async pollTaskStatus(taskId, interval = 5000) {
        return new Promise((resolve, reject) => {
            const poll = async () => {
                try {
                    const result = await this.getTaskStatus(taskId);
                    const status = result.data?.status;
                    
                    console.log(`任务状态: ${status}`);
                    
                    if (status === 'completed') {
                        resolve(result);
                    } else if (status === 'failed') {
                        reject(new Error('任务执行失败'));
                    } else {
                        setTimeout(poll, interval);
                    }
                } catch (error) {
                    reject(error);
                }
            };
            poll();
        });
    }

    async getTaskFiles(taskId) {
        const response = await fetch(`${this.baseUrl}/api/v0/data_pipeline/tasks/${taskId}/files`);
        return await response.json();
    }

    async getTaskLogs(taskId, limit = 100, level = null) {
        const params = new URLSearchParams({ limit });
        if (level) params.append('level', level);
        
        const response = await fetch(`${this.baseUrl}/api/v0/data_pipeline/tasks/${taskId}/logs?${params}`);
        return await response.json();
    }
}

// 使用示例
async function runDataPipelineWorkflow() {
    const api = new DataPipelineAPI();
    
    try {
        // 1. 创建任务
        const createResult = await api.createTask({
            table_list_file: 'tables.txt',
            business_context: '高速公路服务区管理系统',
            db_name: 'highway_db',
            task_name: '高速公路数据处理',
            backup_vector_tables: false,
            truncate_vector_tables: false
        });
        
        const taskId = createResult.data.task_id;
        console.log(`任务创建成功: ${taskId}`);
        
        // 2. 执行任务
        await api.executeTask(taskId, 'complete');
        console.log('任务执行已启动');
        
        // 3. 轮询状态
        const finalResult = await api.pollTaskStatus(taskId);
        console.log('任务执行完成:', finalResult);
        
        // 4. 获取结果文件
        const files = await api.getTaskFiles(taskId);
        console.log('生成的文件:', files.data.files);
        
    } catch (error) {
        console.error('工作流执行失败:', error);
    }
}

8.6 Python客户端示例

import requests
import time
import json

class DataPipelineAPI:
    def __init__(self, base_url='http://localhost:8084'):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json'
        })

    def create_task(self, **params):
        """创建任务"""
        response = self.session.post(
            f'{self.base_url}/api/v0/data_pipeline/tasks',
            json=params
        )
        response.raise_for_status()
        return response.json()

    def execute_task(self, task_id, execution_mode='complete', step_name=None):
        """执行任务"""
        params = {'execution_mode': execution_mode}
        if step_name:
            params['step_name'] = step_name
        
        response = self.session.post(
            f'{self.base_url}/api/v0/data_pipeline/tasks/{task_id}/execute',
            json=params
        )
        response.raise_for_status()
        return response.json()

    def get_task_status(self, task_id):
        """获取任务状态"""
        response = self.session.get(
            f'{self.base_url}/api/v0/data_pipeline/tasks/{task_id}'
        )
        response.raise_for_status()
        return response.json()

    def poll_task_status(self, task_id, interval=5):
        """轮询任务状态直到完成"""
        while True:
            result = self.get_task_status(task_id)
            status = result['data']['status']
            
            print(f"任务状态: {status}")
            
            if status == 'completed':
                return result
            elif status == 'failed':
                raise Exception(f"任务执行失败: {result['data'].get('error_message', '未知错误')}")
            
            time.sleep(interval)

    def get_task_files(self, task_id):
        """获取任务文件列表"""
        response = self.session.get(
            f'{self.base_url}/api/v0/data_pipeline/tasks/{task_id}/files'
        )
        response.raise_for_status()
        return response.json()

    def download_file(self, task_id, filename, save_path=None):
        """下载文件"""
        response = self.session.get(
            f'{self.base_url}/api/v0/data_pipeline/tasks/{task_id}/files/{filename}'
        )
        response.raise_for_status()
        
        if save_path is None:
            save_path = filename
        
        with open(save_path, 'wb') as f:
            f.write(response.content)
        
        return save_path

    def get_task_logs(self, task_id, limit=100, level=None):
        """获取任务日志"""
        params = {'limit': limit}
        if level:
            params['level'] = level
        
        response = self.session.get(
            f'{self.base_url}/api/v0/data_pipeline/tasks/{task_id}/logs',
            params=params
        )
        response.raise_for_status()
        return response.json()

# 使用示例
def run_data_pipeline_workflow():
    api = DataPipelineAPI()
    
    try:
        # 1. 创建任务
        create_result = api.create_task(
            table_list_file='tables.txt',
            business_context='高速公路服务区管理系统',
            db_name='highway_db',
            task_name='高速公路数据处理',
            backup_vector_tables=False,
            truncate_vector_tables=False
        )
        
        task_id = create_result['data']['task_id']
        print(f"任务创建成功: {task_id}")
        
        # 2. 执行任务
        api.execute_task(task_id, 'complete')
        print("任务执行已启动")
        
        # 3. 轮询状态
        final_result = api.poll_task_status(task_id)
        print("任务执行完成:", json.dumps(final_result, indent=2, ensure_ascii=False))
        
        # 4. 获取结果文件
        files = api.get_task_files(task_id)
        print("生成的文件:")
        for file_info in files['data']['files']:
            print(f"  - {file_info['file_name']} ({file_info['file_size_human']})")
        
        # 5. 下载主要输出文件
        primary_files = [f for f in files['data']['files'] if f.get('is_primary')]
        if primary_files:
            filename = primary_files[0]['file_name']
            save_path = api.download_file(task_id, filename)
            print(f"主要输出文件已下载: {save_path}")
        
    except Exception as error:
        print(f"工作流执行失败: {error}")

if __name__ == '__main__':
    run_data_pipeline_workflow()

9. 错误处理

9.1 常见错误码

错误码 HTTP状态 说明 解决方案
TASK_NOT_FOUND 404 任务不存在 检查task_id是否正确
TASK_ALREADY_RUNNING 409 任务已在执行 等待当前任务完成
INVALID_EXECUTION_MODE 400 无效的执行模式 使用completestep
MISSING_STEP_NAME 400 缺少步骤名称 步骤模式需要指定step_name
INVALID_STEP_NAME 400 无效的步骤名称 使用有效的步骤名称
FILE_NOT_FOUND 404 文件不存在 检查文件名是否正确
DATABASE_CONNECTION_ERROR 500 数据库连接失败 检查数据库配置
VECTOR_BACKUP_FAILED 500 Vector表备份失败 检查数据库连接和磁盘空间
VECTOR_TRUNCATE_FAILED 500 Vector表清空失败 检查数据库权限

9.2 错误响应格式

{
    "success": false,
    "code": 400,
    "message": "请求参数错误",
    "error": {
        "type": "ValidationError",
        "details": "无效的执行模式,必须是 'complete' 或 'step'",
        "invalid_params": ["execution_mode"],
        "timestamp": "2025-06-27T14:30:52"
    }
}

9.3 错误处理最佳实践

客户端错误处理

async function apiRequest(url, options = {}) {
    try {
        const response = await fetch(url, {
            headers: {
                'Content-Type': 'application/json',
                ...options.headers
            },
            ...options
        });
        
        const data = await response.json();
        
        if (!data.success) {
            throw new Error(`API错误: ${data.message} (${data.code})`);
        }
        
        return data;
        
    } catch (error) {
        if (error instanceof TypeError && error.message.includes('fetch')) {
            throw new Error('网络连接失败,请检查服务器是否正常运行');
        }
        throw error;
    }
}

// 使用示例
try {
    const result = await apiRequest('/api/v0/data_pipeline/tasks/invalid_id');
} catch (error) {
    if (error.message.includes('404')) {
        console.log('任务不存在,请检查任务ID');
    } else {
        console.error('API调用失败:', error.message);
    }
}

重试机制

import time
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

def create_session_with_retry():
    session = requests.Session()
    
    # 配置重试策略
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        method_whitelist=["HEAD", "GET", "OPTIONS"],
        backoff_factor=1
    )
    
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    return session

# 使用示例
session = create_session_with_retry()
response = session.get('http://localhost:8084/api/v0/data_pipeline/tasks/task_id')

9.4 日志错误排查

查看任务错误日志

# 获取任务错误日志
curl "http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs?level=ERROR"

# 搜索特定错误
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250627_143052/logs/query \
  -H "Content-Type: application/json" \
  -d '{
    "keyword": "连接失败",
    "level": "ERROR"
  }'

分析错误模式

def analyze_error_logs(api, task_id):
    """分析任务错误日志"""
    logs_result = api.get_task_logs(task_id, level='ERROR')
    error_logs = logs_result['data']['logs']
    
    # 统计错误类型
    error_types = {}
    for log in error_logs:
        message = log['message']
        if 'SQL验证失败' in message:
            error_types['SQL_VALIDATION'] = error_types.get('SQL_VALIDATION', 0) + 1
        elif '数据库连接' in message:
            error_types['DATABASE_CONNECTION'] = error_types.get('DATABASE_CONNECTION', 0) + 1
        elif 'LLM调用' in message:
            error_types['LLM_CALL'] = error_types.get('LLM_CALL', 0) + 1
        else:
            error_types['OTHER'] = error_types.get('OTHER', 0) + 1
    
    return error_types

# 使用示例
error_analysis = analyze_error_logs(api, 'task_20250627_143052')
print("错误类型统计:", error_analysis)

10. 最佳实践

10.1 任务管理最佳实践

1. 任务命名规范

# 推荐的任务命名格式
task_name_patterns = {
    "环境_数据库_用途_时间": "prod_highway_training_20250627",
    "项目_模块_版本": "crm_customer_v1.2",
    "业务_场景_批次": "ecommerce_recommendation_batch01"
}

2. 参数配置建议

{
    "推荐配置": {
        "enable_sql_validation": true,
        "enable_llm_repair": true,
        "modify_original_file": true,
        "enable_training_data_load": true,
        "backup_vector_tables": false,
        "truncate_vector_tables": false
    },
    "调试配置": {
        "enable_sql_validation": false,
        "enable_llm_repair": false,
        "modify_original_file": false,
        "enable_training_data_load": false,
        "backup_vector_tables": false,
        "truncate_vector_tables": false
    },
    "快速配置": {
        "enable_sql_validation": true,
        "enable_llm_repair": false,
        "modify_original_file": false,
        "enable_training_data_load": true,
        "backup_vector_tables": false,
        "truncate_vector_tables": false
    },
    "Vector清理配置": {
        "enable_sql_validation": true,
        "enable_llm_repair": true,
        "modify_original_file": true,
        "enable_training_data_load": true,
        "backup_vector_tables": true,
        "truncate_vector_tables": true
    }
}

3. 任务监控策略

class TaskMonitor {
    constructor(api, taskId) {
        this.api = api;
        this.taskId = taskId;
        this.callbacks = {};
    }

    on(event, callback) {
        this.callbacks[event] = callback;
    }

    async start(interval = 5000) {
        while (true) {
            try {
                const result = await this.api.getTaskStatus(this.taskId);
                const status = result.data.status;
                
                // 触发状态变化回调
                if (this.callbacks.statusChange) {
                    this.callbacks.statusChange(status, result.data);
                }
                
                // 检查是否完成
                if (status === 'completed') {
                    if (this.callbacks.completed) {
                        this.callbacks.completed(result.data);
                    }
                    break;
                } else if (status === 'failed') {
                    if (this.callbacks.failed) {
                        this.callbacks.failed(result.data);
                    }
                    break;
                }
                
                await new Promise(resolve => setTimeout(resolve, interval));
                
            } catch (error) {
                if (this.callbacks.error) {
                    this.callbacks.error(error);
                }
                break;
            }
        }
    }
}

// 使用示例
const monitor = new TaskMonitor(api, taskId);
monitor.on('statusChange', (status, data) => {
    console.log(`任务状态更新: ${status}`);
    updateProgressBar(data.step_status);
});
monitor.on('completed', (data) => {
    console.log('任务完成,开始下载结果文件');
    downloadResultFiles(data);
});
monitor.on('failed', (data) => {
    console.error('任务失败:', data.error_message);
    showErrorDialog(data.error_message);
});
monitor.start();

10.2 性能优化建议

1. 并发控制

import asyncio
import aiohttp

class AsyncDataPipelineAPI:
    def __init__(self, base_url='http://localhost:8084'):
        self.base_url = base_url
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def create_multiple_tasks(self, task_configs, max_concurrent=3):
        """并发创建多个任务"""
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def create_single_task(config):
            async with semaphore:
                async with self.session.post(
                    f'{self.base_url}/api/v0/data_pipeline/tasks',
                    json=config
                ) as response:
                    return await response.json()
        
        tasks = [create_single_task(config) for config in task_configs]
        return await asyncio.gather(*tasks)

# 使用示例
async def batch_create_tasks():
    task_configs = [
        {
            'table_list_file': f'tables_{i}.txt',
            'business_context': f'业务系统{i}',
            'db_name': f'db_{i}'
        }
        for i in range(1, 6)
    ]
    
    async with AsyncDataPipelineAPI() as api:
        results = await api.create_multiple_tasks(task_configs)
        print(f"创建了 {len(results)} 个任务")

2. 缓存策略

import functools
import time

def cache_with_ttl(ttl_seconds=300):
    """带TTL的缓存装饰器"""
    def decorator(func):
        cache = {}
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(sorted(kwargs.items()))
            current_time = time.time()
            
            if key in cache:
                result, timestamp = cache[key]
                if current_time - timestamp < ttl_seconds:
                    return result
            
            result = func(*args, **kwargs)
            cache[key] = (result, current_time)
            return result
        
        return wrapper
    return decorator

class CachedDataPipelineAPI(DataPipelineAPI):
    @cache_with_ttl(60)  # 缓存1分钟
    def get_task_status(self, task_id):
        return super().get_task_status(task_id)
    
    @cache_with_ttl(300)  # 缓存5分钟
    def get_task_files(self, task_id):
        return super().get_task_files(task_id)

10.3 安全最佳实践

1. API认证

class SecureDataPipelineAPI(DataPipelineAPI):
    def __init__(self, base_url, api_key=None):
        super().__init__(base_url)
        if api_key:
            self.session.headers.update({
                'Authorization': f'Bearer {api_key}'
            })

    def set_api_key(self, api_key):
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}'
        })

2. 参数验证

def validate_task_params(params):
    """验证任务参数"""
    required_fields = ['business_context', 'db_name']
    
    for field in required_fields:
        if field not in params or not params[field]:
            raise ValueError(f"缺少必需参数: {field}")
    
    # 验证数据库连接字符串格式
    if 'db_connection' in params:
        db_conn = params['db_connection']
        if not db_conn.startswith('postgresql://'):
            raise ValueError("数据库连接字符串必须以postgresql://开头")
    
    # 验证任务名称长度
    if 'task_name' in params and len(params['task_name']) > 100:
        raise ValueError("任务名称不能超过100个字符")
    
    return True

# 使用示例
try:
    validate_task_params({
        'business_context': '高速公路管理系统',
        'db_name': 'highway_db'
    })
except ValueError as e:
    print(f"参数验证失败: {e}")

10.4 监控和告警

1. 任务状态监控

import logging
from datetime import datetime, timedelta

class TaskStatusMonitor:
    def __init__(self, api, alert_callback=None):
        self.api = api
        self.alert_callback = alert_callback
        self.logger = logging.getLogger(__name__)

    async def monitor_all_tasks(self):
        """监控所有活跃任务"""
        tasks_result = self.api.get_tasks_list(status_filter='in_progress')
        active_tasks = tasks_result['data']['tasks']
        
        for task in active_tasks:
            await self.check_task_health(task)

    async def check_task_health(self, task):
        """检查单个任务健康状态"""
        task_id = task['task_id']
        started_at = datetime.fromisoformat(task['started_at'])
        current_time = datetime.now()
        
        # 检查任务是否超时(超过2小时)
        if current_time - started_at > timedelta(hours=2):
            self.logger.warning(f"任务 {task_id} 可能超时")
            if self.alert_callback:
                self.alert_callback('TASK_TIMEOUT', task)
        
        # 检查任务日志是否有错误
        logs_result = self.api.get_task_logs(task_id, level='ERROR')
        if logs_result['data']['total'] > 0:
            self.logger.error(f"任务 {task_id} 发现错误日志")
            if self.alert_callback:
                self.alert_callback('TASK_ERROR', task)

def alert_handler(alert_type, task):
    """告警处理函数"""
    if alert_type == 'TASK_TIMEOUT':
        print(f"⚠️ 任务超时告警: {task['task_id']}")
        # 可以集成邮件、钉钉、微信等通知方式
    elif alert_type == 'TASK_ERROR':
        print(f"❌ 任务错误告警: {task['task_id']}")

# 使用示例
monitor = TaskStatusMonitor(api, alert_handler)
asyncio.run(monitor.monitor_all_tasks())

2. 性能指标收集

class PerformanceMetrics:
    def __init__(self):
        self.metrics = {}

    def record_api_call(self, endpoint, duration, status_code):
        """记录API调用指标"""
        key = f"{endpoint}_{status_code}"
        if key not in self.metrics:
            self.metrics[key] = {
                'count': 0,
                'total_duration': 0,
                'avg_duration': 0,
                'max_duration': 0,
                'min_duration': float('inf')
            }
        
        metric = self.metrics[key]
        metric['count'] += 1
        metric['total_duration'] += duration
        metric['avg_duration'] = metric['total_duration'] / metric['count']
        metric['max_duration'] = max(metric['max_duration'], duration)
        metric['min_duration'] = min(metric['min_duration'], duration)

    def get_report(self):
        """获取性能报告"""
        return {
            'api_metrics': self.metrics,
            'summary': {
                'total_calls': sum(m['count'] for m in self.metrics.values()),
                'avg_response_time': sum(m['avg_duration'] for m in self.metrics.values()) / len(self.metrics) if self.metrics else 0
            }
        }

# 集成到API客户端
class InstrumentedDataPipelineAPI(DataPipelineAPI):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = PerformanceMetrics()

    def _make_request(self, method, url, **kwargs):
        """带性能监控的请求方法"""
        start_time = time.time()
        try:
            response = getattr(self.session, method)(url, **kwargs)
            duration = time.time() - start_time
            self.metrics.record_api_call(url, duration, response.status_code)
            return response
        except Exception as e:
            duration = time.time() - start_time
            self.metrics.record_api_call(url, duration, 0)
            raise

总结

data_pipeline API系统提供了完整的RESTful接口支持,能够满足各种数据处理需求。通过合理使用任务管理、文件管理、日志管理和数据库工具API,可以构建高效的数据处理工作流。建议在生产环境中实施适当的监控、告警和性能优化措施,确保系统的稳定性和可靠性。