![]() |
1 tydzień temu | |
---|---|---|
.. | ||
analyzers | 1 tydzień temu | |
prompts | 1 tydzień temu | |
tools | 1 tydzień temu | |
utils | 1 tydzień temu | |
validators | 1 tydzień temu | |
README.md | 1 tydzień temu | |
__init__.py | 1 tydzień temu | |
__main__.py | 1 tydzień temu | |
config.py | 1 tydzień temu | |
qs_agent.py | 1 tydzień temu | |
qs_generator.py | 1 tydzień temu | |
schema_workflow_orchestrator.py | 1 tydzień temu | |
sql_validation_agent.py | 1 tydzień temu | |
sql_validation_example.py | 1 tydzień temu | |
sql_validator.py | 1 tydzień temu | |
tables.txt | 1 tydzień temu | |
test_schema_tools.py | 1 tydzień temu | |
training_data_agent.py | 1 tydzień temu | |
workflow_example.py | 1 tydzień temu |
自动化数据库逆向工程工具,用于从PostgreSQL数据库生成vanna.ai格式的训练数据。
pip install asyncpg asyncio
使用新的工作流编排器,一个命令完成所有步骤:
python -m schema_tools.schema_workflow_orchestrator \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统" \
--db-name highway_db \
--output-dir ./output
import asyncio
from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
async def run_complete_workflow():
orchestrator = SchemaWorkflowOrchestrator(
db_connection="postgresql://user:pass@localhost:5432/dbname",
table_list_file="tables.txt",
business_context="高速公路服务区管理系统",
db_name="highway_db",
output_dir="./output"
)
# 一键执行完整流程
report = await orchestrator.execute_complete_workflow()
if report["success"]:
print(f"✅ 编排完成!最终生成 {report['final_outputs']['final_question_count']} 个问答对")
print(f"📄 输出文件: {report['final_outputs']['primary_output_file']}")
else:
print(f"❌ 编排失败: {report['error']['message']}")
asyncio.run(run_complete_workflow())
工作流编排器特性:
# 跳过SQL验证
python -m schema_tools.schema_workflow_orchestrator \
--db-connection "postgresql://..." --table-list tables.txt \
--business-context "系统" --db-name test_db --skip-validation
# 禁用LLM修复
python -m schema_tools.schema_workflow_orchestrator \
--db-connection "postgresql://..." --table-list tables.txt \
--business-context "系统" --db-name test_db --disable-llm-repair
# 详细日志
python -m schema_tools.schema_workflow_orchestrator \
--db-connection "postgresql://..." --table-list tables.txt \
--business-context "系统" --db-name test_db --verbose
python -m schema_tools \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--table-list tables.txt \
--business-context "高速公路服务区管理系统"
python -m schema_tools \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--table-list tables.txt \
--business-context "电商系统" \
--output-dir ./output \
--pipeline full
在生成DDL和MD文件后,可以使用新的Question-SQL生成功能:
python -m schema_tools.qs_generator \
--output-dir ./output \
--table-list ./tables.txt \
--business-context "高速公路服务区管理系统" \
--db-name highway_db
这将:
qs_highway_db_时间戳_pair.json
文件在生成Question-SQL对后,可以验证其中的SQL语句:
python -m schema_tools.sql_validator \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./qs_highway_db_20240101_143052_pair.json \
--output-dir ./validation_reports
这将:
# 基本验证(启用修复和文件修改)
python -m schema_tools.sql_validator \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json
# 仅生成报告,不修改文件
python -m schema_tools.sql_validator \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json \
--no-modify-file
# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
python -m schema_tools.sql_validator \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json \
--disable-llm-repair
# 性能调优参数
python -m schema_tools.sql_validator \
--db-connection "postgresql://user:pass@localhost:5432/dbname" \
--input-file ./data.json \
--max-concurrent 10 \
--batch-size 20 \
--timeout 60 \
--verbose
import asyncio
from schema_tools import SchemaTrainingDataAgent
async def generate_training_data():
agent = SchemaTrainingDataAgent(
db_connection="postgresql://user:pass@localhost:5432/dbname",
table_list_file="tables.txt",
business_context="高速公路服务区管理系统",
output_dir="./output",
pipeline="full"
)
report = await agent.generate_training_data()
print(f"处理完成: {report['summary']}")
asyncio.run(generate_training_data())
import asyncio
from schema_tools import QuestionSQLGenerationAgent
async def generate_qs_data():
agent = QuestionSQLGenerationAgent(
output_dir="./output",
table_list_file="tables.txt",
business_context="高速公路服务区管理系统",
db_name="highway_db"
)
report = await agent.generate()
print(f"生成完成: {report['total_questions']} 个问题")
asyncio.run(generate_qs_data())
import asyncio
from schema_tools import SQLValidationAgent
async def validate_sqls():
agent = SQLValidationAgent(
db_connection="postgresql://user:pass@localhost:5432/dbname",
input_file="./qs_highway_db_20240101_143052_pair.json",
output_dir="./validation_reports"
)
report = await agent.validate()
print(f"验证完成: {report['summary']['success_rate']:.1%} 成功率")
asyncio.run(validate_sqls())
output/
├── bss_car_day_count.ddl # DDL文件
├── bss_car_day_count_detail.md # MD文档
├── logs/ # 日志目录
│ └── schema_tools_20240101_120000.log
├── filename_mapping.txt # 文件名映射报告
├── qs_highway_db_20240101_143052_pair.json # Question-SQL训练数据
├── metadata.txt # 主题元数据(INSERT语句)
└── validation_reports/ # SQL验证报告
├── sql_validation_20240101_150000_report.json
└── sql_validation_20240101_150000_summary.txt
注意:配置已更新为不再创建ddl/和docs/子目录,所有文件直接放在output目录下。
主要配置在 schema_tools/config.py
中:
SCHEMA_TOOLS_CONFIG = {
# 核心配置
"output_directory": "training/generated_data",
"default_pipeline": "full",
"create_subdirectories": False, # 不创建子目录
# 数据处理配置
"sample_data_limit": 20, # 采样数据量
"max_concurrent_tables": 3, # 最大并发数
# Question-SQL生成配置
"qs_generation": {
"max_tables": 20, # 最大表数量限制
"theme_count": 5, # 生成主题数量
"questions_per_theme": 10, # 每主题问题数
"max_concurrent_themes": 3, # 并行处理主题数
},
# SQL验证配置
"sql_validation": {
"reuse_connection_pool": True, # 复用现有连接池
"max_concurrent_validations": 5, # 并发验证数
"validation_timeout": 30, # 单个验证超时(秒)
"batch_size": 10, # 批处理大小
"continue_on_error": True, # 错误时是否继续
"save_validation_report": True, # 保存验证报告
"readonly_mode": True, # 启用只读模式
}
}
full: 完整处理链(默认)
ddl_only: 仅生成DDL
analysis_only: 仅分析不生成文件
[
{
"question": "按服务区统计每日营收趋势(最近30天)?",
"sql": "SELECT service_name AS 服务区, oper_date AS 营业日期, SUM(pay_sum) AS 每日营收 FROM bss_business_day_data WHERE oper_date >= CURRENT_DATE - INTERVAL '30 day' AND delete_ts IS NULL GROUP BY service_name, oper_date ORDER BY 营业日期 ASC;"
}
]
A: 工具自动检测并适配只读数据库,不会尝试写操作。SQL验证器专门设计为只读模式。
A: 自动生成唯一文件名,如 hr__users.ddl
和 sales__users.ddl
。
A: 在表清单文件中注释掉(使用 # 开头)或删除相应行。
A: 自动重试3次,失败后使用原始注释或默认值。
A: 检查SQL语法、表名是否正确,使用 --verbose
查看详细错误信息。
@ToolRegistry.register("my_tool") class MyTool(BaseTool):
needs_llm = False
tool_name = "我的工具"
async def execute(self, context):
# 实现工具逻辑
return ProcessingResult(success=True)
2. 添加到处理链:
```python
"my_pipeline": [
"database_inspector",
"my_tool",
"ddl_generator"
]
本工具作为VANNA-CHAINLIT-CHROMADB项目的一部分,遵循项目许可证。