|  wangxq
				
				bb0b2a4687
				修复data_pipeline单实例的数据库连接问题,增加上传数据文件到task目录的API. | 3 ay önce | |
|---|---|---|
| .. | ||
| analyzers | 3 ay önce | |
| api | 3 ay önce | |
| ddl_generation | 3 ay önce | |
| dp_logging | 3 ay önce | |
| prompts | 3 ay önce | |
| qa_generation | 3 ay önce | |
| sql | 3 ay önce | |
| tools | 3 ay önce | |
| trainer | 3 ay önce | |
| training_data | 3 ay önce | |
| utils | 3 ay önce | |
| validators | 3 ay önce | |
| README.md | 3 ay önce | |
| __init__.py | 3 ay önce | |
| config.py | 3 ay önce | |
| metadata_only_generator.py | 3 ay önce | |
| schema_workflow.py | 3 ay önce | |
| tables.txt | 3 ay önce | |
| task_executor.py | 3 ay önce | |
自动化数据库逆向工程工具,用于从PostgreSQL数据库生成vanna.ai格式的训练数据。
pip install asyncpg
(asyncio 是 Python 标准库的一部分,无需通过 pip 单独安装。)
使用新的工作流编排器,一个命令完成所有步骤:
python -m schema_tools.schema_workflow \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统" \
  --db-name highway_db \
  --output-dir ./output
import asyncio
from schema_tools.schema_workflow import SchemaWorkflowOrchestrator
async def run_complete_workflow():
    orchestrator = SchemaWorkflowOrchestrator(
        db_connection="postgresql://user:pass@localhost:5432/dbname",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        db_name="highway_db",
        output_dir="./output"
    )
    
    # 一键执行完整流程
    report = await orchestrator.execute_complete_workflow()
    
    if report["success"]:
        print(f"✅ 编排完成!最终生成 {report['final_outputs']['final_question_count']} 个问答对")
        print(f"📄 输出文件: {report['final_outputs']['primary_output_file']}")
    else:
        print(f"❌ 编排失败: {report['error']['message']}")
asyncio.run(run_complete_workflow())
工作流编排器特性:
# 跳过SQL验证
python -m schema_tools.schema_workflow \
  --db-connection "postgresql://..." --table-list tables.txt \
  --business-context "系统" --db-name test_db --skip-validation
# 禁用LLM修复
python -m schema_tools.schema_workflow \
  --db-connection "postgresql://..." --table-list tables.txt \
  --business-context "系统" --db-name test_db --disable-llm-repair
# 详细日志
python -m schema_tools.schema_workflow \
  --db-connection "postgresql://..." --table-list tables.txt \
  --business-context "系统" --db-name test_db --verbose
python -m schema_tools \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --table-list tables.txt \
  --business-context "高速公路服务区管理系统"
python -m schema_tools \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --table-list tables.txt \
  --business-context "电商系统" \
  --output-dir ./output \
  --pipeline full
在生成DDL和MD文件后,可以使用新的Question-SQL生成功能:
python -m schema_tools.qs_generator \
  --output-dir ./output \
  --table-list ./tables.txt \
  --business-context "高速公路服务区管理系统" \
  --db-name highway_db
这将:
- 生成 qs_highway_db_时间戳_pair.json 文件

在生成Question-SQL对后,可以验证其中的SQL语句:
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./qs_highway_db_20240101_143052_pair.json \
  --output-dir ./validation_reports
这将:
# 基本验证(启用修复和文件修改)
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json
# 仅生成报告,不修改文件
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --no-modify-file
# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --disable-llm-repair
# 性能调优参数
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json \
  --max-concurrent 10 \
  --batch-size 20 \
  --timeout 60 \
  --verbose
import asyncio
from schema_tools import SchemaTrainingDataAgent
async def generate_training_data():
    agent = SchemaTrainingDataAgent(
        db_connection="postgresql://user:pass@localhost:5432/dbname",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        output_dir="./output",
        pipeline="full"
    )
    
    report = await agent.generate_training_data()
    print(f"处理完成: {report['summary']}")
