wangxq 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
..
analyzers e606828120 修正了data_pipline日志的模块,它现在是独立的,下面准备修改作业执行过程的监控功能。 11 tháng trước cách đây
api 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
ddl_generation 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
dp_logging 346d9bb862 重构了整个项目的log服务,测试时又发现了Event loop is closed错误,准备修复这个错误。 10 tháng trước cách đây
prompts 847e45252b 训练数据生成与加载模块重构完成. 11 tháng trước cách đây
qa_generation 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
sql d6ffe2fac0 发现数据库连接的bug,准备为data_pipeline模块修改单例模式。 11 tháng trước cách đây
tools bb0b2a4687 修复data_pipeline单实例的数据库连接问题,增加上传数据文件到task目录的API. 11 tháng trước cách đây
trainer 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
training_data 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
utils 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
validators 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
README.md 847e45252b 训练数据生成与加载模块重构完成. 11 tháng trước cách đây
__init__.py 847e45252b 训练数据生成与加载模块重构完成. 11 tháng trước cách đây
config.py 1946fe5ac3 已经为./data_pipeline模块的命令行模式添加了truncate/backup参数,现在准备为API添加这两个参数。 10 tháng trước cách đây
create_task_cli.py 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
metadata_only_generator.py e606828120 修正了data_pipline日志的模块,它现在是独立的,下面准备修改作业执行过程的监控功能。 11 tháng trước cách đây
schema_workflow.py 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây
tables.txt 246c3b61b0 修复./data_pipeline模块脚本执行时的问题,增加输出日志,修改output目录下也要创建task子目录。 10 tháng trước cách đây
task_executor.py 32e79e37cb 准备修改解决api 执行时的pgvector备份问题. 10 tháng trước cách đây

README.md

Schema Tools

自动化数据库逆向工程工具,用于从PostgreSQL数据库生成vanna.ai格式的训练数据。

功能特性

  • 🚀 自动连接PostgreSQL数据库
  • 📋 批量处理表清单
  • 🤖 LLM智能生成中文注释
  • 🔍 自动检测枚举字段
  • ⚡ 并发处理提高效率
  • 📁 生成标准化的DDL和MD文档
  • 🛡️ 完整的错误处理和日志记录
  • 🎯 新增:Question-SQL训练数据生成
  • 新增:SQL语句有效性验证
  • 🎭 新增:端到端工作流编排器

安装依赖

pip install asyncpg asyncio

使用方法

0. 🚀 一键执行完整工作流程(推荐)

使用新的工作流编排器,一个命令完成所有步骤:

命令行方式

python -m schema_tools.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统" \
  --db-name highway_db \
  --output-dir ./output

编程方式

import asyncio
from schema_tools.schema_workflow import SchemaWorkflowOrchestrator

async def run_complete_workflow():
    orchestrator = SchemaWorkflowOrchestrator(
        db_connection="postgresql://user:pass@localhost:5432/dbname",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        db_name="highway_db",
        output_dir="./output"
    )
    
    # 一键执行完整流程
    report = await orchestrator.execute_complete_workflow()
    
    if report["success"]:
        print(f"✅ 编排完成!最终生成 {report['final_outputs']['final_question_count']} 个问答对")
        print(f"📄 输出文件: {report['final_outputs']['primary_output_file']}")
    else:
        print(f"❌ 编排失败: {report['error']['message']}")

asyncio.run(run_complete_workflow())

工作流编排器特性:

  • 🔄 自动执行:DDL/MD生成 → Question-SQL生成 → SQL验证修复
  • 📊 详细报告:每个步骤的执行状态和性能指标
  • 🛠️ 灵活配置:可选择跳过验证、禁用修复等
  • 💾 中间结果保护:失败时保留已完成步骤的输出
  • 🎯 智能恢复:支持从中断点继续执行

工作流编排器命令行选项

# 跳过SQL验证
python -m schema_tools.schema_workflow \
  --db-connection "postgresql://..." --table-list tables.txt \
  --business-context "系统" --db-name test_db --skip-validation

# 禁用LLM修复
python -m schema_tools.schema_workflow \
  --db-connection "postgresql://..." --table-list tables.txt \
  --business-context "系统" --db-name test_db --disable-llm-repair

