# Data Pipeline 系统概要设计说明书
## 1. 项目概述
### 1.1 项目目标
**Data Pipeline** 是一个完整的数据库逆向工程和训练数据生成系统,将PostgreSQL数据库结构转换为vanna.ai格式的训练数据,包括DDL文档、Markdown说明、Question-SQL问答对以及训练数据加载。
### 1.2 核心功能
- 自动连接PostgreSQL数据库
- 批量处理表清单,支持多Schema
- 生成带中文注释的DDL文件
- 生成详细的MD格式说明文档
- LLM辅助的智能注释生成和枚举检测
- Question-SQL训练数据生成
- SQL验证和自动修复
- 训练数据加载到向量数据库
- 端到端工作流编排
- 完整的错误处理和日志记录
## 2. 系统架构
### 2.1 整体架构
```
data_pipeline/ # 数据管道模块
├── __init__.py # 模块入口
├── config.py # 统一配置文件
├── schema_workflow.py # 端到端工作流编排器
├── ddl_generation/ # DDL/MD生成模块
│ ├── __init__.py
│ ├── ddl_md_generator.py # 命令行入口
│ └── training_data_agent.py # 主AI Agent
├── qa_generation/ # Question-SQL生成模块
│ ├── __init__.py
│ ├── qs_agent.py # Q&A生成Agent
│ └── qs_generator.py # Q&A命令行入口
├── validators/ # SQL验证模块
│ ├── __init__.py
│ ├── sql_validation_agent.py # SQL验证Agent
│ ├── sql_validate_cli.py # SQL验证命令行入口
│ ├── sql_validator.py # SQL验证器核心
│ └── file_count_validator.py # 文件数量验证器
├── trainer/ # 训练数据管理模块
│ ├── __init__.py
│ ├── run_training.py # 训练数据加载脚本
│ └── vanna_trainer.py # 训练器核心模块
├── tools/ # Agent工具集
│ ├── __init__.py
│ ├── base.py # 基础工具类和注册机制
│ ├── database_inspector.py # 数据库元数据检查工具
│ ├── data_sampler.py # 数据采样工具
│ ├── comment_generator.py # LLM注释生成工具
│ ├── ddl_generator.py # DDL格式生成工具
│ └── doc_generator.py # MD文档生成工具
├── analyzers/ # 分析器模块
│ ├── __init__.py
│ ├── md_analyzer.py # MD文件分析器
│ └── theme_extractor.py # 主题提取器
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── table_parser.py # 表清单解析器
│ ├── logger.py # 日志管理
│ ├── file_manager.py # 文件管理器
│ ├── data_structures.py # 数据结构定义
│ ├── large_table_handler.py # 大表处理
│ ├── permission_checker.py # 权限检查器
│ └── system_filter.py # 系统表过滤器
├── prompts/ # 提示词和业务词典
│ ├── __init__.py
│ └── business_dictionary.txt
└── training_data/ # 训练数据存储目录
├── *.ddl # DDL文件
├── *_detail.md # MD文档
├── qs_*_pair.json # 问答对文件
└── filename_mapping.txt # 文件映射
```
### 2.2 核心组件
#### 2.2.1 工作流编排器
- **类名**: `SchemaWorkflowOrchestrator`
- **文件**: `schema_workflow.py`
- **职责**: 端到端执行完整的数据管道流程
- **特点**: 统一管理4个步骤的执行,支持配置覆盖
#### 2.2.2 DDL/MD生成Agent
- **类名**: `SchemaTrainingDataAgent`
- **文件**: `ddl_generation/training_data_agent.py`
- **职责**: 数据库逆向工程,生成DDL和MD文档
- **特点**: 单一Agent管理多工具的架构
#### 2.2.3 Question-SQL生成Agent
- **类名**: `QuestionSQLGenerationAgent`
- **文件**: `qa_generation/qs_agent.py`
- **职责**: 基于DDL/MD生成Question-SQL训练数据对
- **特点**: 独立的功能模块,支持主题分析和并发生成
#### 2.2.4 SQL验证Agent
- **类名**: `SQLValidationAgent`
- **文件**: `validators/sql_validation_agent.py`
- **职责**: 验证Question-SQL对中的SQL语句有效性,支持LLM自动修复
- **特点**: 支持并发验证、自动修复、原文件修改
#### 2.2.5 训练数据管理
- **文件**: `trainer/run_training.py`
- **职责**: 将生成的训练数据加载到向量数据库
- **特点**: 支持多种文件格式,自动识别和分类
#### 2.2.6 Agent工具集(基于装饰器注册)
1. **DatabaseInspectorTool**: 获取表元数据
2. **DataSamplerTool**: 采样表数据
3. **CommentGeneratorTool**: LLM生成注释和枚举建议
4. **DDLGeneratorTool**: 生成DDL格式文件
5. **DocGeneratorTool**: 生成MD文档
#### 2.2.7 验证器和分析器
1. **FileCountValidator**: 验证DDL和MD文件数量
2. **SQLValidator**: 验证SQL语句有效性,支持LLM自动修复
3. **MDFileAnalyzer**: 读取和分析MD文件内容
4. **ThemeExtractor**: 使用LLM提取业务分析主题
## 3. 详细设计
### 3.1 完整工作流程(4步骤)
```mermaid
graph TD
A[开始] --> B[步骤1: DDL/MD生成]
B --> C{成功?}
C -->|否| D[生成错误报告]
C -->|是| E[步骤2: Question-SQL生成]
E --> F{成功?}
F -->|否| D
F -->|是| G{启用SQL验证?}
G -->|否| H[步骤4: 训练数据加载]
G -->|是| I[步骤3: SQL验证和修复]
I --> J{成功?}
J -->|否| D
J -->|是| K{启用训练数据加载?}
K -->|否| L[生成最终报告]
K -->|是| H
H --> M{成功?}
M -->|否| D
M -->|是| L
L --> N[完成]
D --> N
```
### 3.2 DDL/MD生成流程
```mermaid
graph TD
A[开始处理表] --> B[DatabaseInspectorTool]
B --> C[DataSamplerTool]
C --> D[CommentGeneratorTool
生成注释+枚举建议]
D --> E[验证枚举字段
SELECT DISTINCT]
E --> F[DDLGeneratorTool]
F --> G[DocGeneratorTool]
G --> H[写入文件]
H --> I[记录日志]
I --> J[完成]
```
### 3.3 Question-SQL生成流程
```mermaid
graph TD
A[开始] --> B[FileCountValidator
验证文件数量]
B --> C{验证通过?}
C -->|否| D[报错退出]
C -->|是| E[MDFileAnalyzer
读取所有MD文件]
E --> F[ThemeExtractor
提取分析主题]
F --> G[处理每个主题]
G --> H[生成Question-SQL对]
H --> I[保存JSON文件]
I --> J[完成]
```
### 3.4 SQL验证和修复流程
```mermaid
graph TD
A[开始] --> B[读取Question-SQL文件]
B --> C[提取SQL语句]
C --> D[批量验证SQL]
D --> E{有失败的SQL?}
E -->|否| F[生成验证报告]
E -->|是| G{启用LLM修复?}
G -->|否| H[仅生成报告]
G -->|是| I[LLM修复失败SQL]
I --> J[重新验证修复后SQL]
J --> K{启用文件修改?}
K -->|否| F
K -->|是| L[创建备份文件]
L --> M[更新原文件]
M --> N[生成修改日志]
N --> F
F --> O[完成]
H --> F
```
### 3.5 训练数据加载流程
```mermaid
graph TD
A[开始] --> B[扫描训练数据目录]
B --> C[识别文件类型]
C --> D[处理DDL文件]
C --> E[处理MD文件]
C --> F[处理JSON问答对]
C --> G[处理SQL文件]
D --> H[加载到向量数据库]
E --> H
F --> H
G --> H
H --> I[验证加载结果]
I --> J[生成统计报告]
J --> K[完成]
```
## 4. 配置设计
### 4.1 统一配置架构
所有data_pipeline相关配置统一在 `data_pipeline/config.py` 中:
```python
SCHEMA_TOOLS_CONFIG = {
# 核心配置
"default_db_connection": None,
"default_business_context": "数据库管理系统",
"output_directory": "./data_pipeline/training_data/",
# 处理链配置
"default_pipeline": "full",
"available_pipelines": {
"full": ["database_inspector", "data_sampler", "comment_generator", "ddl_generator", "doc_generator"],
"ddl_only": ["database_inspector", "data_sampler", "comment_generator", "ddl_generator"],
"analysis_only": ["database_inspector", "data_sampler", "comment_generator"]
},
# 数据处理配置
"sample_data_limit": 20,
"enum_detection_sample_limit": 5000,
"enum_max_distinct_values": 20,
"large_table_threshold": 1000000,
# 并发配置
"max_concurrent_tables": 1, # 避免LLM并发调用问题
# Question-SQL生成配置
"qs_generation": {
"max_tables": 20,
"theme_count": 5,
"questions_per_theme": 10,
"max_concurrent_themes": 1,
"continue_on_theme_error": True,
"save_intermediate": True,
"output_file_prefix": "qs",
},
# SQL验证配置
"sql_validation": {
"max_concurrent_validations": 5,
"validation_timeout": 30,
"batch_size": 10,
"enable_sql_repair": False, # 默认禁用
"modify_original_file": False, # 默认禁用
"readonly_mode": True,
}
}
```
### 4.2 配置优先级
```
命令行参数 > data_pipeline/config.py > 默认值
```
### 4.3 与全局配置的分离
- **系统级配置** (`app_config.py`): LLM配置、向量数据库配置、全局开关
- **模块级配置** (`data_pipeline/config.py`): 数据管道专用配置
- **清晰分工**: 避免配置冲突和重复
## 5. 使用方式
### 5.1 端到端工作流(推荐)
```bash
# 完整4步流程
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/database_name" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--output-dir ./data_pipeline/training_data/
# 跳过SQL验证
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/database_name" \
--table-list tables.txt \
--business-context "电商系统" \
--skip-validation
# 跳过训练数据加载
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://user:pass@localhost:5432/database_name" \
--table-list tables.txt \
--business-context "管理系统" \
--skip-training-load
```
### 5.2 分步执行
```bash
# 第1步:DDL/MD生成
python -m data_pipeline.ddl_generation.ddl_md_generator \
--db-connection "postgresql://user:pass@localhost:5432/database" \
--table-list tables.txt \
--business-context "业务描述"
# 第2步:Q&A生成
python -m data_pipeline.qa_generation.qs_generator \
--output-dir ./data_pipeline/training_data/ \
--table-list tables.txt \
--business-context "业务描述"
# 第3步:SQL验证
python -m data_pipeline.validators.sql_validate_cli \
--db-connection "postgresql://user:pass@localhost:5432/database" \
--input-file ./qs_xxx.json
# 第4步:训练数据加载
python -m data_pipeline.trainer.run_training \
--data_path ./data_pipeline/training_data/
```
### 5.3 编程方式
```python
import asyncio
from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator
async def run_complete_workflow():
orchestrator = SchemaWorkflowOrchestrator(
db_connection="postgresql://user:pass@localhost:5432/database_name",
table_list_file="tables.txt",
business_context="高速公路服务区管理系统",
output_dir="./data_pipeline/training_data/",
enable_sql_validation=True,
enable_llm_repair=True,
modify_original_file=True,
enable_training_data_load=True
)
report = await orchestrator.execute_complete_workflow()
return report
# 运行工作流程
asyncio.run(run_complete_workflow())
```
## 6. 文件格式设计
### 6.1 DDL文件格式
```sql
-- 中文名: 存储高速公路管理公司信息
-- 描述: 存储高速公路管理公司信息,用于服务区运营管理
create table public.bss_company (
id varchar(32) not null -- 主键ID,主键,
version integer not null -- 版本号,
company_name varchar(255) -- 公司名称,
primary key (id)
);
```
### 6.2 MD文档格式
```markdown
## bss_company(存储高速公路管理公司信息)
bss_company 表存储高速公路管理公司信息,用于服务区运营管理
字段列表:
- id (varchar(32)) - 主键ID [主键, 非空] [示例: 30675d85ba5044c31acfa243b9d16334]
- company_name (varchar(255)) - 公司名称 [示例: 上饶分公司, 宜春分公司]
字段补充说明:
- id 为主键
```
### 6.3 Question-SQL文件格式
```json
[
{
"question": "查询所有公司信息",
"sql": "SELECT * FROM bss_company WHERE delete_ts IS NULL"
},
{
"question": "按公司统计服务区数量",
"sql": "SELECT company_name, COUNT(*) FROM bss_service_area GROUP BY company_name"
}
]
```
## 7. 多Schema支持
### 7.1 文件命名防冲突机制
```python
def generate_safe_filename(schema_name: str, table_name: str, suffix: str) -> str:
"""
生成安全的文件名,避免冲突
规则:
- public.table_name → table_name.ddl
- schema.table_name → schema__table_name.ddl
- 特殊字符替换: . → __, - → _, 空格 → _
"""
if schema_name.lower() == 'public':
safe_name = table_name
else:
safe_name = f"{schema_name}__{table_name}"
# 替换特殊字符
safe_name = safe_name.replace('.', '__').replace('-', '_').replace(' ', '_')
return f"{safe_name}{suffix}"
```
### 7.2 输出目录结构
```
data_pipeline/training_data/
├── bss_company.ddl # public.bss_company
├── hr__employees.ddl # hr.employees
├── sales__order_items.ddl # sales.order_items
├── bss_company_detail.md # 对应的MD文档
├── hr__employees_detail.md
├── sales__order_items_detail.md
├── qs_highway_db_20240626_pair.json # Question-SQL对文件
├── filename_mapping.txt # 文件名映射
└── sql_validation_20240626_summary.log # SQL验证报告
```
## 8. 错误处理与容错
### 8.1 错误处理策略
#### 8.1.1 工作流级错误处理
- 步骤失败 → 生成详细错误报告,包含失败原因和已完成步骤
- 支持从失败步骤重新开始
- 保留中间结果用于调试
#### 8.1.2 表级错误处理
- 某表处理失败 → 记录错误,继续下一表
- 失败表数超过阈值 → 警告但继续执行
#### 8.1.3 LLM调用错误处理
- 超时/失败 → 自动重试(最大3次)
- 重试失败 → 使用原始注释或默认注释
### 8.2 日志设计
#### 8.2.1 日志级别
- **INFO**: 正常处理流程
- **WARNING**: 可恢复错误
- **ERROR**: 影响结果的错误
#### 8.2.2 日志输出
- **控制台**: 进度信息和关键错误
- **文件**: 详细执行日志
- **格式**: 包含时间戳、级别、组件、消息
## 9. 性能考虑
### 9.1 并发控制
- 表级并发: 控制数据库连接数
- LLM调用: 避免过于频繁的API调用(建议最大并发1)
- 内存管理: 及时释放大数据集
### 9.2 数据采样优化
- 限制采样数量避免大表性能问题
- 智能采样策略(分层采样)
- 大表检测和特殊处理
### 9.3 缓存策略
- 表元数据缓存(单次运行内)
- LLM结果缓存(避免重复调用)
- 连接池复用
## 10. 扩展性设计
### 10.1 数据库适配
- 当前支持: PostgreSQL
- 预留扩展: MySQL适配接口
- 设计原则: 数据库特定代码隔离在DatabaseInspectorTool中
### 10.2 LLM模型适配
- 当前支持: qwen/deepseek/ollama
- 复用现有vanna配置,自动适配不同模型
- 提示词模板支持不同模型的特点
### 10.3 输出格式扩展
- 当前: DDL + MD + JSON
- 预留: Excel格式、其他结构化格式
### 10.4 工作流扩展
- 模块化设计,可添加新的处理步骤
- 插件式架构,支持自定义工具
- 配置驱动,灵活组合处理链
## 11. 总结
Data Pipeline系统提供了完整的数据库逆向工程解决方案,从原始数据库schema到可用的训练数据,整个流程完全自动化。系统设计遵循以下原则:
1. **模块化**: 清晰的模块划分,便于维护和扩展
2. **可配置**: 丰富的配置选项,适应不同场景需求
3. **容错性**: 完善的错误处理,确保系统稳定运行
4. **可观测**: 详细的日志和报告,便于问题定位
5. **高性能**: 合理的并发控制和资源管理
6. **可扩展**: 预留扩展接口,支持未来功能增强
通过工作流编排器,用户可以一键完成所有步骤,也可以根据需要分步执行和调试。系统适合各种规模的数据处理需求,是vanna.ai训练数据准备的理想解决方案。