Data Pipeline 使用说明.md 21 KB

Data Pipeline 使用说明

目录

  1. 功能概述
  2. 安装与配置
  3. 一键执行完整工作流(推荐)
  4. 生成DDL和MD文档
  5. 生成Question-SQL训练数据
  6. SQL验证和修复
  7. 训练数据管理
  8. 配置详解
  9. 常见问题
  10. 最佳实践

1. 功能概述

Data Pipeline 是一个完整的数据库逆向工程和训练数据生成系统,提供以下核心功能:

1.1 DDL和MD文档生成

  • 自动连接PostgreSQL数据库
  • 批量处理表清单
  • 使用LLM生成中文注释
  • 自动检测枚举字段
  • 生成标准化的DDL和MD文档

1.2 Question-SQL训练数据生成

  • 验证DDL和MD文件完整性
  • 分析表结构提取业务主题
  • 为每个主题生成高质量的Question-SQL对
  • 支持中断恢复和并行处理

1.3 SQL验证和修复

  • 自动验证生成的SQL语句
  • 使用EXPLAIN语法检查SQL有效性
  • LLM自动修复无效SQL语句
  • 详细的验证报告和统计信息

1.4 训练数据管理

  • 自动识别多种训练数据格式
  • 统一的训练数据加载和处理
  • 支持DDL、文档、Q&A对等多种数据类型

1.5 一键工作流编排器(推荐)

  • 端到端自动化执行完整流程
  • DDL/MD生成 → Question-SQL生成 → SQL验证修复
  • 详细的执行报告和性能指标
  • 支持灵活配置和错误恢复

2. 安装与配置

2.1 依赖安装

pip install asyncpg asyncio

2.2 基本配置

Data Pipeline 使用项目现有的 LLM 配置,无需额外配置数据库连接。

2.3 目录结构

data_pipeline/
├── ddl_generation/          # DDL/MD文档生成工具
│   ├── ddl_md_generator.py
│   └── training_data_agent.py
├── qa_generation/           # Q&A生成工具
│   ├── qs_agent.py
│   └── qs_generator.py
├── validators/              # SQL验证工具  
│   ├── sql_validate_cli.py
│   ├── sql_validation_agent.py
│   └── sql_validator.py
├── trainer/                 # 训练数据管道
│   ├── run_training.py
│   └── vanna_trainer.py
├── training_data/           # 训练数据存储目录
├── tools/                   # 核心工具
├── utils/                   # 工具函数
├── config.py               # 配置文件
└── schema_workflow.py  # 工作流编排器

3. 一键执行完整工作流(推荐)

3.1 工作流编排器概述

SchemaWorkflowOrchestrator 是 Data Pipeline 的核心组件,提供端到端的自动化处理流程:

  1. DDL和MD文档生成 - 连接数据库,生成表结构文档
  2. Question-SQL对生成 - 基于文档生成训练数据
  3. SQL验证和修复 - 验证SQL有效性并自动修复错误
  4. 训练数据加载 - 将生成的数据加载到向量数据库中

3.2 命令行使用

基本使用(完整工作流)

python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/highway_db" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统" \
  --output-dir ./data_pipeline/training_data/

跳过SQL验证

python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/ecommerce_db" \
  --table-list tables.txt \
  --business-context "电商系统" \
  --skip-validation

禁用LLM修复

python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/management_db" \
  --table-list tables.txt \
  --business-context "管理系统" \
  --disable-llm-repair

不修改原始文件(仅生成报告)

python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/business_db" \
  --table-list tables.txt \
  --business-context "业务系统" \
  --no-modify-file

3.3 编程方式使用

import asyncio
from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator

async def run_complete_workflow():
    # 创建工作流编排器
    orchestrator = SchemaWorkflowOrchestrator(
        db_connection="postgresql://user:pass@localhost:5432/highway_db",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        output_dir="./data_pipeline/training_data/",
        enable_sql_validation=True,      # 启用SQL验证
        enable_llm_repair=True,          # 启用LLM修复
        modify_original_file=True        # 修改原始JSON文件
    )
    
    # 执行完整工作流程
    report = await orchestrator.execute_complete_workflow()
    
    # 处理结果
    if report["success"]:
        print(f"✅ 工作流程执行成功!")
        print(f"📄 最终输出文件: {report['final_outputs']['primary_output_file']}")
        print(f"❓ 最终问题数量: {report['final_outputs']['final_question_count']}")
        print(f"⏱️  总耗时: {report['performance_metrics']['total_duration']} 秒")
    else:
        print(f"❌ 工作流程执行失败: {report['error']['message']}")
        print(f"💥 失败步骤: {report['error']['failed_step']}")