asyncio.run(generate_training_data())
import asyncio
from schema_tools import QuestionSQLGenerationAgent
async def generate_qs_data():
    agent = QuestionSQLGenerationAgent(
        output_dir="./output",
        table_list_file="tables.txt",
        business_context="高速公路服务区管理系统",
        db_name="highway_db"
    )
    
    report = await agent.generate()
    print(f"生成完成: {report['total_questions']} 个问题")
asyncio.run(generate_qs_data())
import asyncio
from schema_tools import SQLValidationAgent
async def validate_sqls():
    agent = SQLValidationAgent(
        db_connection="postgresql://user:pass@localhost:5432/dbname",
        input_file="./qs_highway_db_20240101_143052_pair.json",
        output_dir="./validation_reports"
    )
    
    report = await agent.validate()
    print(f"验证完成: {report['summary']['success_rate']:.1%} 成功率")
asyncio.run(validate_sqls())
output/
├── bss_car_day_count.ddl         # DDL文件
├── bss_car_day_count_detail.md   # MD文档
├── logs/                         # 日志目录
│   └── schema_tools_20240101_120000.log
├── filename_mapping.txt          # 文件名映射报告
├── qs_highway_db_20240101_143052_pair.json  # Question-SQL训练数据
├── metadata.txt                  # 主题元数据(INSERT语句)
└── validation_reports/           # SQL验证报告
    ├── sql_validation_20240101_150000_report.json
    └── sql_validation_20240101_150000_summary.txt
注意:配置已更新为不再创建ddl/和docs/子目录,DDL和MD文件直接放在output目录下(日志和SQL验证报告仍分别位于logs/和validation_reports/子目录中)。
主要配置在 schema_tools/config.py 中:
SCHEMA_TOOLS_CONFIG = {
    # 核心配置
    "output_directory": "training/generated_data",
    "default_pipeline": "full",
    "create_subdirectories": False,       # 不创建子目录
    
    # 数据处理配置
    "sample_data_limit": 20,              # 采样数据量
    "max_concurrent_tables": 3,           # 最大并发数
    
    # Question-SQL生成配置
    "qs_generation": {
        "max_tables": 20,                 # 最大表数量限制
        "theme_count": 5,                 # 生成主题数量
        "questions_per_theme": 10,        # 每主题问题数
        "max_concurrent_themes": 3,       # 并行处理主题数
    },
    
    # SQL验证配置
    "sql_validation": {
        "reuse_connection_pool": True,    # 复用现有连接池
        "max_concurrent_validations": 5,  # 并发验证数
        "validation_timeout": 30,         # 单个验证超时(秒)
        "batch_size": 10,                 # 批处理大小
        "continue_on_error": True,        # 错误时是否继续
        "save_validation_report": True,   # 保存验证报告
        "readonly_mode": True,            # 启用只读模式
    }
}
full: 完整处理链(默认)
ddl_only: 仅生成DDL
analysis_only: 仅分析不生成文件
[
  {
    "question": "按服务区统计每日营收趋势(最近30天)?",
    "sql": "SELECT service_name AS 服务区, oper_date AS 营业日期, SUM(pay_sum) AS 每日营收 FROM bss_business_day_data WHERE oper_date >= CURRENT_DATE - INTERVAL '30 day' AND delete_ts IS NULL GROUP BY service_name, oper_date ORDER BY 营业日期 ASC;"
  }
]
A: 工具自动检测并适配只读数据库,不会尝试写操作。SQL验证器专门设计为只读模式。
A: 自动生成唯一文件名,如 hr__users.ddl 和 sales__users.ddl。
A: 在表清单文件中注释掉(使用 # 开头)或删除相应行。
A: 自动重试3次,失败后使用原始注释或默认值。
A: 检查SQL语法、表名是否正确,使用 --verbose 查看详细错误信息。
```python
@ToolRegistry.register("my_tool")
class MyTool(BaseTool):
    needs_llm = False
    tool_name = "我的工具"

    async def execute(self, context):
        # 实现工具逻辑
        return ProcessingResult(success=True)
```
2. 添加到处理链:
```python
"my_pipeline": [
    "database_inspector",
    "my_tool",
    "ddl_generator"
]
```
本工具作为VANNA-CHAINLIT-CHROMADB项目的一部分,遵循项目许可证。