Data Pipeline 是一个完整的数据库逆向工程和训练数据生成系统,提供以下核心功能:
pip install asyncpg asyncio
Data Pipeline 使用项目现有的 LLM 配置,无需额外配置数据库连接。
data_pipeline/
├── ddl_generation/ # DDL/MD文档生成工具
│ ├── ddl_md_generator.py
│ └── training_data_agent.py
├── qa_generation/ # Q&A生成工具
│ ├── qs_agent.py
│ └── qs_generator.py
├── validators/ # SQL验证工具
│ ├── sql_validate_cli.py
│ ├── sql_validation_agent.py
│ └── sql_validator.py
├── trainer/ # 训练数据管道
│ ├── run_training.py
│ └── vanna_trainer.py
├── training_data/ # 训练数据存储目录
├── tools/ # 核心工具
├── utils/ # 工具函数
├── config.py # 配置文件
└── schema_workflow.py # 工作流编排器
SchemaWorkflowOrchestrator
是 Data Pipeline 的核心组件,提供端到端的自动化处理流程:
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/highway_db" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/ecommerce_db" \
--table-list tables.txt \
--business-context "电商系统" \
--skip-validation
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/management_db" \
--table-list tables.txt \
--business-context "管理系统" \
--disable-llm-repair
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/business_db" \
--table-list tables.txt \
--business-context "业务系统" \
--no-modify-file
import asyncio
from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator
async def run_complete_workflow():
# 创建工作流编排器
orchestrator = SchemaWorkflowOrchestrator(
db_connection="postgresql://user:pass@localhost:5432/highway_db",
table_list_file="tables.txt",
business_context="高速公路服务区管理系统",
output_dir="./data_pipeline/training_data/",
enable_sql_validation=True, # 启用SQL验证
enable_llm_repair=True, # 启用LLM修复
modify_original_file=True # 修改原始JSON文件
)
# 执行完整工作流程
report = await orchestrator.execute_complete_workflow()
# 处理结果
if report["success"]:
print(f"✅ 工作流程执行成功!")
print(f"📄 最终输出文件: {report['final_outputs']['primary_output_file']}")
print(f"❓ 最终问题数量: {report['final_outputs']['final_question_count']}")
print(f"⏱️ 总耗时: {report['performance_metrics']['total_duration']} 秒")
else:
print(f"❌ 工作流程执行失败: {report['error']['message']}")
print(f"💥 失败步骤: {report['error']['failed_step']}")
# 运行工作流程
asyncio.run(run_complete_workflow())
参数 | 说明 | 默认值 |
---|---|---|
--db-connection |
数据库连接字符串(包含数据库名) | 必需 |
--table-list |
表清单文件路径 | 必需 |
--business-context |
业务上下文描述 | 必需 |
--output-dir |
输出目录 | ./data_pipeline/training_data/ |
--skip-validation |
跳过SQL验证步骤 | False (默认执行SQL验证) |
--disable-llm-repair |
禁用LLM修复功能 | False (默认启用LLM修复) |
--no-modify-file |
不修改原始JSON文件 | False (默认修改原文件) |
--skip-training-load |
跳过训练数据加载步骤 | False (默认执行训练数据加载) |
--verbose |
启用详细日志 | False |
--log-file |
日志文件路径 | 无 |
工作流编排器会生成详细的执行报告,包括:
{
"success": True,
"workflow_summary": {
"total_duration": 285.34,
"completed_steps": ["ddl_md_generation", "question_sql_generation", "sql_validation", "training_data_load"],
"total_steps": 4
},
"processing_results": {
"ddl_md_generation": {
"total_tables": 8,
"processed_successfully": 8,
"duration": 89.23
},
"question_sql_generation": {
"total_questions": 50,
"total_themes": 5,
"duration": 123.45
},
"sql_validation": {
"success_rate": 0.94,
"valid_sql_count": 47,
"invalid_sql_count": 3,
"duration": 32.99
},
"training_data_load": {
"total_records": 195,
"data_type_counts": {
"ddl": 8,
"documentation": 8,
"sql": 47
},
"duration": 39.67
}
},
"final_outputs": {
"primary_output_file": "./data_pipeline/training_data/qs_highway_db_20240123_143052_pair.json",
"final_question_count": 47
},
"performance_metrics": {
"step1_duration": 89.23,
"step2_duration": 123.45,
"step3_duration": 32.99,
"step4_duration": 39.67,
"total_duration": 285.34
}
}
python -m data_pipeline.ddl_generation.ddl_md_generator \
--db-connection <数据库连接字符串> \
--table-list <表清单文件> \
--business-context <业务上下文> \
[可选参数]
参数 | 说明 | 示例 |
---|---|---|
--db-connection |
PostgreSQL数据库连接字符串 | postgresql://user:pass@localhost:5432/dbname |
--table-list |
表清单文件路径 | ./tables.txt |
--business-context |
业务上下文描述 | "高速公路服务区管理系统" |
参数 | 说明 | 默认值 |
---|---|---|
--output-dir |
输出目录路径 | ./data_pipeline/training_data/ |
--pipeline |
处理链类型 | full |
--max-concurrent |
最大并发表数量 | 1 |
--verbose |
启用详细日志 | False |
--log-file |
日志文件路径 | 无 |
--no-filter-system-tables |
禁用系统表过滤 | False |
--check-permissions-only |
仅检查数据库权限 | False |
python -m data_pipeline.ddl_generation.ddl_md_generator \
--db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
--table-list ./tables.txt \
--business-context "高速公路服务区管理系统"
python -m data_pipeline.ddl_generation.ddl_md_generator \
--db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
--table-list ./tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/ \
--verbose
python -m data_pipeline.ddl_generation.ddl_md_generator \
--db-connection "postgresql://postgres:postgres@localhost:5432/highway_db" \
--check-permissions-only
必须先执行DDL和MD文档生成,确保输出目录中有完整的DDL和MD文件。
python -m data_pipeline.qa_generation.qs_generator \
--output-dir <输出目录> \
--table-list <表清单文件> \
--business-context <业务上下文> \
[可选参数]
参数 | 说明 | 示例 |
---|---|---|
--output-dir |
包含DDL和MD文件的目录 | ./data_pipeline/training_data/ |
--table-list |
表清单文件路径(用于验证) | ./tables.txt |
--business-context |
业务上下文描述 | "高速公路服务区管理系统" |
python -m data_pipeline.qa_generation.qs_generator \
--output-dir ./data_pipeline/training_data/ \
--table-list ./tables.txt \
--business-context "高速公路服务区管理系统" \
highway_db
python -m data_pipeline.qa_generation.qs_generator \
--output-dir ./data_pipeline/training_data/ \
--table-list ./tables.txt \
--business-context "高速公路服务区管理系统" \
highway_db \
--verbose
生成Question-SQL对后,可以使用SQL验证功能。注意:命令行使用时,默认启用LLM修复和文件修改功能。
python -m data_pipeline.validators.sql_validate_cli \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./qs_highway_db_20240123_143052_pair.json \
--output-dir ./validation_reports
参数 | 说明 | 默认值 |
---|---|---|
--db-connection |
数据库连接字符串 | 必需 |
--input-file |
Question-SQL文件路径 | 必需 |
--output-dir |
验证报告输出目录 | 输入文件同目录 |
--disable-llm-repair |
禁用LLM修复功能 | False (默认启用修复) |
--no-modify-file |
不修改原始JSON文件 | False (默认修改原文件) |
--max-concurrent |
最大并发验证数 | 5 |
--batch-size |
批处理大小 | 10 |
--timeout |
单个验证超时时间(秒) | 30 |
--verbose |
启用详细日志 | False |
--dry-run |
仅解析文件不执行验证 | False |
--save-json |
保存详细JSON报告 | False |
# 基本验证(默认:启用LLM修复和文件修改)
python -m data_pipeline.validators.sql_validate_cli \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json
# 仅生成报告,不修改文件
python -m data_pipeline.validators.sql_validate_cli \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json \
--no-modify-file
# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
python -m data_pipeline.validators.sql_validate_cli \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json \
--disable-llm-repair
# 性能调优参数
python -m data_pipeline.validators.sql_validate_cli \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json \
--max-concurrent 10 \
--batch-size 20 \
--timeout 60 \
--verbose
# 使用训练数据管道
python -m data_pipeline.trainer.run_training \
--data_path ./data_pipeline/training_data/
Data Pipeline 自动识别以下训练数据格式:
.ddl
文件 → train_ddl_statements()
.md/.markdown
→ train_documentation_blocks()
_pair.json/_pairs.json
→ train_json_question_sql_pairs()
_pair.sql/_pairs.sql
→ train_formatted_question_sql_pairs()
.sql
(其他) → train_sql_examples()
data_pipeline/training_data/
├── bss_service_area.ddl # DDL文件
├── bss_service_area_detail.md # MD文档
├── bss_company.ddl
├── bss_company_detail.md
├── qs_highway_db_20240123_143052_pair.json # Q&A训练数据
├── filename_mapping.txt # 文件名映射
└── logs/ # 日志目录
└── data_pipeline_20240123.log
配置文件位于 data_pipeline/config.py
:
# DDL/MD生成相关配置
"output_directory": "./data_pipeline/training_data/", # 输出目录
"create_subdirectories": False, # 不创建子目录
"max_concurrent_tables": 1, # 最大并发数(避免LLM并发问题)
"sample_data_limit": 20, # 数据采样量
"large_table_threshold": 1000000, # 大表阈值(100万行)
"filter_system_tables": True, # 过滤系统表
"continue_on_error": True, # 错误后继续
# Question-SQL生成配置
"qs_generation": {
"max_tables": 20, # 最大表数量限制
"theme_count": 5, # 主题数量
"questions_per_theme": 10, # 每主题问题数
"max_concurrent_themes": 1, # 并行主题数(避免LLM并发问题)
"continue_on_theme_error": True, # 主题失败继续
"save_intermediate": True, # 保存中间结果
}
# SQL验证配置
"sql_validation": {
"max_concurrent_validations": 5, # 并发验证数
"validation_timeout": 30, # 单个验证超时(秒)
"batch_size": 10, # 批处理大小
"enable_sql_repair": False, # SQL修复功能(命令行覆盖为True)
"modify_original_file": False, # 文件修改功能(命令行覆盖为True)
"readonly_mode": True, # 启用只读模式
}
可以通过编辑 data_pipeline/config.py
文件来修改默认配置。
错误信息:
表数量(25)超过限制(20)。请分批处理或调整配置中的max_tables参数。
解决方案:
config.py
中增加 max_tables
限制错误信息:
DDL文件数量(5)与表数量(6)不一致
解决方案:
可能原因:
解决方案:
错误信息:
数据库查询权限不足
解决方案:
--check-permissions-only
检查权限Q: 工作流中途失败,如何恢复? A: 工作流编排器会保留已完成步骤的输出文件,可以手动从失败步骤开始重新执行。
Q: 如何只执行部分步骤?
A: 使用 --skip-validation
跳过SQL验证,或使用分步执行方式调用各个模块。
Q: 工作流执行时间过长怎么办? A: 可以通过减少表数量、调整并发参数、或分批处理来优化执行时间。
重要:SQL验证器的命令行模式与配置文件中的默认值不同:
enable_sql_repair=False
, modify_original_file=False
如需禁用,请明确使用 --disable-llm-repair
或 --no-modify-file
参数。
方式一:一键执行(推荐)
# 完整工作流程,一个命令搞定
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/highway_db" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/
方式二:分步执行(调试时使用)
第一步:生成DDL和MD文档
python -m data_pipeline.ddl_generation.ddl_md_generator --db-connection "..." --table-list tables.txt --business-context "..." --output-dir ./data_pipeline/training_data/
根据需要手动调整
第三步:生成Question-SQL
python -m data_pipeline.qa_generation.qs_generator --output-dir ./data_pipeline/training_data/ --table-list tables.txt --business-context "..."
第四步:验证SQL(可选)
python -m data_pipeline.validators.sql_validate_cli --db-connection "..." --input-file ./qs_xxx.json
bash
python -m data_pipeline.trainer.run_training --data_path ./data_pipeline/training_data/
./data_pipeline/training_data/
目录--verbose
)观察执行过程--skip-validation
跳过验证步骤加快执行--no-modify-file
仅生成报告,手动审查后再决定是否修改data_pipeline/training_data/
目录Data Pipeline 提供了完整的数据库逆向工程解决方案,从原始数据库schema到可用的训练数据,整个流程完全自动化。通过工作流编排器,用户可以一键完成所有步骤,也可以根据需要分步执行和调试。系统设计考虑了容错性、可扩展性和易用性,适合各种规模的数据处理需求。
一键执行(推荐):
# 完整的4步流程 python -m data_pipeline.schema_workflow
--db-connection "postgresql://user:pass@localhost:5432/highway_db" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/
# 如需跳过训练数据加载 python -m data_pipeline.schema_workflow
--db-connection "postgresql://user:pass@localhost:5432/test_db" \
--table-list tables.txt \
--business-context "测试系统" \
--skip-training-load
分步执行:
# 第1步:DDL/MD生成 python -m data_pipeline.ddl_generation.ddl_md_generator --db-connection "..." --table-list tables.txt --business-context "..."
# 第2步:Q&A生成 python -m data_pipeline.qa_generation.qs_generator --output-dir ./data_pipeline/training_data/ --table-list tables.txt --business-context "..."
# 第3步:SQL验证 python -m data_pipeline.validators.sql_validate_cli --db-connection "..." --input-file ./qs_xxx.json
# 第4步:训练数据加载 python -m data_pipeline.trainer.run_training --data_path ./data_pipeline/training_data/