# 运行工作流程
asyncio.run(run_complete_workflow())

3.4 工作流参数说明

参数 说明 默认值
--db-connection 数据库连接字符串(包含数据库名) 必需
--table-list 表清单文件路径 必需
--business-context 业务上下文描述 必需
--output-dir 输出目录 ./data_pipeline/training_data/
--skip-validation 跳过SQL验证步骤 False(默认执行SQL验证)
--disable-llm-repair 禁用LLM修复功能 False(默认启用LLM修复)
--no-modify-file 不修改原始JSON文件 False(默认修改原文件)
--skip-training-load 跳过训练数据加载步骤 False(默认执行训练数据加载)
--verbose 启用详细日志 False
--log-file 日志文件路径

3.5 工作流执行报告

工作流编排器会生成详细的执行报告,包括:

{
    "success": True,
    "workflow_summary": {
        "total_duration": 285.34,
        "completed_steps": ["ddl_md_generation", "question_sql_generation", "sql_validation", "training_data_load"],
        "total_steps": 4
    },
    "processing_results": {
        "ddl_md_generation": {
            "total_tables": 8,
            "processed_successfully": 8,
            "duration": 89.23
        },
        "question_sql_generation": {
            "total_questions": 50,
            "total_themes": 5,
            "duration": 123.45
        },
        "sql_validation": {
            "success_rate": 0.94,
            "valid_sql_count": 47,
            "invalid_sql_count": 3,
            "duration": 32.99
        },
        "training_data_load": {
            "total_records": 195,
            "data_type_counts": {
                "ddl": 8,
                "documentation": 8,
                "sql": 47
            },
            "duration": 39.67
        }
    },
    "final_outputs": {
        "primary_output_file": "./data_pipeline/training_data/qs_highway_db_20240123_143052_pair.json",
        "final_question_count": 47
    },
    "performance_metrics": {
        "step1_duration": 89.23,
        "step2_duration": 123.45,
        "step3_duration": 32.99,
        "step4_duration": 39.67,
        "total_duration": 285.34
    }
}

4. 生成DDL和MD文档(分步执行)

4.1 命令格式

python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection <数据库连接字符串> \
  --table-list <表清单文件> \
  --business-context <业务上下文> \
  [可选参数]

4.2 必需参数说明

参数 说明 示例
--db-connection PostgreSQL数据库连接字符串 postgresql://user:pass@localhost:5432/dbname
--table-list 表清单文件路径 ./tables.txt
--business-context 业务上下文描述 "高速公路服务区管理系统"

4.3 可选参数说明

参数 说明 默认值
--output-dir 输出目录路径 ./data_pipeline/training_data/
--pipeline 处理链类型 full
--max-concurrent 最大并发表数量 1
--verbose 启用详细日志 False
--log-file 日志文件路径
--no-filter-system-tables 禁用系统表过滤 False
--check-permissions-only 仅检查数据库权限 False

4.4 使用示例

基本使用

python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统"

指定输出目录和启用详细日志

python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
  --output-dir ./data_pipeline/training_data/ \
  --verbose

权限检查

python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
  --check-permissions-only

5. 生成Question-SQL训练数据(分步执行)

5.1 前置条件

必须先执行DDL和MD文档生成,确保输出目录中有完整的DDL和MD文件。

5.2 命令格式

python -m data_pipeline.qa_generation.qs_generator \
  --output-dir <输出目录> \
  --table-list <表清单文件> \
  --business-context <业务上下文> \
  [可选参数]

5.3 必需参数说明

参数 说明 示例
--output-dir 包含DDL和MD文件的目录 ./data_pipeline/training_data/
--table-list 表清单文件路径(用于验证) ./tables.txt
--business-context 业务上下文描述 "高速公路服务区管理系统"

5.4 使用示例

基本使用

python -m data_pipeline.qa_generation.qs_generator \
  --output-dir ./data_pipeline/training_data/ \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
   highway_db

启用详细日志

python -m data_pipeline.qa_generation.qs_generator \
  --output-dir ./data_pipeline/training_data/ \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
   highway_db \
  --verbose

6. SQL验证和修复

6.1 命令格式

生成Question-SQL对后,可以使用SQL验证功能。注意:命令行使用时,默认启用LLM修复和文件修改功能

python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./qs_highway_db_20240123_143052_pair.json \
  --output-dir ./validation_reports