# 详细日志
python -m schema_tools.schema_workflow \
  --db-connection "postgresql://..." --table-list tables.txt \
  --business-context "系统" --db-name test_db --verbose

1. 生成DDL和MD文档(分步执行)

基本使用

python -m schema_tools \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统"

指定输出目录和处理链

python -m schema_tools \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --table-list tables.txt \
  --business-context "电商系统" \
  --output-dir ./output \
  --pipeline full

2. 生成Question-SQL训练数据(分步执行)

在生成DDL和MD文件后,可以使用新的Question-SQL生成功能:

python -m schema_tools.qs_generator \
  --output-dir ./output \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
  --db-name highway_db

这将:

  1. 验证DDL和MD文件数量是否正确
  2. 读取所有MD文件内容
  3. 使用LLM提取业务分析主题
  4. 为每个主题生成10个Question-SQL对
  5. 输出到 qs_highway_db_时间戳_pair.json 文件

3. 验证SQL语句有效性(分步执行)

在生成Question-SQL对后,可以验证其中的SQL语句:

python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./qs_highway_db_20240101_143052_pair.json \
  --output-dir ./validation_reports

这将:

  1. 读取Question-SQL对文件
  2. 使用PostgreSQL的EXPLAIN语句验证每个SQL
  3. 生成详细的验证报告
  4. 统计成功率和性能指标

SQL验证高级选项

# 基本验证(启用修复和文件修改)
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json

# 仅生成报告,不修改文件
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --no-modify-file

# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --disable-llm-repair

# 性能调优参数
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --max-concurrent 10 \
  --batch-size 20 \
  --timeout 60 \
  --verbose

4. 编程方式使用(分步执行)

生成DDL/MD文档

import asyncio
from schema_tools import SchemaTrainingDataAgent

async def generate_training_data():
    agent = SchemaTrainingDataAgent(
        db_connection="postgresql://user:pass@localhost:5432/dbname",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        output_dir="./output",
        pipeline="full"
    )
    
    report = await agent.generate_training_data()
    print(f"处理完成: {report['summary']}")

asyncio.run(generate_training_data())

生成Question-SQL数据

import asyncio
from schema_tools import QuestionSQLGenerationAgent

async def generate_qs_data():
    agent = QuestionSQLGenerationAgent(
        output_dir="./output",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        db_name="highway_db"
    )
    
    report = await agent.generate()
    print(f"生成完成: {report['total_questions']} 个问题")

asyncio.run(generate_qs_data())

验证SQL语句

import asyncio
from schema_tools import SQLValidationAgent

async def validate_sqls():
    agent = SQLValidationAgent(
        db_connection="postgresql://user:pass@localhost:5432/dbname",
        input_file="./qs_highway_db_20240101_143052_pair.json",
        output_dir="./validation_reports"
    )
    
    report = await agent.validate()
    print(f"验证完成: {report['summary']['success_rate']:.1%} 成功率")

asyncio.run(validate_sqls())

输出文件结构

output/
├── bss_car_day_count.ddl         # DDL文件
├── bss_car_day_count_detail.md   # MD文档
├── logs/                         # 日志目录
│   └── schema_tools_20240101_120000.log
├── filename_mapping.txt          # 文件名映射报告
├── qs_highway_db_20240101_143052_pair.json  # Question-SQL训练数据
├── metadata.txt                  # 主题元数据(INSERT语句)
└── validation_reports/           # SQL验证报告
    ├── sql_validation_20240101_150000_report.json
    └── sql_validation_20240101_150000_summary.txt

注意:配置已更新为不再创建ddl/和docs/子目录,所有文件直接放在output目录下。

配置选项

主要配置在 schema_tools/config.py 中:

SCHEMA_TOOLS_CONFIG = {
    # 核心配置
    "output_directory": "training/generated_data",
    "default_pipeline": "full",
    "create_subdirectories": False,       # 不创建子目录
    
    # 数据处理配置
    "sample_data_limit": 20,              # 采样数据量
    "max_concurrent_tables": 3,           # 最大并发数
    
    # Question-SQL生成配置
    "qs_generation": {
        "max_tables": 20,                 # 最大表数量限制
        "theme_count": 5,                 # 生成主题数量
        "questions_per_theme": 10,        # 每主题问题数
        "max_concurrent_themes": 3,       # 并行处理主题数
    },
    
    # SQL验证配置
    "sql_validation": {
        "reuse_connection_pool": True,    # 复用现有连接池
        "max_concurrent_validations": 5,  # 并发验证数
        "validation_timeout": 30,         # 单个验证超时(秒)
        "batch_size": 10,                 # 批处理大小
        "continue_on_error": True,        # 错误时是否继续
        "save_validation_report": True,   # 保存验证报告
        "readonly_mode": True,            # 启用只读模式
    }
}

处理链类型

  • full: 完整处理链(默认)

    • 数据库检查 → 数据采样 → 注释生成 → DDL生成 → MD文档生成
  • ddl_only: 仅生成DDL

    • 数据库检查 → 数据采样 → 注释生成 → DDL生成
  • analysis_only: 仅分析不生成文件

    • 数据库检查 → 数据采样 → 注释生成

Question-SQL生成特性

功能亮点

  • 🔍 自动验证文件完整性
  • 📊 智能提取5个业务分析主题
  • 🤖 每个主题生成10个高质量Question-SQL对
  • 💾 支持中间结果保存和恢复
  • ⚡ 支持并行处理提高效率

限制说明

  • 一次最多处理20个表(可配置)
  • 表数量超限会抛出异常
  • 主题生成失败可跳过继续处理

输出格式

[
  {
    "question": "按服务区统计每日营收趋势(最近30天)?",
    "sql": "SELECT service_name AS 服务区, oper_date AS 营业日期, SUM(pay_sum) AS 每日营收 FROM bss_business_day_data WHERE oper_date >= CURRENT_DATE - INTERVAL '30 day' AND delete_ts IS NULL GROUP BY service_name, oper_date ORDER BY 营业日期 ASC;"
  }
]

SQL验证特性

功能亮点

  • ✅ 使用EXPLAIN语句验证SQL有效性
  • ⚡ 支持并发验证,提升验证效率
  • 🔒 只读模式运行,安全可靠
  • 📊 详细的验证报告和统计信息
  • 🔄 自动重试机制,处理临时网络问题

验证流程

  1. 读取Question-SQL对JSON文件
  2. 提取SQL语句并分批处理
  3. 使用PostgreSQL的EXPLAIN验证语法和表结构
  4. 生成详细验证报告(JSON和文本格式)
  5. 统计成功率和性能指标

报告内容

  • 总体统计(成功率、平均耗时等)
  • 错误详情和重试信息
  • 单个SQL的验证结果
  • 配置和元数据信息

常见问题

Q: 如何处理只读数据库?

A: 工具自动检测并适配只读数据库,不会尝试写操作。SQL验证器专门设计为只读模式。

Q: 如何处理重名表?

A: 自动生成唯一文件名,如 hr__users.ddlsales__users.ddl

Q: 如何跳过某些表?

A: 在表清单文件中注释掉(使用 # 开头)或删除相应行。

Q: LLM调用失败怎么办?

A: 自动重试3次,失败后使用原始注释或默认值。

Q: SQL验证失败率很高怎么办?

A: 检查SQL语法、表名是否正确,使用 --verbose 查看详细错误信息。

注意事项

  1. 数据库权限:至少需要SELECT权限
  2. LLM配置:复用项目的vanna实例配置
  3. 并发控制:默认最大3个表并发,可调整
  4. 内存使用:大表采样会限制数据量
  5. SQL验证:需要对所有相关表有SELECT权限

开发与扩展

添加新工具

  1. 创建工具类: ```python from schema_tools.tools.base import BaseTool, ToolRegistry

@ToolRegistry.register("my_tool") class MyTool(BaseTool):

needs_llm = False
tool_name = "我的工具"

async def execute(self, context):
    # 实现工具逻辑
    return ProcessingResult(success=True)

2. 添加到处理链:
```python
"my_pipeline": [
    "database_inspector",
    "my_tool",
    "ddl_generator"
]

许可证

本工具作为VANNA-CHAINLIT-CHROMADB项目的一部分,遵循项目许可证。