Data Pipeline 是一个完整的数据库逆向工程和训练数据生成系统,提供以下核心功能:
pip install asyncpg asyncio
Data Pipeline 使用项目现有的 LLM 配置,无需额外配置数据库连接。
data_pipeline/
├── ddl_generation/          # DDL/MD文档生成工具
│   ├── ddl_md_generator.py
│   └── training_data_agent.py
├── qa_generation/           # Q&A生成工具
│   ├── qs_agent.py
│   └── qs_generator.py
├── validators/              # SQL验证工具  
│   ├── sql_validate_cli.py
│   ├── sql_validation_agent.py
│   └── sql_validator.py
├── trainer/                 # 训练数据管道
│   ├── run_training.py
│   └── vanna_trainer.py
├── training_data/           # 训练数据存储目录
├── tools/                   # 核心工具
├── utils/                   # 工具函数
├── config.py               # 配置文件
└── schema_workflow.py  # 工作流编排器
SchemaWorkflowOrchestrator 是 Data Pipeline 的核心组件,提供端到端的自动化处理流程:
python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/highway_db" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统" \
  --output-dir ./data_pipeline/training_data/
python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/ecommerce_db" \
  --table-list tables.txt \
  --business-context "电商系统" \
  --skip-validation
python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/management_db" \
  --table-list tables.txt \
  --business-context "管理系统" \
  --disable-llm-repair
python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/business_db" \
  --table-list tables.txt \
  --business-context "业务系统" \
  --no-modify-file
import asyncio
from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator
async def run_complete_workflow():
    # 创建工作流编排器
    orchestrator = SchemaWorkflowOrchestrator(
        db_connection="postgresql://user:pass@localhost:5432/highway_db",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        output_dir="./data_pipeline/training_data/",
        enable_sql_validation=True,      # 启用SQL验证
        enable_llm_repair=True,          # 启用LLM修复
        modify_original_file=True        # 修改原始JSON文件
    )
    
    # 执行完整工作流程
    report = await orchestrator.execute_complete_workflow()
    
    # 处理结果
    if report["success"]:
        print(f"✅ 工作流程执行成功!")
        print(f"📄 最终输出文件: {report['final_outputs']['primary_output_file']}")
        print(f"❓ 最终问题数量: {report['final_outputs']['final_question_count']}")
        print(f"⏱️  总耗时: {report['performance_metrics']['total_duration']} 秒")
    else:
        print(f"❌ 工作流程执行失败: {report['error']['message']}")
        print(f"💥 失败步骤: {report['error']['failed_step']}")
# 运行工作流程
asyncio.run(run_complete_workflow())
| 参数 | 说明 | 默认值 | 
|---|---|---|
| --db-connection | 数据库连接字符串(包含数据库名) | 必需 | 
| --table-list | 表清单文件路径 | 必需 | 
| --business-context | 业务上下文描述 | 必需 | 
| --output-dir | 输出目录 | ./data_pipeline/training_data/ | 
| --skip-validation | 跳过SQL验证步骤 | False(默认执行SQL验证) | 
| --disable-llm-repair | 禁用LLM修复功能 | False(默认启用LLM修复) | 
| --no-modify-file | 不修改原始JSON文件 | False(默认修改原文件) | 
| --skip-training-load | 跳过训练数据加载步骤 | False(默认执行训练数据加载) | 
| --verbose | 启用详细日志 | False | 
| --log-file | 日志文件路径 | 无 | 
工作流编排器会生成详细的执行报告,包括:
{
    "success": True,
    "workflow_summary": {
        "total_duration": 285.34,
        "completed_steps": ["ddl_md_generation", "question_sql_generation", "sql_validation", "training_data_load"],
        "total_steps": 4
    },
    "processing_results": {
        "ddl_md_generation": {
            "total_tables": 8,
            "processed_successfully": 8,
            "duration": 89.23
        },
        "question_sql_generation": {
            "total_questions": 50,
            "total_themes": 5,
            "duration": 123.45
        },
        "sql_validation": {
            "success_rate": 0.94,
            "valid_sql_count": 47,
            "invalid_sql_count": 3,
            "duration": 32.99
        },
        "training_data_load": {
            "total_records": 195,
            "data_type_counts": {
                "ddl": 8,
                "documentation": 8,
                "sql": 47
            },
            "duration": 39.67
        }
    },
    "final_outputs": {
        "primary_output_file": "./data_pipeline/training_data/qs_highway_db_20240123_143052_pair.json",
        "final_question_count": 47
    },
    "performance_metrics": {
        "step1_duration": 89.23,
        "step2_duration": 123.45,
        "step3_duration": 32.99,
        "step4_duration": 39.67,
        "total_duration": 285.34
    }
}
python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection <数据库连接字符串> \
  --table-list <表清单文件> \
  --business-context <业务上下文> \
  [可选参数]