6.2 SQL验证参数说明

参数 说明 默认值
--db-connection 数据库连接字符串 必需
--input-file Question-SQL文件路径 必需
--output-dir 验证报告输出目录 输入文件同目录
--disable-llm-repair 禁用LLM修复功能 False(默认启用修复)
--no-modify-file 不修改原始JSON文件 False(默认修改原文件)
--max-concurrent 最大并发验证数 5
--batch-size 批处理大小 10
--timeout 单个验证超时时间(秒) 30
--verbose 启用详细日志 False
--dry-run 仅解析文件不执行验证 False
--save-json 保存详细JSON报告 False

6.3 SQL验证使用示例

# 基本验证(默认:启用LLM修复和文件修改)
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json

# 仅生成报告,不修改文件
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --no-modify-file

# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --disable-llm-repair

# 性能调优参数
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --max-concurrent 10 \
  --batch-size 20 \
  --timeout 60 \
  --verbose

7. 训练数据管理

7.1 训练数据加载

# 使用训练数据管道
python -m data_pipeline.trainer.run_training \
  --data_path ./data_pipeline/training_data/

7.2 支持的文件格式

Data Pipeline 自动识别以下训练数据格式:

  • .ddl 文件 → train_ddl_statements()
  • .md/.markdowntrain_documentation_blocks()
  • _pair.json/_pairs.jsontrain_json_question_sql_pairs()
  • _pair.sql/_pairs.sqltrain_formatted_question_sql_pairs()
  • .sql (其他) → train_sql_examples()

7.3 训练数据目录结构

data_pipeline/training_data/
├── bss_service_area.ddl              # DDL文件
├── bss_service_area_detail.md        # MD文档
├── bss_company.ddl
├── bss_company_detail.md
├── qs_highway_db_20240123_143052_pair.json  # Q&A训练数据
├── filename_mapping.txt              # 文件名映射
└── logs/                            # 日志目录
    └── data_pipeline_20240123.log

8. 配置详解

8.1 主要配置项

配置文件位于 data_pipeline/config.py

# DDL/MD生成相关配置
"output_directory": "./data_pipeline/training_data/",   # 输出目录
"create_subdirectories": False,                        # 不创建子目录
"max_concurrent_tables": 1,                            # 最大并发数(避免LLM并发问题)
"sample_data_limit": 20,                              # 数据采样量
"large_table_threshold": 1000000,                     # 大表阈值(100万行)
"filter_system_tables": True,                          # 过滤系统表
"continue_on_error": True,                             # 错误后继续

# Question-SQL生成配置
"qs_generation": {
    "max_tables": 20,                                 # 最大表数量限制
    "theme_count": 5,                                 # 主题数量
    "questions_per_theme": 10,                        # 每主题问题数
    "max_concurrent_themes": 1,                       # 并行主题数(避免LLM并发问题)
    "continue_on_theme_error": True,                  # 主题失败继续
    "save_intermediate": True,                        # 保存中间结果
}

# SQL验证配置
"sql_validation": {
    "max_concurrent_validations": 5,                  # 并发验证数
    "validation_timeout": 30,                         # 单个验证超时(秒)
    "batch_size": 10,                                 # 批处理大小
    "enable_sql_repair": False,                       # SQL修复功能(命令行覆盖为True)
    "modify_original_file": False,                    # 文件修改功能(命令行覆盖为True)
    "readonly_mode": True,                            # 启用只读模式
}

8.2 修改配置

可以通过编辑 data_pipeline/config.py 文件来修改默认配置。

9. 常见问题

9.1 表数量超过20个怎么办?

错误信息

表数量(25)超过限制(20)。请分批处理或调整配置中的max_tables参数。

解决方案

  1. 分批处理:将表清单分成多个文件,每个不超过20个表
  2. 修改配置:在 config.py 中增加 max_tables 限制

9.2 DDL和MD文件数量不一致

错误信息

DDL文件数量(5)与表数量(6)不一致

解决方案

  1. 检查是否有表处理失败
  2. 查看日志文件找出失败的表
  3. 重新运行DDL/MD生成

9.3 LLM调用失败

可能原因

  • 网络连接问题
  • API配额限制
  • Token超限

解决方案

  1. 检查网络连接
  2. 查看中间结果文件,从断点继续
  3. 减少表数量或分批处理

9.4 权限不足

错误信息

数据库查询权限不足

解决方案

  1. 使用 --check-permissions-only 检查权限
  2. 确保数据库用户有SELECT权限
  3. Data Pipeline支持只读数据库

