Data Pipeline 是一个完整的数据库逆向工程和训练数据生成系统,将PostgreSQL数据库结构转换为vanna.ai格式的训练数据,包括DDL文档、Markdown说明、Question-SQL问答对以及训练数据加载。
data_pipeline/ # 数据管道模块
├── __init__.py # 模块入口
├── config.py # 统一配置文件
├── schema_workflow.py # 端到端工作流编排器
├── ddl_generation/ # DDL/MD生成模块
│ ├── __init__.py
│ ├── ddl_md_generator.py # 命令行入口
│ └── training_data_agent.py # 主AI Agent
├── qa_generation/ # Question-SQL生成模块
│ ├── __init__.py
│ ├── qs_agent.py # Q&A生成Agent
│ └── qs_generator.py # Q&A命令行入口
├── validators/ # SQL验证模块
│ ├── __init__.py
│ ├── sql_validation_agent.py # SQL验证Agent
│ ├── sql_validate_cli.py # SQL验证命令行入口
│ ├── sql_validator.py # SQL验证器核心
│ └── file_count_validator.py # 文件数量验证器
├── trainer/ # 训练数据管理模块
│ ├── __init__.py
│ ├── run_training.py # 训练数据加载脚本
│ └── vanna_trainer.py # 训练器核心模块
├── tools/ # Agent工具集
│ ├── __init__.py
│ ├── base.py # 基础工具类和注册机制
│ ├── database_inspector.py # 数据库元数据检查工具
│ ├── data_sampler.py # 数据采样工具
│ ├── comment_generator.py # LLM注释生成工具
│ ├── ddl_generator.py # DDL格式生成工具
│ └── doc_generator.py # MD文档生成工具
├── analyzers/ # 分析器模块
│ ├── __init__.py
│ ├── md_analyzer.py # MD文件分析器
│ └── theme_extractor.py # 主题提取器
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── table_parser.py # 表清单解析器
│ ├── logger.py # 日志管理
│ ├── file_manager.py # 文件管理器
│ ├── data_structures.py # 数据结构定义
│ ├── large_table_handler.py # 大表处理
│ ├── permission_checker.py # 权限检查器
│ └── system_filter.py # 系统表过滤器
├── prompts/ # 提示词和业务词典
│ ├── __init__.py
│ └── business_dictionary.txt
└── training_data/ # 训练数据存储目录
├── *.ddl # DDL文件
├── *_detail.md # MD文档
├── qs_*_pair.json # 问答对文件
└── filename_mapping.txt # 文件映射
SchemaWorkflowOrchestrator
schema_workflow.py
SchemaTrainingDataAgent
ddl_generation/training_data_agent.py
QuestionSQLGenerationAgent
qa_generation/qs_agent.py
SQLValidationAgent
validators/sql_validation_agent.py
trainer/run_training.py
graph TD
A[开始] --> B[步骤1: DDL/MD生成]
B --> C{成功?}
C -->|否| D[生成错误报告]
C -->|是| E[步骤2: Question-SQL生成]
E --> F{成功?}
F -->|否| D
F -->|是| G{启用SQL验证?}
G -->|否| H[步骤4: 训练数据加载]
G -->|是| I[步骤3: SQL验证和修复]
I --> J{成功?}
J -->|否| D
J -->|是| K{启用训练数据加载?}
K -->|否| L[生成最终报告]
K -->|是| H
H --> M{成功?}
M -->|否| D
M -->|是| L
L --> N[完成]
D --> N
graph TD
A[开始处理表] --> B[DatabaseInspectorTool]
B --> C[DataSamplerTool]
C --> D[CommentGeneratorTool<br/>生成注释+枚举建议]
D --> E[验证枚举字段<br/>SELECT DISTINCT]
E --> F[DDLGeneratorTool]
F --> G[DocGeneratorTool]
G --> H[写入文件]
H --> I[记录日志]
I --> J[完成]
graph TD
A[开始] --> B[FileCountValidator<br/>验证文件数量]
B --> C{验证通过?}
C -->|否| D[报错退出]
C -->|是| E[MDFileAnalyzer<br/>读取所有MD文件]
E --> F[ThemeExtractor<br/>提取分析主题]
F --> G[处理每个主题]
G --> H[生成Question-SQL对]
H --> I[保存JSON文件]
I --> J[完成]
graph TD
A[开始] --> B[读取Question-SQL文件]
B --> C[提取SQL语句]
C --> D[批量验证SQL]
D --> E{有失败的SQL?}
E -->|否| F[生成验证报告]
E -->|是| G{启用LLM修复?}
G -->|否| H[仅生成报告]
G -->|是| I[LLM修复失败SQL]
I --> J[重新验证修复后SQL]
J --> K{启用文件修改?}
K -->|否| F
K -->|是| L[创建备份文件]
L --> M[更新原文件]
M --> N[生成修改日志]
N --> F
F --> O[完成]
H --> F
graph TD
A[开始] --> B[扫描训练数据目录]
B --> C[识别文件类型]
C --> D[处理DDL文件]
C --> E[处理MD文件]
C --> F[处理JSON问答对]
C --> G[处理SQL文件]
D --> H[加载到向量数据库]
E --> H
F --> H
G --> H
H --> I[验证加载结果]
I --> J[生成统计报告]
J --> K[完成]
所有data_pipeline相关配置统一在 data_pipeline/config.py
中:
SCHEMA_TOOLS_CONFIG = {
# 核心配置
"default_db_connection": None,
"default_business_context": "数据库管理系统",
"output_directory": "./data_pipeline/training_data/",
# 处理链配置
"default_pipeline": "full",
"available_pipelines": {
"full": ["database_inspector", "data_sampler", "comment_generator", "ddl_generator", "doc_generator"],
"ddl_only": ["database_inspector", "data_sampler", "comment_generator", "ddl_generator"],
"analysis_only": ["database_inspector", "data_sampler", "comment_generator"]
},
# 数据处理配置
"sample_data_limit": 20,
"enum_detection_sample_limit": 5000,
"enum_max_distinct_values": 20,
"large_table_threshold": 1000000,
# 并发配置
"max_concurrent_tables": 1, # 避免LLM并发调用问题
# Question-SQL生成配置
"qs_generation": {
"max_tables": 20,
"theme_count": 5,
"questions_per_theme": 10,
"max_concurrent_themes": 1,
"continue_on_theme_error": True,
"save_intermediate": True,
"output_file_prefix": "qs",
},
# SQL验证配置
"sql_validation": {
"max_concurrent_validations": 5,
"validation_timeout": 30,
"batch_size": 10,
"enable_sql_repair": False, # 默认禁用
"modify_original_file": False, # 默认禁用
"readonly_mode": True,
}
}
命令行参数 > data_pipeline/config.py > 默认值
app_config.py
): LLM配置、向量数据库配置、全局开关data_pipeline/config.py
): 数据管道专用配置# 完整4步流程
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/database_name" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/
# 跳过SQL验证
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/database_name" \
--table-list tables.txt \
--business-context "电商系统" \
--skip-validation
# 跳过训练数据加载
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/database_name" \
--table-list tables.txt \
--business-context "管理系统" \
--skip-training-load
# 第1步:DDL/MD生成
python -m data_pipeline.ddl_generation.ddl_md_generator \
--db-connection "postgresql://user:pass@localhost:5432/database" \
--table-list tables.txt \
--business-context "业务描述"
# 第2步:Q&A生成
python -m data_pipeline.qa_generation.qs_generator \
--output-dir ./data_pipeline/training_data/ \
--table-list tables.txt \
--business-context "业务描述"
# 第3步:SQL验证
python -m data_pipeline.validators.sql_validate_cli \
--db-connection "postgresql://user:pass@localhost:5432/database" \
--input-file ./qs_xxx.json
# 第4步:训练数据加载
python -m data_pipeline.trainer.run_training \
--data_path ./data_pipeline/training_data/
import asyncio
from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator
async def run_complete_workflow():
orchestrator = SchemaWorkflowOrchestrator(
db_connection="postgresql://user:pass@localhost:5432/database_name",
table_list_file="tables.txt",
business_context="高速公路服务区管理系统",
output_dir="./data_pipeline/training_data/",
enable_sql_validation=True,
enable_llm_repair=True,
modify_original_file=True,
enable_training_data_load=True
)
report = await orchestrator.execute_complete_workflow()
return report
# 运行工作流程
asyncio.run(run_complete_workflow())
-- 中文名: 存储高速公路管理公司信息
-- 描述: 存储高速公路管理公司信息,用于服务区运营管理
create table public.bss_company (
id varchar(32) not null -- 主键ID,主键,
version integer not null -- 版本号,
company_name varchar(255) -- 公司名称,
primary key (id)
);
## bss_company(存储高速公路管理公司信息)
bss_company 表存储高速公路管理公司信息,用于服务区运营管理
字段列表:
- id (varchar(32)) - 主键ID [主键, 非空] [示例: 30675d85ba5044c31acfa243b9d16334]
- company_name (varchar(255)) - 公司名称 [示例: 上饶分公司, 宜春分公司]
字段补充说明:
- id 为主键
[
{
"question": "查询所有公司信息",
"sql": "SELECT * FROM bss_company WHERE delete_ts IS NULL"
},
{
"question": "按公司统计服务区数量",
"sql": "SELECT company_name, COUNT(*) FROM bss_service_area GROUP BY company_name"
}
]
def generate_safe_filename(schema_name: str, table_name: str, suffix: str) -> str:
"""
生成安全的文件名,避免冲突
规则:
- public.table_name → table_name.ddl
- schema.table_name → schema__table_name.ddl
- 特殊字符替换: . → __, - → _, 空格 → _
"""
if schema_name.lower() == 'public':
safe_name = table_name
else:
safe_name = f"{schema_name}__{table_name}"
# 替换特殊字符
safe_name = safe_name.replace('.', '__').replace('-', '_').replace(' ', '_')
return f"{safe_name}{suffix}"
data_pipeline/training_data/
├── bss_company.ddl # public.bss_company
├── hr__employees.ddl # hr.employees
├── sales__order_items.ddl # sales.order_items
├── bss_company_detail.md # 对应的MD文档
├── hr__employees_detail.md
├── sales__order_items_detail.md
├── qs_highway_db_20240626_pair.json # Question-SQL对文件
├── filename_mapping.txt # 文件名映射
└── sql_validation_20240626_summary.log # SQL验证报告
Data Pipeline系统提供了完整的数据库逆向工程解决方案,从原始数据库schema到可用的训练数据,整个流程完全自动化。系统设计遵循以下原则:
通过工作流编排器,用户可以一键完成所有步骤,也可以根据需要分步执行和调试。系统适合各种规模的数据处理需求,是vanna.ai训练数据准备的理想解决方案。