| 参数 | 说明 | 示例 | 
|---|---|---|
| --db-connection | PostgreSQL数据库连接字符串 | postgresql://user:pass@localhost:5432/dbname | 
| --table-list | 表清单文件路径 | ./tables.txt | 
| --business-context | 业务上下文描述 | "高速公路服务区管理系统" | 
| 参数 | 说明 | 默认值 | 
|---|---|---|
| --output-dir | 输出目录路径 | ./data_pipeline/training_data/ | 
| --pipeline | 处理链类型 | full | 
| --max-concurrent | 最大并发表数量 | 1 | 
| --verbose | 启用详细日志 | False | 
| --log-file | 日志文件路径 | 无 | 
| --no-filter-system-tables | 禁用系统表过滤 | False | 
| --check-permissions-only | 仅检查数据库权限 | False | 
python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统"
python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
  --output-dir ./data_pipeline/training_data/ \
  --verbose
python -m data_pipeline.ddl_generation.ddl_md_generator \
  --db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
  --check-permissions-only
必须先执行DDL和MD文档生成,确保输出目录中有完整的DDL和MD文件。
python -m data_pipeline.qa_generation.qs_generator \
  --output-dir <输出目录> \
  --table-list <表清单文件> \
  --business-context <业务上下文> \
  [可选参数]
| 参数 | 说明 | 示例 | 
|---|---|---|
| --output-dir | 包含DDL和MD文件的目录 | ./data_pipeline/training_data/ | 
| --table-list | 表清单文件路径(用于验证) | ./tables.txt | 
| --business-context | 业务上下文描述 | "高速公路服务区管理系统" | 
python -m data_pipeline.qa_generation.qs_generator \
  --output-dir ./data_pipeline/training_data/ \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
   highway_db
python -m data_pipeline.qa_generation.qs_generator \
  --output-dir ./data_pipeline/training_data/ \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
   highway_db \
  --verbose
生成Question-SQL对后,可以使用SQL验证功能。注意:命令行使用时,默认启用LLM修复和文件修改功能。
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./qs_highway_db_20240123_143052_pair.json \
  --output-dir ./validation_reports
| 参数 | 说明 | 默认值 | 
|---|---|---|
| --db-connection | 数据库连接字符串 | 必需 | 
| --input-file | Question-SQL文件路径 | 必需 | 
| --output-dir | 验证报告输出目录 | 输入文件同目录 | 
| --disable-llm-repair | 禁用LLM修复功能 | False(默认启用修复) | 
| --no-modify-file | 不修改原始JSON文件 | False(默认修改原文件) | 
| --max-concurrent | 最大并发验证数 | 5 | 
| --batch-size | 批处理大小 | 10 | 
| --timeout | 单个验证超时时间(秒) | 30 | 
| --verbose | 启用详细日志 | False | 
| --dry-run | 仅解析文件不执行验证 | False | 
| --save-json | 保存详细JSON报告 | False | 
# 基本验证(默认:启用LLM修复和文件修改)
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json
# 仅生成报告,不修改文件
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --no-modify-file
# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --disable-llm-repair
# 性能调优参数
python -m data_pipeline.validators.sql_validate_cli \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --max-concurrent 10 \
  --batch-size 20 \
  --timeout 60 \
  --verbose
# 使用训练数据管道
python -m data_pipeline.trainer.run_training \
  --data_path ./data_pipeline/training_data/
Data Pipeline 自动识别以下训练数据格式:
.ddl 文件 → train_ddl_statements().md/.markdown → train_documentation_blocks()_pair.json/_pairs.json → train_json_question_sql_pairs()_pair.sql/_pairs.sql → train_formatted_question_sql_pairs().sql (其他) → train_sql_examples()data_pipeline/training_data/
├── bss_service_area.ddl              # DDL文件
├── bss_service_area_detail.md        # MD文档
├── bss_company.ddl
├── bss_company_detail.md
├── qs_highway_db_20240123_143052_pair.json  # Q&A训练数据
├── filename_mapping.txt              # 文件名映射
└── logs/                            # 日志目录
    └── data_pipeline_20240123.log