9.5 工作流编排器相关问题

Q: 工作流中途失败,如何恢复? A: 工作流编排器会保留已完成步骤的输出文件,可以手动从失败步骤开始重新执行。

Q: 如何只执行部分步骤? A: 使用 --skip-validation 跳过SQL验证,或使用分步执行方式调用各个模块。

Q: 工作流执行时间过长怎么办? A: 可以通过减少表数量、调整并发参数、或分批处理来优化执行时间。

9.6 SQL验证器默认行为说明

重要:SQL验证器的命令行模式与配置文件中的默认值不同:

  • 配置文件默认enable_sql_repair=False, modify_original_file=False
  • 命令行默认:启用LLM修复和文件修改功能
  • 原因:命令行使用时通常期望完整的修复功能,而配置文件提供保守的默认值

如需禁用,请明确使用 --disable-llm-repair--no-modify-file 参数。

10. 最佳实践

10.1 推荐工作流程

方式一:一键执行(推荐)

# 完整工作流程,一个命令搞定
python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/highway_db" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统" \
  --output-dir ./data_pipeline/training_data/

方式二:分步执行(调试时使用)

  1. 第一步:生成DDL和MD文档

    python -m data_pipeline.ddl_generation.ddl_md_generator --db-connection "..." --table-list tables.txt --business-context "..." --output-dir ./data_pipeline/training_data/
    
    1. 第二步:人工检查
    2. 检查DDL文件的表结构是否正确
    3. 确认MD文档中的注释是否准确
    4. 根据需要手动调整

    5. 第三步:生成Question-SQL

      python -m data_pipeline.qa_generation.qs_generator --output-dir ./data_pipeline/training_data/ --table-list tables.txt --business-context "..."
      
  2. 第四步:验证SQL(可选)

    python -m data_pipeline.validators.sql_validate_cli --db-connection "..." --input-file ./qs_xxx.json
    
    1. 第五步:训练数据加载 bash python -m data_pipeline.trainer.run_training --data_path ./data_pipeline/training_data/

10.2 表清单组织

  • 按业务模块分组
  • 每组不超过15-20个表
  • 使用注释说明每组的用途

10.3 业务上下文优化

  • 提供准确的业务背景描述
  • 包含行业特定术语
  • 说明主要业务流程

10.4 输出文件管理

  • 定期备份生成的文件
  • 使用版本控制管理DDL文件
  • 保留中间结果用于调试
  • 统一使用 ./data_pipeline/training_data/ 目录

10.5 工作流编排器最佳实践

  • 首次使用:建议启用详细日志(--verbose)观察执行过程
  • 生产环境:使用默认参数,启用SQL验证和修复
  • 调试阶段:可以使用 --skip-validation 跳过验证步骤加快执行
  • 质量要求高:使用 --no-modify-file 仅生成报告,手动审查后再决定是否修改

10.6 数据管道集成

  • 训练数据统一管理:所有生成的数据都存储在 data_pipeline/training_data/ 目录
  • 自动化训练:可以定期运行工作流编排器更新训练数据
  • 版本控制:建议对训练数据进行版本管理
  • 监控和报告:利用详细的执行报告监控数据质量

总结

Data Pipeline 提供了完整的数据库逆向工程解决方案,从原始数据库schema到可用的训练数据,整个流程完全自动化。通过工作流编排器,用户可以一键完成所有步骤,也可以根据需要分步执行和调试。系统设计考虑了容错性、可扩展性和易用性,适合各种规模的数据处理需求。


一键执行(推荐):

# 完整的4步流程 python -m data_pipeline.schema_workflow

--db-connection "postgresql://user:pass@localhost:5432/highway_db" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/

# 如需跳过训练数据加载 python -m data_pipeline.schema_workflow

--db-connection "postgresql://user:pass@localhost:5432/test_db" \
--table-list tables.txt \
--business-context "测试系统" \
--skip-training-load

分步执行:

# 第1步:DDL/MD生成 python -m data_pipeline.ddl_generation.ddl_md_generator --db-connection "..." --table-list tables.txt --business-context "..."

# 第2步:Q&A生成 python -m data_pipeline.qa_generation.qs_generator --output-dir ./data_pipeline/training_data/ --table-list tables.txt --business-context "..."

# 第3步:SQL验证 python -m data_pipeline.validators.sql_validate_cli --db-connection "..." --input-file ./qs_xxx.json

# 第4步:训练数据加载 python -m data_pipeline.trainer.run_training --data_path ./data_pipeline/training_data/