配置文件位于 data_pipeline/config.py:
# DDL/MD生成相关配置
"output_directory": "./data_pipeline/training_data/",   # 输出目录
"create_subdirectories": False,                        # 不创建子目录
"max_concurrent_tables": 1,                            # 最大并发数(避免LLM并发问题)
"sample_data_limit": 20,                              # 数据采样量
"large_table_threshold": 1000000,                     # 大表阈值(100万行)
"filter_system_tables": True,                          # 过滤系统表
"continue_on_error": True,                             # 错误后继续
# Question-SQL生成配置
"qs_generation": {
    "max_tables": 20,                                 # 最大表数量限制
    "theme_count": 5,                                 # 主题数量
    "questions_per_theme": 10,                        # 每主题问题数
    "max_concurrent_themes": 1,                       # 并行主题数(避免LLM并发问题)
    "continue_on_theme_error": True,                  # 主题失败继续
    "save_intermediate": True,                        # 保存中间结果
}
# SQL验证配置
"sql_validation": {
    "max_concurrent_validations": 5,                  # 并发验证数
    "validation_timeout": 30,                         # 单个验证超时(秒)
    "batch_size": 10,                                 # 批处理大小
    "enable_sql_repair": False,                       # SQL修复功能(命令行覆盖为True)
    "modify_original_file": False,                    # 文件修改功能(命令行覆盖为True)
    "readonly_mode": True,                            # 启用只读模式
}
可以通过编辑 data_pipeline/config.py 文件来修改默认配置。
错误信息:
表数量(25)超过限制(20)。请分批处理或调整配置中的max_tables参数。
解决方案:
config.py 中增加 max_tables 限制错误信息:
DDL文件数量(5)与表数量(6)不一致
解决方案:
可能原因:
解决方案:
错误信息:
数据库查询权限不足
解决方案:
--check-permissions-only 检查权限Q: 工作流中途失败,如何恢复? A: 工作流编排器会保留已完成步骤的输出文件,可以手动从失败步骤开始重新执行。
Q: 如何只执行部分步骤?
A: 使用 --skip-validation 跳过SQL验证,或使用分步执行方式调用各个模块。
Q: 工作流执行时间过长怎么办? A: 可以通过减少表数量、调整并发参数、或分批处理来优化执行时间。
重要:SQL验证器的命令行模式与配置文件中的默认值不同:
enable_sql_repair=False, modify_original_file=False如需禁用,请明确使用 --disable-llm-repair 或 --no-modify-file 参数。
方式一:一键执行(推荐)
# 完整工作流程,一个命令搞定
python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/highway_db" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统" \
  --output-dir ./data_pipeline/training_data/
方式二:分步执行(调试时使用)
第一步:生成DDL和MD文档
python -m data_pipeline.ddl_generation.ddl_md_generator --db-connection "..." --table-list tables.txt --business-context "..." --output-dir ./data_pipeline/training_data/
根据需要手动调整
第三步:生成Question-SQL
python -m data_pipeline.qa_generation.qs_generator --output-dir ./data_pipeline/training_data/ --table-list tables.txt --business-context "..."
第四步:验证SQL(可选)
python -m data_pipeline.validators.sql_validate_cli --db-connection "..." --input-file ./qs_xxx.json
bash
python -m data_pipeline.trainer.run_training --data_path ./data_pipeline/training_data/
./data_pipeline/training_data/ 目录--verbose)观察执行过程--skip-validation 跳过验证步骤加快执行--no-modify-file 仅生成报告,手动审查后再决定是否修改data_pipeline/training_data/ 目录Data Pipeline 提供了完整的数据库逆向工程解决方案,从原始数据库schema到可用的训练数据,整个流程完全自动化。通过工作流编排器,用户可以一键完成所有步骤,也可以根据需要分步执行和调试。系统设计考虑了容错性、可扩展性和易用性,适合各种规模的数据处理需求。
一键执行(推荐):
# 完整的4步流程 python -m data_pipeline.schema_workflow
--db-connection "postgresql://user:pass@localhost:5432/highway_db" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/
# 如需跳过训练数据加载 python -m data_pipeline.schema_workflow
--db-connection "postgresql://user:pass@localhost:5432/test_db" \
--table-list tables.txt \
--business-context "测试系统" \
--skip-training-load
分步执行:
# 第1步:DDL/MD生成 python -m data_pipeline.ddl_generation.ddl_md_generator --db-connection "..." --table-list tables.txt --business-context "..."
# 第2步:Q&A生成 python -m data_pipeline.qa_generation.qs_generator --output-dir ./data_pipeline/training_data/ --table-list tables.txt --business-context "..."
# 第3步:SQL验证 python -m data_pipeline.validators.sql_validate_cli --db-connection "..." --input-file ./qs_xxx.json
# 第4步:训练数据加载 python -m data_pipeline.trainer.run_training --data_path ./data_pipeline/training_data/