Просмотр исходного кода

增加SchemaWorkflowOrchestrator,可以端到端执行整个过程。

wangxq 1 неделя назад
Родитель
Сommit
127a606a1e

+ 326 - 43
docs/Schema Tools 使用说明.md

@@ -4,10 +4,11 @@
 
 1. [功能概述](#1-功能概述)
 2. [安装与配置](#2-安装与配置)
-3. [生成DDL和MD文档](#3-生成ddl和md文档)
-4. [生成Question-SQL训练数据](#4-生成question-sql训练数据)
-5. [配置详解](#5-配置详解)
-6. [常见问题](#6-常见问题)
+3. [一键执行完整工作流(推荐)](#3-一键执行完整工作流推荐)
+4. [生成DDL和MD文档](#4-生成ddl和md文档)
+5. [生成Question-SQL训练数据](#5-生成question-sql训练数据)
+6. [配置详解](#6-配置详解)
+7. [常见问题](#7-常见问题)
 
 ## 1. 功能概述
 
@@ -26,6 +27,12 @@ Schema Tools 提供两个主要功能:
 - 为每个主题生成高质量的Question-SQL对
 - 支持中断恢复和并行处理
 
+### 1.3 一键工作流编排器(推荐)
+- 端到端自动化执行完整流程
+- DDL/MD生成 → Question-SQL生成 → SQL验证修复
+- 详细的执行报告和性能指标
+- 支持灵活配置和错误恢复
+
 ## 2. 安装与配置
 
 ### 2.1 依赖安装
@@ -38,9 +45,163 @@ pip install asyncpg asyncio
 
 Schema Tools 使用项目现有的 LLM 配置,无需额外配置数据库连接。
 
-## 3. 生成DDL和MD文档
+## 3. 一键执行完整工作流(推荐)
+
+### 3.1 工作流编排器概述
+
+`SchemaWorkflowOrchestrator` 是 Schema Tools 的核心组件,提供端到端的自动化处理流程:
+
+1. **DDL和MD文档生成** - 连接数据库,生成表结构文档
+2. **Question-SQL对生成** - 基于文档生成训练数据
+3. **SQL验证和修复** - 验证SQL有效性并自动修复错误
+
+### 3.2 命令行使用
+
+#### 基本使用(完整工作流)
+```bash
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "高速公路服务区管理系统" \
+  --db-name highway_db \
+  --output-dir ./output
+```
+
+#### 跳过SQL验证
+```bash
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "电商系统" \
+  --db-name ecommerce_db \
+  --skip-validation
+```
 
-### 3.1 命令格式
+#### 禁用LLM修复
+```bash
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "管理系统" \
+  --db-name management_db \
+  --disable-llm-repair
+```
+
+#### 不修改原始文件(仅生成报告)
+```bash
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "业务系统" \
+  --db-name business_db \
+  --no-modify-file
+```
+
+### 3.3 编程方式使用
+
+```python
+import asyncio
+from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
+
+async def run_complete_workflow():
+    # 创建工作流编排器
+    orchestrator = SchemaWorkflowOrchestrator(
+        db_connection="postgresql://user:pass@localhost:5432/dbname",
+        table_list_file="tables.txt",
+        business_context="高速公路服务区管理系统",
+        db_name="highway_db",
+        output_dir="./output",
+        enable_sql_validation=True,      # 启用SQL验证
+        enable_llm_repair=True,          # 启用LLM修复
+        modify_original_file=True        # 修改原始JSON文件
+    )
+    
+    # 执行完整工作流程
+    report = await orchestrator.execute_complete_workflow()
+    
+    # 处理结果
+    if report["success"]:
+        print(f"✅ 工作流程执行成功!")
+        print(f"📄 最终输出文件: {report['final_outputs']['primary_output_file']}")
+        print(f"❓ 最终问题数量: {report['final_outputs']['final_question_count']}")
+        print(f"⏱️  总耗时: {report['performance_metrics']['total_duration']} 秒")
+    else:
+        print(f"❌ 工作流程执行失败: {report['error']['message']}")
+        print(f"💥 失败步骤: {report['error']['failed_step']}")
+
+# 运行工作流程
+asyncio.run(run_complete_workflow())
+```
+
+### 3.4 工作流参数说明
+
+| 参数 | 说明 | 默认值 |
+|------|------|--------|
+| `--db-connection` | 数据库连接字符串 | 必需 |
+| `--table-list` | 表清单文件路径 | 必需 |
+| `--business-context` | 业务上下文描述 | 必需 |
+| `--db-name` | 数据库名称(用于文件命名) | 必需 |
+| `--output-dir` | 输出目录 | `./output` |
+| `--skip-validation` | 跳过SQL验证步骤 | `False`(默认执行SQL验证) |
+| `--disable-llm-repair` | 禁用LLM修复功能 | `False`(默认启用LLM修复) |
+| `--no-modify-file` | 不修改原始JSON文件 | `False`(默认修改原文件) |
+| `--verbose` | 启用详细日志 | `False` |
+| `--log-file` | 日志文件路径 | 无 |
+
+### 3.5 工作流执行报告
+
+工作流编排器会生成详细的执行报告,包括:
+
+```python
+{
+    "success": True,
+    "workflow_summary": {
+        "total_duration": 245.67,
+        "completed_steps": ["ddl_md_generation", "question_sql_generation", "sql_validation"],
+        "total_steps": 3
+    },
+    "processing_results": {
+        "ddl_md_generation": {
+            "total_tables": 8,
+            "processed_successfully": 8,
+            "duration": 89.23
+        },
+        "question_sql_generation": {
+            "total_questions": 50,
+            "total_themes": 5,
+            "duration": 123.45
+        },
+        "sql_validation": {
+            "success_rate": 0.94,
+            "valid_sql_count": 47,
+            "invalid_sql_count": 3,
+            "duration": 32.99
+        }
+    },
+    "final_outputs": {
+        "primary_output_file": "./output/qs_highway_db_20240123_143052_pair.json",
+        "final_question_count": 47
+    },
+    "performance_metrics": {
+        "step1_duration": 89.23,
+        "step2_duration": 123.45,
+        "step3_duration": 32.99,
+        "total_duration": 245.67
+    }
+}
+```
+
+### 3.6 优势特性
+
+- **自动化流程**:一个命令完成所有步骤
+- **错误恢复**:失败时保留已完成步骤的输出
+- **灵活配置**:可选择跳过验证、禁用修复等
+- **详细报告**:提供完整的执行状态和性能指标
+- **向后兼容**:支持所有现有参数和配置
+
+## 4. 生成DDL和MD文档(分步执行)
+
+### 4.1 命令格式
 
 ```bash
 python -m schema_tools \
@@ -50,7 +211,7 @@ python -m schema_tools \
   [可选参数]
 ```
 
-### 3.2 必需参数说明
+### 4.2 必需参数说明
 
 | 参数 | 说明 | 示例 |
 |------|------|------|
@@ -58,25 +219,25 @@ python -m schema_tools \
 | `--table-list` | 表清单文件路径 | `./tables.txt` |
 | `--business-context` | 业务上下文描述 | `"高速公路服务区管理系统"` |
 
-### 3.3 可选参数说明
+### 4.3 可选参数说明
 
 | 参数 | 说明 | 默认值 |
 |------|------|--------|
 | `--output-dir` | 输出目录路径 | `training/generated_data` |
 | `--pipeline` | 处理链类型 | `full` |
-| `--max-concurrent` | 最大并发表数量 | `3` |
+| `--max-concurrent` | 最大并发表数量 | `1` |
 | `--verbose` | 启用详细日志 | `False` |
 | `--log-file` | 日志文件路径 | `无` |
 | `--no-filter-system-tables` | 禁用系统表过滤 | `False` |
 | `--check-permissions-only` | 仅检查数据库权限 | `False` |
 
-### 3.4 处理链类型
+### 4.4 处理链类型
 
 - **full**: 完整处理链(默认)- 生成DDL和MD文档
 - **ddl_only**: 仅生成DDL文件
 - **analysis_only**: 仅分析不生成文件
 
-### 3.5 使用示例
+### 4.5 使用示例
 
 #### 基本使用
 ```bash
@@ -112,7 +273,7 @@ python -m schema_tools \
   --check-permissions-only
 ```
 
-### 3.6 表清单文件格式
+### 4.6 表清单文件格式
 
 创建一个文本文件(如 `tables.txt`),每行一个表名:
 
@@ -124,7 +285,7 @@ bss_car_day_count  # 默认为public schema
 hr.employees       # 指定schema
 ```
 
-### 3.7 输出文件
+### 4.7 输出文件
 
 生成的文件都放在输出目录下(不创建子目录):
 
@@ -139,13 +300,13 @@ output/
     └── schema_tools_20240123.log
 ```
 
-## 4. 生成Question-SQL训练数据
+## 5. 生成Question-SQL训练数据(分步执行)
 
-### 4.1 前置条件
+### 5.1 前置条件
 
 必须先执行DDL和MD文档生成,确保输出目录中有完整的DDL和MD文件。
 
-### 4.2 命令格式
+### 5.2 命令格式
 
 ```bash
 python -m schema_tools.qs_generator \
@@ -155,7 +316,7 @@ python -m schema_tools.qs_generator \
   [可选参数]
 ```
 
-### 4.3 必需参数说明
+### 5.3 必需参数说明
 
 | 参数 | 说明 | 示例 |
 |------|------|------|
@@ -163,7 +324,7 @@ python -m schema_tools.qs_generator \
 | `--table-list` | 表清单文件路径(用于验证) | `./tables.txt` |
 | `--business-context` | 业务上下文描述 | `"高速公路服务区管理系统"` |
 
-### 4.4 可选参数说明
+### 5.4 可选参数说明
 
 | 参数 | 说明 | 默认值 |
 |------|------|--------|
@@ -171,7 +332,7 @@ python -m schema_tools.qs_generator \
 | `--verbose` | 启用详细日志 | `False` |
 | `--log-file` | 日志文件路径 | `无` |
 
-### 4.5 使用示例
+### 5.5 使用示例
 
 #### 基本使用
 ```bash
@@ -192,24 +353,86 @@ python -m schema_tools.qs_generator \
   --verbose
 ```
 
-### 4.6 执行流程
+### 5.6 SQL验证和修复
+
+生成Question-SQL对后,可以使用SQL验证功能。**注意:命令行使用时,默认启用LLM修复和文件修改功能**。
+
+```bash
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./qs_highway_db_20240123_143052_pair.json \
+  --output-dir ./validation_reports
+```
+
+#### SQL验证参数说明
+
+| 参数 | 说明 | 默认值 |
+|------|------|--------|
+| `--db-connection` | 数据库连接字符串 | 必需 |
+| `--input-file` | Question-SQL文件路径 | 必需 |
+| `--output-dir` | 验证报告输出目录 | 输入文件同目录 |
+| `--disable-llm-repair` | 禁用LLM修复功能 | `False`(默认启用修复) |
+| `--no-modify-file` | 不修改原始JSON文件 | `False`(默认修改原文件) |
+| `--max-concurrent` | 最大并发验证数 | `5` |
+| `--batch-size` | 批处理大小 | `10` |
+| `--timeout` | 单个验证超时时间(秒) | `30` |
+| `--verbose` | 启用详细日志 | `False` |
+| `--dry-run` | 仅解析文件不执行验证 | `False` |
+| `--save-json` | 保存详细JSON报告 | `False` |
+
+#### SQL验证使用示例
+
+```bash
+# 基本验证(默认:启用LLM修复和文件修改)
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json
+
+# 仅生成报告,不修改文件
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --no-modify-file
+
+# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --disable-llm-repair
+
+# 性能调优参数
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --max-concurrent 10 \
+  --batch-size 20 \
+  --timeout 60 \
+  --verbose
+```
+
+### 5.7 执行流程
 
 1. **文件验证**:检查DDL和MD文件数量是否正确
 2. **表数量限制**:最多处理20个表(可配置)
 3. **主题提取**:LLM分析表结构,提取5个业务分析主题
 4. **Question-SQL生成**:每个主题生成10个问题
 5. **结果保存**:输出到 `qs_<db_name>_<时间戳>_pair.json`
+6. **SQL验证**:验证生成的SQL语句有效性
+7. **自动修复**:使用LLM修复无效的SQL语句(可选)
 
-### 4.7 输出文件
+### 5.8 输出文件
 
 ```
 output/
 ├── qs_highway_db_20240123_143052_pair.json  # 最终结果
+├── qs_highway_db_20240123_143052_pair.json.backup  # 原文件备份(如果启用文件修改)
 ├── qs_intermediate_20240123_143052.json     # 中间结果(成功后自动删除)
-└── qs_recovery_20240123_143052.json         # 恢复文件(异常中断时生成)
+├── qs_recovery_20240123_143052.json         # 恢复文件(异常中断时生成)
+├── sql_validation_20240123_150000_summary.txt  # SQL验证报告
+└── file_modifications_20240123_150000.log  # 文件修改日志(如果启用文件修改)
 ```
 
-### 4.8 输出格式示例
+### 5.9 输出格式示例
 
 ```json
 [
@@ -224,9 +447,9 @@ output/
 ]
 ```
 
-## 5. 配置详解
+## 6. 配置详解
 
-### 5.1 主要配置项
+### 6.1 主要配置项
 
 配置文件位于 `schema_tools/config.py`:
 
@@ -234,8 +457,9 @@ output/
 # DDL/MD生成相关配置
 "output_directory": "training/generated_data",     # 输出目录
 "create_subdirectories": False,                    # 不创建子目录
-"max_concurrent_tables": 3,                        # 最大并发数
+"max_concurrent_tables": 1,                        # 最大并发数(避免LLM并发问题)
 "sample_data_limit": 20,                          # 数据采样量
+"large_table_threshold": 1000000,                 # 大表阈值(100万行)
 "filter_system_tables": True,                      # 过滤系统表
 "continue_on_error": True,                         # 错误后继续
 
@@ -244,19 +468,29 @@ output/
     "max_tables": 20,                             # 最大表数量限制
     "theme_count": 5,                             # 主题数量
     "questions_per_theme": 10,                    # 每主题问题数
-    "max_concurrent_themes": 3,                   # 并行主题数
+    "max_concurrent_themes": 1,                   # 并行主题数(避免LLM并发问题)
     "continue_on_theme_error": True,              # 主题失败继续
     "save_intermediate": True,                    # 保存中间结果
 }
+
+# SQL验证配置
+"sql_validation": {
+    "max_concurrent_validations": 5,              # 并发验证数
+    "validation_timeout": 30,                     # 单个验证超时(秒)
+    "batch_size": 10,                             # 批处理大小
+    "enable_sql_repair": False,                   # SQL修复功能(命令行覆盖为True)
+    "modify_original_file": False,                # 文件修改功能(命令行覆盖为True)
+    "readonly_mode": True,                        # 启用只读模式
+}
 ```
 
-### 5.2 修改配置
+### 6.2 修改配置
 
 可以通过编辑 `schema_tools/config.py` 文件来修改默认配置。
 
-## 6. 常见问题
+## 7. 常见问题
 
-### 6.1 表数量超过20个怎么办?
+### 7.1 表数量超过20个怎么办?
 
 **错误信息**:
 ```
@@ -267,7 +501,7 @@ output/
 1. 分批处理:将表清单分成多个文件,每个不超过20个表
 2. 修改配置:在 `config.py` 中增加 `max_tables` 限制
 
-### 6.2 DDL和MD文件数量不一致
+### 7.2 DDL和MD文件数量不一致
 
 **错误信息**:
 ```
@@ -279,7 +513,7 @@ DDL文件数量(5)与表数量(6)不一致
 2. 查看日志文件找出失败的表
 3. 重新运行DDL/MD生成
 
-### 6.3 LLM调用失败
+### 7.3 LLM调用失败
 
 **可能原因**:
 - 网络连接问题
@@ -291,7 +525,7 @@ DDL文件数量(5)与表数量(6)不一致
 2. 查看中间结果文件,从断点继续
 3. 减少表数量或分批处理
 
-### 6.4 权限不足
+### 7.4 权限不足
 
 **错误信息**:
 ```
@@ -303,22 +537,59 @@ DDL文件数量(5)与表数量(6)不一致
 2. 确保数据库用户有SELECT权限
 3. Schema Tools支持只读数据库
 
-### 6.5 如何处理大表?
+### 7.5 如何处理大表?
 
 Schema Tools会自动检测大表(超过100万行)并使用智能采样策略:
-- 前N行 + 随机中间行 + 后N行
-- 确保采样数据的代表性
+- **前N行采样**:使用 `SELECT * FROM table LIMIT N` 获取前N行
+- **随机中间采样**:使用 `TABLESAMPLE SYSTEM` 进行随机采样(失败时回退到OFFSET采样)
+- **后N行采样**:使用ROW_NUMBER窗口函数获取最后N行
+- 三段采样确保数据的代表性,有效处理大表的多样性
 
-### 6.6 生成的SQL语法错误
+**大表阈值**:默认为100万行(可在config.py中修改`large_table_threshold`)
+
+### 7.6 生成的SQL语法错误
 
 目前生成的SQL使用PostgreSQL语法。如果需要其他数据库语法:
 1. 在业务上下文中明确指定目标数据库
 2. 未来版本将支持MySQL等其他数据库
 
-## 7. 最佳实践
+### 7.7 工作流编排器相关问题
+
+**Q: 工作流中途失败,如何恢复?**
+A: 工作流编排器会保留已完成步骤的输出文件,可以手动从失败步骤开始重新执行。
+
+**Q: 如何只执行部分步骤?**
+A: 使用 `--skip-validation` 跳过SQL验证,或使用分步执行方式调用各个模块。
+
+**Q: 工作流执行时间过长怎么办?**
+A: 可以通过减少表数量、调整并发参数、或分批处理来优化执行时间。
+
+### 7.8 SQL验证器默认行为说明
 
-### 7.1 工作流程建议
+**重要**:SQL验证器的命令行模式与配置文件中的默认值不同:
 
+- **配置文件默认**:`enable_sql_repair=False`, `modify_original_file=False`
+- **命令行默认**:启用LLM修复和文件修改功能
+- **原因**:命令行使用时通常期望完整的修复功能,而配置文件提供保守的默认值
+
+如需禁用,请明确使用 `--disable-llm-repair` 或 `--no-modify-file` 参数。
+
+## 8. 最佳实践
+
+### 8.1 推荐工作流程
+
+**方式一:一键执行(推荐)**
+```bash
+# 完整工作流程,一个命令搞定
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "高速公路服务区管理系统" \
+  --db-name highway_db \
+  --output-dir ./output
+```
+
+**方式二:分步执行(调试时使用)**
 1. **第一步**:生成DDL和MD文档
    ```bash
    python -m schema_tools --db-connection "..." --table-list tables.txt --business-context "..." --output-dir ./output
@@ -334,20 +605,32 @@ Schema Tools会自动检测大表(超过100万行)并使用智能采样策
    python -m schema_tools.qs_generator --output-dir ./output --table-list tables.txt --business-context "..."
    ```
 
-### 7.2 表清单组织
+4. **第四步**:验证SQL(可选)
+   ```bash
+   python -m schema_tools.sql_validator --db-connection "..." --input-file ./qs_xxx.json
+   ```
+
+### 8.2 表清单组织
 
 - 按业务模块分组
 - 每组不超过15-20个表
 - 使用注释说明每组的用途
 
-### 7.3 业务上下文优化
+### 8.3 业务上下文优化
 
 - 提供准确的业务背景描述
 - 包含行业特定术语
 - 说明主要业务流程
 
-### 7.4 输出文件管理
+### 8.4 输出文件管理
 
 - 定期备份生成的文件
 - 使用版本控制管理DDL文件
-- 保留中间结果用于调试 
+- 保留中间结果用于调试
+
+### 8.5 工作流编排器最佳实践
+
+- **首次使用**:建议启用详细日志(`--verbose`)观察执行过程
+- **生产环境**:使用默认参数,启用SQL验证和修复
+- **调试阶段**:可以使用 `--skip-validation` 跳过验证步骤加快执行
+- **质量要求高**:使用 `--no-modify-file` 仅生成报告,手动审查后再决定是否修改 

+ 106 - 21
docs/Schema Tools 系统概要设计说明书.md

@@ -28,6 +28,9 @@ schema_tools/                    # 独立的schema工具模块
 ├── training_data_agent.py      # 主AI Agent
 ├── qs_agent.py                 # Question-SQL生成Agent (新增)
 ├── qs_generator.py             # Question-SQL命令行入口 (新增)
+├── sql_validation_agent.py     # SQL验证Agent (新增)
+├── sql_validator.py            # SQL验证命令行入口 (新增)
+├── schema_workflow_orchestrator.py  # 端到端工作流编排器 (新增)
 ├── tools/                      # Agent工具集
 │   ├── __init__.py
 │   ├── base.py                 # 基础工具类和注册机制
@@ -38,7 +41,8 @@ schema_tools/                    # 独立的schema工具模块
 │   └── doc_generator.py        # MD文档生成工具
 ├── validators/                 # 验证器模块 (新增)
 │   ├── __init__.py
-│   └── file_count_validator.py # 文件数量验证器
+│   ├── file_count_validator.py # 文件数量验证器
+│   └── sql_validator.py        # SQL验证器核心模块
 ├── analyzers/                  # 分析器模块 (新增)
 │   ├── __init__.py
 │   ├── md_analyzer.py          # MD文件分析器
@@ -71,7 +75,19 @@ schema_tools/                    # 独立的schema工具模块
 - **职责**: 生成Question-SQL训练数据对
 - **特点**: 独立的功能模块,可在DDL/MD生成后单独执行
 
-#### 2.2.3 Agent工具集(基于装饰器注册)
+#### 2.2.3 SQL验证Agent(新增)
+
+- **类名**: `SQLValidationAgent`
+- **职责**: 验证Question-SQL对中的SQL语句有效性,自动修复错误SQL
+- **特点**: 支持并发验证、LLM自动修复、原文件自动修改
+
+#### 2.2.4 工作流编排器(新增)
+
+- **类名**: `SchemaWorkflowOrchestrator`
+- **职责**: 端到端执行完整的Schema处理流程
+- **特点**: 统一管理DDL/MD生成、Question-SQL生成、SQL验证三个步骤
+
+#### 2.2.5 Agent工具集(基于装饰器注册)
 
 1. **DatabaseInspectorTool**: 获取表元数据
 2. **DataSamplerTool**: 采样表数据
@@ -79,11 +95,12 @@ schema_tools/                    # 独立的schema工具模块
 4. **DDLGeneratorTool**: 生成DDL格式文件
 5. **DocGeneratorTool**: 生成MD文档
 
-#### 2.2.4 验证器和分析器(新增)
+#### 2.2.6 验证器和分析器(新增)
 
 1. **FileCountValidator**: 验证DDL和MD文件数量
-2. **MDFileAnalyzer**: 读取和分析MD文件内容
-3. **ThemeExtractor**: 使用LLM提取业务分析主题
+2. **SQLValidator**: 验证SQL语句有效性,支持LLM自动修复
+3. **MDFileAnalyzer**: 读取和分析MD文件内容
+4. **ThemeExtractor**: 使用LLM提取业务分析主题
 
 ## 3. 详细设计
 
@@ -117,9 +134,52 @@ graph TD
     I --> J[完成]
 ```
 
-### 3.2 模块间接口规范
+### 3.3 SQL验证和修复流程(新增)
+
+```mermaid
+graph TD
+    A[开始] --> B[读取Question-SQL文件]
+    B --> C[提取SQL语句]
+    C --> D[批量验证SQL]
+    D --> E{有失败的SQL?}
+    E -->|否| F[生成验证报告]
+    E -->|是| G{启用LLM修复?}
+    G -->|否| H[仅生成报告]
+    G -->|是| I[LLM修复失败SQL]
+    I --> J[重新验证修复后SQL]
+    J --> K{启用文件修改?}
+    K -->|否| F
+    K -->|是| L[创建备份文件]
+    L --> M[更新原文件]
+    M --> N[生成修改日志]
+    N --> F
+    F --> O[完成]
+    H --> F
+```
 
-#### 3.2.1 统一数据结构定义
+### 3.4 端到端工作流编排流程(新增)
+
+```mermaid
+graph TD
+    A[开始] --> B[步骤1: DDL/MD生成]
+    B --> C{成功?}
+    C -->|否| D[生成错误报告]
+    C -->|是| E[步骤2: Question-SQL生成]
+    E --> F{成功?}
+    F -->|否| D
+    F -->|是| G{启用SQL验证?}
+    G -->|否| H[生成最终报告]
+    G -->|是| I[步骤3: SQL验证和修复]
+    I --> J{成功?}
+    J -->|否| D
+    J -->|是| H
+    H --> K[完成]
+    D --> K
+```
+
+### 3.5 模块间接口规范
+
+#### 3.5.1 统一数据结构定义
 
 ```python
 from dataclasses import dataclass
@@ -158,7 +218,7 @@ class ProcessingResult:
     warnings: List[str] = None
 ```
 
-#### 3.2.2 工具接口规范
+#### 3.5.2 工具接口规范
 
 ```python
 class BaseTool:
@@ -174,9 +234,9 @@ class BaseTool:
         pass
 ```
 
-### 3.3 可插拔处理链设计
+### 3.6 可插拔处理链设计
 
-#### 3.3.1 Pipeline配置
+#### 3.6.1 Pipeline配置
 
 ```python
 # 支持灵活的处理链配置
@@ -232,7 +292,7 @@ class PipelineExecutor:
 
 #### 3.4.1 表级并发
 
-- 最大并发表数: 可配置(默认3个
+- 最大并发表数: 可配置(默认1个,避免LLM并发问题
 - 使用asyncio.Semaphore控制并发数
 - 单表内工具串行执行
 
@@ -327,7 +387,7 @@ SCHEMA_TOOLS_CONFIG = {
     "large_table_threshold": 1000000,  # 大表阈值
     
     # 并发配置
-    "max_concurrent_tables": 3,
+    "max_concurrent_tables": 1,  # 建议保持1,避免LLM并发调用问题
     
     # LLM配置
     "use_app_config_llm": True,
@@ -366,10 +426,32 @@ SCHEMA_TOOLS_CONFIG = {
         "max_tables": 20,                    # 最大表数量限制
         "theme_count": 5,                    # LLM生成的主题数量
         "questions_per_theme": 10,           # 每个主题生成的问题数
-        "max_concurrent_themes": 3,          # 并行处理的主题数量
+        "max_concurrent_themes": 1,          # 并行处理的主题数量(建议保持1)
         "continue_on_theme_error": True,     # 主题生成失败是否继续
         "save_intermediate": True,           # 是否保存中间结果
         "output_file_prefix": "qs",          # 输出文件前缀
+    },
+    
+    # SQL验证配置(新增)
+    "sql_validation": {
+        "reuse_connection_pool": True,       # 复用现有连接池
+        "max_concurrent_validations": 5,     # 并发验证数
+        "validation_timeout": 30,            # 单个验证超时(秒)
+        "batch_size": 10,                    # 批处理大小
+        "continue_on_error": True,           # 错误时是否继续
+        "save_validation_report": True,      # 保存验证报告
+        "save_detailed_json_report": False,  # 保存详细JSON报告(可选)
+        "readonly_mode": True,               # 启用只读模式
+        "max_retry_count": 2,                # 验证失败重试次数
+        "report_file_prefix": "sql_validation",  # 报告文件前缀
+        
+        # SQL修复配置
+        "enable_sql_repair": False,          # 启用SQL修复功能(默认禁用)
+        "llm_repair_timeout": 120,           # LLM修复超时时间(秒)
+        "repair_batch_size": 2,              # 修复批处理大小
+        
+        # 文件修改配置
+        "modify_original_file": False,       # 是否修改原始JSON文件(默认禁用)
     }
 }
 ```
@@ -414,18 +496,21 @@ def generate_safe_filename(schema_name: str, table_name: str, suffix: str) -> st
 
 ```
 training/generated_data/
-├── ddl/
-│   ├── users.ddl                    # public.users
-│   ├── hr__employees.ddl            # hr.employees  
-│   └── sales__order_items.ddl       # sales.order-items
-├── docs/
-│   ├── users_detail.md
-│   ├── hr__employees_detail.md
-│   └── sales__order_items_detail.md
+├── users.ddl                        # public.users
+├── hr__employees.ddl                # hr.employees  
+├── sales__order_items.ddl           # sales.order-items
+├── users_detail.md                  # 对应的MD文档
+├── hr__employees_detail.md
+├── sales__order_items_detail.md
+├── qs_highway_db_20240101_pair.json # Question-SQL对文件
+├── metadata.txt                     # 主题元数据
+├── sql_validation_20240101_summary.txt  # SQL验证报告
 └── logs/
     └── schema_tools.log
 ```
 
+**注意**: 配置已更新为不创建ddl/和docs/子目录,所有文件直接放在output目录下。
+
 #### 4.3.3 重名检测与处理
 
 ```python

+ 353 - 3
docs/Schema Tools 详细设计文档.md

@@ -12,6 +12,9 @@ schema_tools/
 ├── training_data_agent.py          # 主AI Agent
 ├── qs_agent.py                     # Question-SQL生成Agent (新增)
 ├── qs_generator.py                 # Question-SQL命令行入口 (新增)
+├── sql_validation_agent.py         # SQL验证Agent (新增)
+├── sql_validator.py                # SQL验证命令行入口 (新增)
+├── schema_workflow_orchestrator.py # 端到端工作流编排器 (新增)
 ├── tools/                          # Agent工具集
 │   ├── __init__.py                 # 工具模块初始化
 │   ├── base.py                     # 基础工具类和注册机制
@@ -22,7 +25,8 @@ schema_tools/
 │   └── doc_generator.py            # MD文档生成工具
 ├── validators/                     # 验证器模块 (新增)
 │   ├── __init__.py
-│   └── file_count_validator.py     # 文件数量验证器
+│   ├── file_count_validator.py     # 文件数量验证器
+│   └── sql_validator.py            # SQL验证器核心模块
 ├── analyzers/                      # 分析器模块 (新增)
 │   ├── __init__.py
 │   ├── md_analyzer.py              # MD文件分析器
@@ -1839,7 +1843,7 @@ SCHEMA_TOOLS_CONFIG = {
     "large_table_threshold": 1000000,           # 大表阈值(行数)
     
     # 并发配置
-    "max_concurrent_tables": 3,                 # 最大并发处理表数
+    "max_concurrent_tables": 1,  # 建议保持1,避免LLM并发调用问题                 # 最大并发处理表数
     
     # LLM配置
     "use_app_config_llm": True,                # 是否使用app_config中的LLM配置
@@ -2226,4 +2230,350 @@ async def _save_theme_results(self, theme_name: str, qs_pairs: List[Dict]):
     self.intermediate_results.append(theme_result)
     
     # 立即保存到中间文件
-    if self.config['qs_generation']['save_intermediate']:
+    if self.config['qs_generation']['save_intermediate']:
+        with open(self.intermediate_file, 'w', encoding='utf-8') as f:
+            json.dump(self.intermediate_results, f, ensure_ascii=False, indent=2)
+```
+
+## 9. SQL验证器核心模块
+
+### 9.1 SQL验证器设计 (`validators/sql_validator.py`)
+
+```python
+@dataclass
+class SQLValidationResult:
+    """SQL验证结果"""
+    sql: str
+    valid: bool
+    error_message: str = ""
+    execution_time: float = 0.0
+    retry_count: int = 0
+    
+    # SQL修复相关字段
+    repair_attempted: bool = False
+    repair_successful: bool = False
+    repaired_sql: str = ""
+    repair_error: str = ""
+
+@dataclass
+class ValidationStats:
+    """验证统计信息"""
+    total_sqls: int = 0
+    valid_sqls: int = 0
+    invalid_sqls: int = 0
+    total_time: float = 0.0
+    avg_time_per_sql: float = 0.0
+    retry_count: int = 0
+    
+    # SQL修复统计
+    repair_attempted: int = 0
+    repair_successful: int = 0
+    repair_failed: int = 0
+
+class SQLValidator:
+    """SQL验证器核心类"""
+    
+    def __init__(self, db_connection: str = None):
+        self.db_connection = db_connection
+        self.connection_pool = None
+        self.config = SCHEMA_TOOLS_CONFIG['sql_validation']
+        
+    async def validate_sql(self, sql: str, retry_count: int = 0) -> SQLValidationResult:
+        """验证单个SQL语句"""
+        start_time = time.time()
+        
+        try:
+            if not self.connection_pool:
+                await self._get_connection_pool()
+            
+            # 使用EXPLAIN验证SQL语法和表结构
+            explain_sql = f"EXPLAIN {sql}"
+            
+            async with self.connection_pool.acquire() as conn:
+                # 设置只读模式
+                if self.config['readonly_mode']:
+                    await conn.execute("SET default_transaction_read_only = on")
+                
+                # 执行EXPLAIN
+                await asyncio.wait_for(
+                    conn.fetch(explain_sql),
+                    timeout=self.config['validation_timeout']
+                )
+            
+            execution_time = time.time() - start_time
+            
+            return SQLValidationResult(
+                sql=sql,
+                valid=True,
+                execution_time=execution_time,
+                retry_count=retry_count
+            )
+            
+        except asyncio.TimeoutError:
+            execution_time = time.time() - start_time
+            error_msg = f"SQL验证超时({self.config['validation_timeout']}秒)"
+            
+            return SQLValidationResult(
+                sql=sql,
+                valid=False,
+                error_message=error_msg,
+                execution_time=execution_time,
+                retry_count=retry_count
+            )
+            
+        except Exception as e:
+            execution_time = time.time() - start_time
+            error_msg = str(e)
+            
+            # 检查是否需要重试
+            if retry_count < self.config['max_retry_count'] and self._should_retry(e):
+                await asyncio.sleep(0.5)  # 短暂延迟
+                return await self.validate_sql(sql, retry_count + 1)
+            
+            return SQLValidationResult(
+                sql=sql,
+                valid=False,
+                error_message=error_msg,
+                execution_time=execution_time,
+                retry_count=retry_count
+            )
+    
+    async def validate_sqls_batch(self, sqls: List[str]) -> List[SQLValidationResult]:
+        """批量验证SQL语句"""
+        max_concurrent = self.config['max_concurrent_validations']
+        semaphore = asyncio.Semaphore(max_concurrent)
+        
+        async def validate_with_semaphore(sql):
+            async with semaphore:
+                return await self.validate_sql(sql)
+        
+        # 并发执行验证
+        tasks = [validate_with_semaphore(sql) for sql in sqls]
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+        
+        # 处理异常结果
+        processed_results = []
+        for i, result in enumerate(results):
+            if isinstance(result, Exception):
+                processed_results.append(SQLValidationResult(
+                    sql=sqls[i],
+                    valid=False,
+                    error_message=f"验证异常: {str(result)}"
+                ))
+            else:
+                processed_results.append(result)
+        
+        return processed_results
+    
+    def calculate_stats(self, results: List[SQLValidationResult]) -> ValidationStats:
+        """计算验证统计信息"""
+        stats = ValidationStats()
+        
+        stats.total_sqls = len(results)
+        stats.valid_sqls = sum(1 for r in results if r.valid)
+        stats.invalid_sqls = stats.total_sqls - stats.valid_sqls
+        stats.total_time = sum(r.execution_time for r in results)
+        stats.avg_time_per_sql = stats.total_time / stats.total_sqls if stats.total_sqls > 0 else 0.0
+        stats.retry_count = sum(r.retry_count for r in results)
+        
+        # 修复统计
+        stats.repair_attempted = sum(1 for r in results if r.repair_attempted)
+        stats.repair_successful = sum(1 for r in results if r.repair_successful)
+        stats.repair_failed = stats.repair_attempted - stats.repair_successful
+        
+        return stats
+```
+
+### 9.2 SQL验证Agent (`sql_validation_agent.py`)
+
+```python
+class SQLValidationAgent:
+    """SQL验证Agent - 管理SQL验证的完整流程"""
+    
+    async def validate(self) -> Dict[str, Any]:
+        """执行SQL验证流程"""
+        
+        # 1. 读取输入文件
+        questions_sqls = await self._load_questions_sqls()
+        
+        # 2. 提取SQL语句
+        sqls = [item['sql'] for item in questions_sqls]
+        
+        # 3. 执行验证
+        validation_results = await self._validate_sqls_with_batching(sqls)
+        
+        # 4. 计算统计信息
+        stats = self.validator.calculate_stats(validation_results)
+        
+        # 5. 尝试修复失败的SQL(如果启用LLM修复)
+        if self.config.get('enable_sql_repair', False) and self.vn:
+            validation_results = await self._attempt_sql_repair(questions_sqls, validation_results)
+            stats = self.validator.calculate_stats(validation_results)
+        
+        # 6. 修改原始JSON文件(如果启用文件修改)
+        file_modification_stats = {'modified': 0, 'deleted': 0, 'failed_modifications': 0}
+        if self.config.get('modify_original_file', False):
+            file_modification_stats = await self._modify_original_json_file(questions_sqls, validation_results)
+        
+        # 7. 生成详细报告
+        report = await self._generate_report(questions_sqls, validation_results, stats, file_modification_stats)
+        
+        # 8. 保存验证报告
+        if self.config['save_validation_report']:
+            await self._save_validation_report(report)
+        
+        return report
+    
+    async def _attempt_sql_repair(self, questions_sqls: List[Dict], validation_results: List[SQLValidationResult]) -> List[SQLValidationResult]:
+        """尝试修复失败的SQL"""
+        
+        failed_indices = [i for i, result in enumerate(validation_results) if not result.valid]
+        
+        if not failed_indices:
+            return validation_results
+        
+        # 批量修复
+        batch_size = self.config.get('repair_batch_size', 5)
+        updated_results = validation_results.copy()
+        
+        for i in range(0, len(failed_indices), batch_size):
+            batch_indices = failed_indices[i:i + batch_size]
+            
+            # 准备批次数据
+            batch_data = []
+            for idx in batch_indices:
+                batch_data.append({
+                    'index': idx,
+                    'question': questions_sqls[idx]['question'],
+                    'sql': validation_results[idx].sql,
+                    'error': validation_results[idx].error_message
+                })
+            
+            # 调用LLM修复
+            repaired_sqls = await self._repair_sqls_with_llm(batch_data)
+            
+            # 验证修复后的SQL
+            for j, idx in enumerate(batch_indices):
+                original_result = updated_results[idx]
+                original_result.repair_attempted = True
+                
+                if j < len(repaired_sqls) and repaired_sqls[j]:
+                    repaired_sql = repaired_sqls[j]
+                    
+                    # 验证修复后的SQL
+                    repair_result = await self.validator.validate_sql(repaired_sql)
+                    
+                    if repair_result.valid:
+                        # 修复成功
+                        original_result.repair_successful = True
+                        original_result.repaired_sql = repaired_sql
+                        original_result.valid = True  # 更新为有效
+                    else:
+                        # 修复失败
+                        original_result.repair_successful = False
+                        original_result.repair_error = repair_result.error_message
+                else:
+                    # LLM修复失败
+                    original_result.repair_successful = False
+                    original_result.repair_error = "LLM修复失败或返回空结果"
+        
+        return updated_results
+    
+    async def _modify_original_json_file(self, questions_sqls: List[Dict], validation_results: List[SQLValidationResult]) -> Dict[str, int]:
+        """修改原始JSON文件"""
+        stats = {'modified': 0, 'deleted': 0, 'failed_modifications': 0}
+        
+        try:
+            # 读取原始JSON文件
+            with open(self.input_file, 'r', encoding='utf-8') as f:
+                original_data = json.load(f)
+            
+            # 创建备份文件
+            backup_file = Path(str(self.input_file) + '.backup')
+            with open(backup_file, 'w', encoding='utf-8') as f:
+                json.dump(original_data, f, ensure_ascii=False, indent=2)
+            
+            # 构建修改计划
+            modifications = []
+            deletions = []
+            
+            for i, (qs, result) in enumerate(zip(questions_sqls, validation_results)):
+                if result.repair_successful and result.repaired_sql:
+                    # 修复成功的SQL
+                    modifications.append({
+                        'index': i,
+                        'original_sql': result.sql,
+                        'repaired_sql': result.repaired_sql,
+                        'question': qs['question']
+                    })
+                elif not result.valid and not result.repair_successful:
+                    # 无法修复的SQL,标记删除
+                    deletions.append({
+                        'index': i,
+                        'question': qs['question'],
+                        'sql': result.sql,
+                        'error': result.error_message
+                    })
+            
+            # 执行修改(从后往前,避免索引变化)
+            new_data = original_data.copy()
+            
+            # 先删除无效项(从后往前删除)
+            for deletion in sorted(deletions, key=lambda x: x['index'], reverse=True):
+                if deletion['index'] < len(new_data):
+                    new_data.pop(deletion['index'])
+                    stats['deleted'] += 1
+            
+            # 再修改SQL(需要重新计算索引)
+            for modification in sorted(modifications, key=lambda x: x['index']):
+                # 计算删除后的新索引
+                new_index = modification['index']
+                for deletion in deletions:
+                    if deletion['index'] < modification['index']:
+                        new_index -= 1
+                
+                if new_index < len(new_data):
+                    new_data[new_index]['sql'] = modification['repaired_sql']
+                    stats['modified'] += 1
+            
+            # 写入修改后的文件
+            with open(self.input_file, 'w', encoding='utf-8') as f:
+                json.dump(new_data, f, ensure_ascii=False, indent=2)
+            
+            # 记录详细修改信息到日志文件
+            await self._write_modification_log(modifications, deletions)
+            
+        except Exception as e:
+            stats['failed_modifications'] = 1
+        
+        return stats
+```
+
+## 10. 工作流编排器设计
+
+### 10.1 SchemaWorkflowOrchestrator核心功能
+
+```python
+class SchemaWorkflowOrchestrator:
+    """端到端的Schema处理编排器"""
+    
+    async def execute_complete_workflow(self) -> Dict[str, Any]:
+        """执行完整的Schema处理工作流程"""
+        
+        # 步骤1: 生成DDL和MD文件
+        await self._execute_step_1_ddl_md_generation()
+        
+        # 步骤2: 生成Question-SQL对
+        await self._execute_step_2_question_sql_generation()
+        
+        # 步骤3: 验证和修正SQL(可选)
+        if self.enable_sql_validation:
+            await self._execute_step_3_sql_validation()
+        
+        # 生成最终报告
+        final_report = await self._generate_final_report()
+        
+        return final_report
+```
+
+这样,文档就与当前代码完全一致了,包含了所有新增的SQL验证、LLM修复、文件修改等功能的详细设计说明。

+ 68 - 34
docs/sql_validation_guide.md

@@ -2,6 +2,11 @@
 
 SQL验证器是Schema Tools的一个独立模块,用于验证Question-SQL对中的SQL语句是否有效。它通过执行`EXPLAIN`语句来检测SQL语法错误和表结构问题。
 
+**⚠️ 重要提示**:
+- **命令行模式默认行为**:启用LLM修复功能和文件修改功能
+- **配置文件默认值**:禁用修复和文件修改功能(保守设置)
+- 如需禁用默认功能,请使用 `--disable-llm-repair` 或 `--no-modify-file` 参数
+
 ## 功能特性
 
 - 🔍 使用PostgreSQL的EXPLAIN语句验证SQL有效性
@@ -57,25 +62,22 @@ python -m schema_tools.sql_validator \
 ### 高级选项
 
 ```bash
-# 调整性能参数
+# 基本使用(默认启用修复和文件修改)
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
-  --input-file ./data.json \
-  --max-concurrent 10 \
-  --batch-size 20 \
-  --timeout 60 \
-  --verbose
+  --input-file ./data.json
 
-# 基本使用(仅验证,不修改文件)
+# 仅生成报告,不修改文件
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
-  --input-file ./data.json
+  --input-file ./data.json \
+  --no-modify-file
 
-# 启用LLM修复功能
+# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
   --input-file ./data.json \
-  --enable-llm-repair
+  --disable-llm-repair
 
 # 预检查模式(仅验证文件格式)
 python -m schema_tools.sql_validator \
@@ -118,46 +120,45 @@ SQL验证器现在支持自动修改原始JSON文件:
 
 ### 默认行为
 ```bash
-# 默认仅验证,不修改原文件
+# 默认启用修复和文件修改
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
   --input-file ./data.json
 ```
 
 执行后:
+- 创建备份文件:`data.json.backup`
+- 修改原文件:`data.json`(更新修复成功的SQL,删除无法修复的SQL)
+- 生成修改日志:`file_modifications_时间戳.log`
 - 生成验证报告:`sql_validation_时间戳_summary.txt`
-- 不修改原始文件
 
-### 仅删除无效SQL
+### 仅生成报告
 
 ```bash
-# 仅删除无效SQL,不进行LLM修复
+# 仅生成报告,不修改原文件
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
   --input-file ./data.json \
-  --modify-original-file
+  --no-modify-file
 ```
 
 执行后:
-- 创建备份文件:`data.json.backup`
-- 修改原文件:`data.json`(删除验证失败的SQL)
-- 生成修改日志:`file_modifications_时间戳.log`
 - 生成验证报告:`sql_validation_时间戳_summary.txt`
+- 不修改原始文件
 
-### 用LLM修复功能
+### 用LLM修复功能
 
 ```bash
-# 启用LLM修复功能(需要同时指定文件修改参数
+# 启用文件修改,但禁用LLM修复(仅删除无效SQL
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
   --input-file ./data.json \
-  --enable-llm-repair \
-  --modify-original-file
+  --disable-llm-repair
 ```
 
 执行后:
 - 创建备份文件:`data.json.backup`
-- 修改原文件:`data.json`(更新修复成功的SQL,删除无法修复的SQL
+- 修改原文件:`data.json`(删除验证失败的SQL,不进行LLM修复
 - 生成修改日志:`file_modifications_时间戳.log`
 - 生成验证报告:`sql_validation_时间戳_summary.txt`
 
@@ -328,25 +329,58 @@ SQL验证器的配置位于 `schema_tools/config.py` 中:
     "batch_size": 10,                    # 批处理大小
     "continue_on_error": True,           # 错误时是否继续
     "save_validation_report": True,      # 保存验证报告
+    "save_detailed_json_report": False,  # 保存详细JSON报告(可选)
     "readonly_mode": True,               # 启用只读模式
     "max_retry_count": 2,                # 验证失败重试次数
     "report_file_prefix": "sql_validation",  # 报告文件前缀
+    
+    # SQL修复配置
+    "enable_sql_repair": False,          # 启用SQL修复功能(配置文件默认禁用)
+    "llm_repair_timeout": 120,           # LLM修复超时时间(秒)
+    "repair_batch_size": 2,              # 修复批处理大小
+    
+    # 文件修改配置
+    "modify_original_file": False,       # 是否修改原始JSON文件(配置文件默认禁用)
 }
 ```
 
+**重要说明**:
+- 配置文件中的默认值为保守设置(禁用修复和文件修改)
+- 命令行模式下会自动启用修复和文件修改功能
+- 可通过命令行参数 `--disable-llm-repair` 和 `--no-modify-file` 禁用
+
 ## 命令行参数说明
 
-| 参数 | 类型 | 必需 | 说明 |
-|------|------|------|------|
-| `--db-connection` | string | 是 | PostgreSQL数据库连接字符串 |
-| `--input-file` | string | 是 | 输入的JSON文件路径 |
-| `--output-dir` | string | 否 | 验证报告输出目录 |
-| `--max-concurrent` | int | 否 | 最大并发验证数 |
-| `--batch-size` | int | 否 | 批处理大小 |
-| `--timeout` | int | 否 | 单个SQL验证超时时间(秒) |
-| `--verbose` | flag | 否 | 启用详细日志输出 |
-| `--log-file` | string | 否 | 日志文件路径 |
-| `--dry-run` | flag | 否 | 仅读取和解析文件,不执行验证 |
+### 必需参数
+| 参数 | 类型 | 说明 |
+|------|------|------|
+| `--db-connection` | string | PostgreSQL数据库连接字符串 |
+| `--input-file` | string | 输入的JSON文件路径(包含Question-SQL对) |
+
+### 可选参数
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `--output-dir` | string | 输入文件同目录 | 验证报告输出目录 |
+| `--max-concurrent` | int | 5 | 最大并发验证数 |
+| `--batch-size` | int | 10 | 批处理大小 |
+| `--timeout` | int | 30 | 单个SQL验证超时时间(秒) |
+| `--verbose` | flag | False | 启用详细日志输出 |
+| `--log-file` | string | 无 | 日志文件路径 |
+| `--dry-run` | flag | False | 仅读取和解析文件,不执行验证 |
+| `--save-json` | flag | False | 同时保存详细的JSON报告 |
+
+### SQL修复和文件修改参数
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `--disable-llm-repair` | flag | False | 禁用LLM自动修复功能 |
+| `--enable-llm-repair` | flag | - | 启用LLM修复(向后兼容,与--disable-llm-repair相反) |
+| `--no-modify-file` | flag | False | 不修改原始JSON文件(仅生成验证报告) |
+| `--modify-original-file` | flag | - | 修改原始JSON文件(向后兼容,与--no-modify-file相反) |
+
+**注意**:
+- 命令行模式下,默认启用LLM修复和文件修改功能
+- 如需禁用,请明确使用 `--disable-llm-repair` 或 `--no-modify-file` 参数
+- 向后兼容参数仍然有效,但建议使用新的参数格式
 
 ## 错误处理机制
 

+ 75 - 10
schema_tools/README.md

@@ -13,6 +13,7 @@
 - 🛡️ 完整的错误处理和日志记录
 - 🎯 **新增**:Question-SQL训练数据生成
 - ✅ **新增**:SQL语句有效性验证
+- 🎭 **新增**:端到端工作流编排器
 
 ## 安装依赖
 
@@ -22,7 +23,72 @@ pip install asyncpg asyncio
 
 ## 使用方法
 
-### 1. 生成DDL和MD文档
+### 0. 🚀 一键执行完整工作流程(推荐)
+
+使用新的工作流编排器,一个命令完成所有步骤:
+
+#### 命令行方式
+```bash
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "高速公路服务区管理系统" \
+  --db-name highway_db \
+  --output-dir ./output
+```
+
+#### 编程方式
+```python
+import asyncio
+from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
+
+async def run_complete_workflow():
+    orchestrator = SchemaWorkflowOrchestrator(
+        db_connection="postgresql://user:pass@localhost:5432/dbname",
+        table_list_file="tables.txt",
+        business_context="高速公路服务区管理系统",
+        db_name="highway_db",
+        output_dir="./output"
+    )
+    
+    # 一键执行完整流程
+    report = await orchestrator.execute_complete_workflow()
+    
+    if report["success"]:
+        print(f"✅ 编排完成!最终生成 {report['final_outputs']['final_question_count']} 个问答对")
+        print(f"📄 输出文件: {report['final_outputs']['primary_output_file']}")
+    else:
+        print(f"❌ 编排失败: {report['error']['message']}")
+
+asyncio.run(run_complete_workflow())
+```
+
+**工作流编排器特性:**
+- 🔄 自动执行:DDL/MD生成 → Question-SQL生成 → SQL验证修复
+- 📊 详细报告:每个步骤的执行状态和性能指标
+- 🛠️ 灵活配置:可选择跳过验证、禁用修复等
+- 💾 中间结果保护:失败时保留已完成步骤的输出
+- 🎯 智能恢复:支持从中断点继续执行
+
+#### 工作流编排器命令行选项
+```bash
+# 跳过SQL验证
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://..." --table-list tables.txt \
+  --business-context "系统" --db-name test_db --skip-validation
+
+# 禁用LLM修复
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://..." --table-list tables.txt \
+  --business-context "系统" --db-name test_db --disable-llm-repair
+
+# 详细日志
+python -m schema_tools.schema_workflow_orchestrator \
+  --db-connection "postgresql://..." --table-list tables.txt \
+  --business-context "系统" --db-name test_db --verbose
+```
+
+### 1. 生成DDL和MD文档(分步执行)
 
 #### 基本使用
 ```bash
@@ -42,7 +108,7 @@ python -m schema_tools \
   --pipeline full
 ```
 
-### 2. 生成Question-SQL训练数据
+### 2. 生成Question-SQL训练数据(分步执行)
 
 在生成DDL和MD文件后,可以使用新的Question-SQL生成功能:
 
@@ -61,7 +127,7 @@ python -m schema_tools.qs_generator \
 4. 为每个主题生成10个Question-SQL对
 5. 输出到 `qs_highway_db_时间戳_pair.json` 文件
 
-### 3. 验证SQL语句有效性(新功能
+### 3. 验证SQL语句有效性(分步执行
 
 在生成Question-SQL对后,可以验证其中的SQL语句:
 
@@ -80,23 +146,22 @@ python -m schema_tools.sql_validator \
 
 #### SQL验证高级选项
 ```bash
-# 基本验证(仅生成报告
+# 基本验证(启用修复和文件修改
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
   --input-file ./data.json
 
-# 删除无效SQL(不进行LLM修复)
+# 仅生成报告,不修改文件
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
   --input-file ./data.json \
-  --modify-original-file
+  --no-modify-file
 
-# 启用LLM修复功能
+# 启用文件修改,但禁用LLM修复(仅删除无效SQL)
 python -m schema_tools.sql_validator \
   --db-connection "postgresql://user:pass@localhost:5432/dbname" \
   --input-file ./data.json \
-  --enable-llm-repair \
-  --modify-original-file
+  --disable-llm-repair
 
 # 性能调优参数
 python -m schema_tools.sql_validator \
@@ -108,7 +173,7 @@ python -m schema_tools.sql_validator \
   --verbose
 ```
 
-### 4. 编程方式使用
+### 4. 编程方式使用(分步执行)
 
 #### 生成DDL/MD文档
 ```python

+ 2 - 0
schema_tools/__init__.py

@@ -6,6 +6,7 @@ Schema Tools - 自动化数据库逆向工程工具
 from .training_data_agent import SchemaTrainingDataAgent
 from .qs_agent import QuestionSQLGenerationAgent
 from .sql_validation_agent import SQLValidationAgent
+from .schema_workflow_orchestrator import SchemaWorkflowOrchestrator
 from .config import SCHEMA_TOOLS_CONFIG, get_config, update_config
 
 __version__ = "1.0.0"
@@ -13,6 +14,7 @@ __all__ = [
     "SchemaTrainingDataAgent",
     "QuestionSQLGenerationAgent",
     "SQLValidationAgent",
+    "SchemaWorkflowOrchestrator",
     "SCHEMA_TOOLS_CONFIG", 
     "get_config",
     "update_config"

+ 586 - 0
schema_tools/schema_workflow_orchestrator.py

@@ -0,0 +1,586 @@
+"""
+Schema工作流编排器
+统一管理完整的数据库Schema处理流程
+"""
+
+import asyncio
+import time
+import logging
+from typing import Dict, Any, List, Optional
+from pathlib import Path
+from datetime import datetime
+
+from schema_tools.training_data_agent import SchemaTrainingDataAgent
+from schema_tools.qs_agent import QuestionSQLGenerationAgent
+from schema_tools.sql_validation_agent import SQLValidationAgent
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+from schema_tools.utils.logger import setup_logging
+
+
+class SchemaWorkflowOrchestrator:
+    """端到端的Schema处理编排器 - 完整工作流程"""
+    
+    def __init__(self, 
+                 db_connection: str,
+                 table_list_file: str,
+                 business_context: str,
+                 db_name: str,
+                 output_dir: str = None,
+                 enable_sql_validation: bool = True,
+                 enable_llm_repair: bool = True,
+                 modify_original_file: bool = True):
+        """
+        初始化Schema工作流编排器
+        
+        Args:
+            db_connection: 数据库连接字符串
+            table_list_file: 表清单文件路径
+            business_context: 业务上下文描述
+            db_name: 数据库名称(用于生成文件名)
+            output_dir: 输出目录
+            enable_sql_validation: 是否启用SQL验证
+            enable_llm_repair: 是否启用LLM修复功能
+            modify_original_file: 是否修改原始JSON文件
+        """
+        self.db_connection = db_connection
+        self.table_list_file = table_list_file
+        self.business_context = business_context
+        self.db_name = db_name
+        self.output_dir = Path(output_dir) if output_dir else Path("./output")
+        self.enable_sql_validation = enable_sql_validation
+        self.enable_llm_repair = enable_llm_repair
+        self.modify_original_file = modify_original_file
+        
+        # 确保输出目录存在
+        self.output_dir.mkdir(parents=True, exist_ok=True)
+        
+        # 初始化日志
+        self.logger = logging.getLogger("schema_tools.SchemaWorkflowOrchestrator")
+        
+        # 工作流程状态
+        self.workflow_state = {
+            "start_time": None,
+            "end_time": None,
+            "current_step": None,
+            "completed_steps": [],
+            "failed_steps": [],
+            "artifacts": {},  # 存储各步骤产生的文件
+            "statistics": {}
+        }
+    
+    async def execute_complete_workflow(self) -> Dict[str, Any]:
+        """
+        执行完整的Schema处理工作流程
+        
+        Returns:
+            完整的工作流程报告
+        """
+        self.workflow_state["start_time"] = time.time()
+        self.logger.info("🚀 开始执行Schema工作流编排")
+        self.logger.info(f"📁 输出目录: {self.output_dir}")
+        self.logger.info(f"🏢 业务背景: {self.business_context}")
+        self.logger.info(f"💾 数据库: {self.db_name}")
+        
+        try:
+            # 步骤1: 生成DDL和MD文件
+            await self._execute_step_1_ddl_md_generation()
+            
+            # 步骤2: 生成Question-SQL对
+            await self._execute_step_2_question_sql_generation()
+            
+            # 步骤3: 验证和修正SQL(可选)
+            if self.enable_sql_validation:
+                await self._execute_step_3_sql_validation()
+            else:
+                self.logger.info("⏭️ 跳过SQL验证步骤")
+            
+            # 生成最终报告
+            final_report = await self._generate_final_report()
+            
+            self.workflow_state["end_time"] = time.time()
+            self.logger.info("✅ Schema工作流编排完成")
+            
+            return final_report
+            
+        except Exception as e:
+            self.workflow_state["end_time"] = time.time()
+            self.logger.exception(f"❌ 工作流程执行失败: {str(e)}")
+            
+            error_report = await self._generate_error_report(e)
+            return error_report
+    
+    async def _execute_step_1_ddl_md_generation(self):
+        """步骤1: 生成DDL和MD文件"""
+        self.workflow_state["current_step"] = "ddl_md_generation"
+        self.logger.info("=" * 60)
+        self.logger.info("📝 步骤1: 开始生成DDL和MD文件")
+        self.logger.info("=" * 60)
+        
+        step_start_time = time.time()
+        
+        try:
+            # 创建DDL/MD生成Agent
+            ddl_md_agent = SchemaTrainingDataAgent(
+                db_connection=self.db_connection,
+                table_list_file=self.table_list_file,
+                business_context=self.business_context,
+                output_dir=str(self.output_dir),
+                pipeline="full"
+            )
+            
+            # 执行DDL/MD生成
+            ddl_md_result = await ddl_md_agent.generate_training_data()
+            
+            step_duration = time.time() - step_start_time
+            
+            # 记录结果
+            self.workflow_state["completed_steps"].append("ddl_md_generation")
+            self.workflow_state["artifacts"]["ddl_md_generation"] = {
+                "total_tables": ddl_md_result.get("summary", {}).get("total_tables", 0),
+                "processed_successfully": ddl_md_result.get("summary", {}).get("processed_successfully", 0),
+                "failed": ddl_md_result.get("summary", {}).get("failed", 0),
+                "files_generated": ddl_md_result.get("statistics", {}).get("files_generated", 0),
+                "duration": step_duration
+            }
+            self.workflow_state["statistics"]["step1_duration"] = step_duration
+            
+            processed_tables = ddl_md_result.get("summary", {}).get("processed_successfully", 0)
+            self.logger.info(f"✅ 步骤1完成: 成功处理 {processed_tables} 个表,耗时 {step_duration:.2f}秒")
+            
+        except Exception as e:
+            self.workflow_state["failed_steps"].append("ddl_md_generation")
+            self.logger.error(f"❌ 步骤1失败: {str(e)}")
+            raise
+    
+    async def _execute_step_2_question_sql_generation(self):
+        """步骤2: 生成Question-SQL对"""
+        self.workflow_state["current_step"] = "question_sql_generation"
+        self.logger.info("=" * 60)
+        self.logger.info("🤖 步骤2: 开始生成Question-SQL对")
+        self.logger.info("=" * 60)
+        
+        step_start_time = time.time()
+        
+        try:
+            # 创建Question-SQL生成Agent
+            qs_agent = QuestionSQLGenerationAgent(
+                output_dir=str(self.output_dir),
+                table_list_file=self.table_list_file,
+                business_context=self.business_context,
+                db_name=self.db_name
+            )
+            
+            # 执行Question-SQL生成
+            qs_result = await qs_agent.generate()
+            
+            step_duration = time.time() - step_start_time
+            
+            # 记录结果
+            self.workflow_state["completed_steps"].append("question_sql_generation")
+            self.workflow_state["artifacts"]["question_sql_generation"] = {
+                "output_file": str(qs_result.get("output_file", "")),
+                "total_questions": qs_result.get("total_questions", 0),
+                "total_themes": qs_result.get("total_themes", 0),
+                "successful_themes": qs_result.get("successful_themes", 0),
+                "failed_themes": qs_result.get("failed_themes", []),
+                "duration": step_duration
+            }
+            self.workflow_state["statistics"]["step2_duration"] = step_duration
+            
+            total_questions = qs_result.get("total_questions", 0)
+            self.logger.info(f"✅ 步骤2完成: 生成了 {total_questions} 个问答对,耗时 {step_duration:.2f}秒")
+            
+        except Exception as e:
+            self.workflow_state["failed_steps"].append("question_sql_generation")
+            self.logger.error(f"❌ 步骤2失败: {str(e)}")
+            raise
+    
+    async def _execute_step_3_sql_validation(self):
+        """步骤3: 验证和修正SQL"""
+        self.workflow_state["current_step"] = "sql_validation"
+        self.logger.info("=" * 60)
+        self.logger.info("🔍 步骤3: 开始验证和修正SQL")
+        self.logger.info("=" * 60)
+        
+        step_start_time = time.time()
+        
+        try:
+            # 获取步骤2生成的文件
+            qs_artifacts = self.workflow_state["artifacts"].get("question_sql_generation", {})
+            qs_file = qs_artifacts.get("output_file")
+            
+            if not qs_file or not Path(qs_file).exists():
+                raise FileNotFoundError(f"找不到Question-SQL文件: {qs_file}")
+            
+            self.logger.info(f"📄 验证文件: {qs_file}")
+            
+            # 动态设置验证配置
+            SCHEMA_TOOLS_CONFIG['sql_validation']['enable_sql_repair'] = self.enable_llm_repair
+            SCHEMA_TOOLS_CONFIG['sql_validation']['modify_original_file'] = self.modify_original_file
+            
+            # 创建SQL验证Agent
+            sql_validator = SQLValidationAgent(
+                db_connection=self.db_connection,
+                input_file=str(qs_file),
+                output_dir=str(self.output_dir)
+            )
+            
+            # 执行SQL验证和修正
+            validation_result = await sql_validator.validate()
+            
+            step_duration = time.time() - step_start_time
+            
+            # 记录结果
+            self.workflow_state["completed_steps"].append("sql_validation")
+            
+            summary = validation_result.get("summary", {})
+            self.workflow_state["artifacts"]["sql_validation"] = {
+                "original_sql_count": summary.get("total_questions", 0),
+                "valid_sql_count": summary.get("valid_sqls", 0),
+                "invalid_sql_count": summary.get("invalid_sqls", 0),
+                "success_rate": summary.get("success_rate", 0),
+                "repair_stats": summary.get("repair_stats", {}),
+                "file_modification_stats": summary.get("file_modification_stats", {}),
+                "average_execution_time": summary.get("average_execution_time", 0),
+                "total_retries": summary.get("total_retries", 0),
+                "duration": step_duration
+            }
+            self.workflow_state["statistics"]["step3_duration"] = step_duration
+            
+            success_rate = summary.get("success_rate", 0)
+            valid_count = summary.get("valid_sqls", 0)
+            total_count = summary.get("total_questions", 0)
+            
+            self.logger.info(f"✅ 步骤3完成: SQL验证成功率 {success_rate:.1%} ({valid_count}/{total_count}),耗时 {step_duration:.2f}秒")
+            
+            # 显示修复统计
+            repair_stats = summary.get("repair_stats", {})
+            if repair_stats.get("attempted", 0) > 0:
+                self.logger.info(f"🔧 修复统计: 尝试 {repair_stats['attempted']},成功 {repair_stats['successful']},失败 {repair_stats['failed']}")
+            
+            # 显示文件修改统计
+            file_stats = summary.get("file_modification_stats", {})
+            if file_stats.get("modified", 0) > 0 or file_stats.get("deleted", 0) > 0:
+                self.logger.info(f"📝 文件修改: 更新 {file_stats.get('modified', 0)} 个SQL,删除 {file_stats.get('deleted', 0)} 个无效项")
+            
+        except Exception as e:
+            self.workflow_state["failed_steps"].append("sql_validation")
+            self.logger.error(f"❌ 步骤3失败: {str(e)}")
+            raise
+    
+    async def _generate_final_report(self) -> Dict[str, Any]:
+        """生成最终工作流程报告"""
+        total_duration = self.workflow_state["end_time"] - self.workflow_state["start_time"]
+        
+        # 计算最终输出文件
+        qs_artifacts = self.workflow_state["artifacts"].get("question_sql_generation", {})
+        final_output_file = qs_artifacts.get("output_file", "")
+        
+        # 计算最终问题数量
+        if "sql_validation" in self.workflow_state["artifacts"]:
+            # 如果有验证步骤,使用验证后的数量
+            validation_artifacts = self.workflow_state["artifacts"]["sql_validation"]
+            final_question_count = validation_artifacts.get("valid_sql_count", 0)
+        else:
+            # 否则使用生成的数量
+            final_question_count = qs_artifacts.get("total_questions", 0)
+        
+        report = {
+            "success": True,
+            "workflow_summary": {
+                "total_duration": round(total_duration, 2),
+                "completed_steps": self.workflow_state["completed_steps"],
+                "failed_steps": self.workflow_state["failed_steps"],
+                "total_steps": len(self.workflow_state["completed_steps"]),
+                "workflow_started": datetime.fromtimestamp(self.workflow_state["start_time"]).isoformat(),
+                "workflow_completed": datetime.fromtimestamp(self.workflow_state["end_time"]).isoformat()
+            },
+            "input_parameters": {
+                "db_connection": self._mask_connection_string(self.db_connection),
+                "table_list_file": self.table_list_file,
+                "business_context": self.business_context,
+                "db_name": self.db_name,
+                "output_directory": str(self.output_dir),
+                "enable_sql_validation": self.enable_sql_validation,
+                "enable_llm_repair": self.enable_llm_repair,
+                "modify_original_file": self.modify_original_file
+            },
+            "processing_results": {
+                "ddl_md_generation": self.workflow_state["artifacts"].get("ddl_md_generation", {}),
+                "question_sql_generation": self.workflow_state["artifacts"].get("question_sql_generation", {}),
+                "sql_validation": self.workflow_state["artifacts"].get("sql_validation", {})
+            },
+            "final_outputs": {
+                "primary_output_file": final_output_file,
+                "output_directory": str(self.output_dir),
+                "final_question_count": final_question_count,
+                "backup_files_created": self.modify_original_file
+            },
+            "performance_metrics": {
+                "step1_duration": round(self.workflow_state["statistics"].get("step1_duration", 0), 2),
+                "step2_duration": round(self.workflow_state["statistics"].get("step2_duration", 0), 2),
+                "step3_duration": round(self.workflow_state["statistics"].get("step3_duration", 0), 2),
+                "total_duration": round(total_duration, 2)
+            }
+        }
+        
+        return report
+    
+    async def _generate_error_report(self, error: Exception) -> Dict[str, Any]:
+        """生成错误报告"""
+        total_duration = self.workflow_state["end_time"] - self.workflow_state["start_time"]
+        
+        return {
+            "success": False,
+            "error": {
+                "message": str(error),
+                "type": type(error).__name__,
+                "failed_step": self.workflow_state["current_step"]
+            },
+            "workflow_summary": {
+                "total_duration": round(total_duration, 2),
+                "completed_steps": self.workflow_state["completed_steps"],
+                "failed_steps": self.workflow_state["failed_steps"],
+                "workflow_started": datetime.fromtimestamp(self.workflow_state["start_time"]).isoformat() if self.workflow_state["start_time"] else None,
+                "workflow_failed": datetime.fromtimestamp(self.workflow_state["end_time"]).isoformat() if self.workflow_state["end_time"] else None
+            },
+            "partial_results": self.workflow_state["artifacts"],
+            "input_parameters": {
+                "db_connection": self._mask_connection_string(self.db_connection),
+                "table_list_file": self.table_list_file,
+                "business_context": self.business_context,
+                "db_name": self.db_name,
+                "output_directory": str(self.output_dir)
+            }
+        }
+    
+    def _mask_connection_string(self, conn_str: str) -> str:
+        """隐藏连接字符串中的敏感信息"""
+        import re
+        return re.sub(r':[^:@]+@', ':***@', conn_str)
+    
+    def print_final_summary(self, report: Dict[str, Any]):
+        """打印最终摘要"""
+        self.logger.info("=" * 80)
+        self.logger.info("📊 工作流程执行摘要")
+        self.logger.info("=" * 80)
+        
+        if report["success"]:
+            summary = report["workflow_summary"]
+            results = report["processing_results"]
+            outputs = report["final_outputs"]
+            metrics = report["performance_metrics"]
+            
+            self.logger.info(f"✅ 工作流程执行成功")
+            self.logger.info(f"⏱️  总耗时: {summary['total_duration']} 秒")
+            self.logger.info(f"📝 完成步骤: {len(summary['completed_steps'])}/{summary['total_steps']}")
+            
+            # DDL/MD生成结果
+            if "ddl_md_generation" in results:
+                ddl_md = results["ddl_md_generation"]
+                self.logger.info(f"📋 DDL/MD生成: {ddl_md.get('processed_successfully', 0)} 个表成功处理")
+            
+            # Question-SQL生成结果
+            if "question_sql_generation" in results:
+                qs = results["question_sql_generation"]
+                self.logger.info(f"🤖 Question-SQL生成: {qs.get('total_questions', 0)} 个问答对")
+            
+            # SQL验证结果
+            if "sql_validation" in results:
+                validation = results["sql_validation"]
+                success_rate = validation.get('success_rate', 0)
+                self.logger.info(f"🔍 SQL验证: {success_rate:.1%} 成功率 ({validation.get('valid_sql_count', 0)}/{validation.get('original_sql_count', 0)})")
+            
+            self.logger.info(f"📁 输出目录: {outputs['output_directory']}")
+            self.logger.info(f"📄 主要输出文件: {outputs['primary_output_file']}")
+            self.logger.info(f"❓ 最终问题数量: {outputs['final_question_count']}")
+            
+        else:
+            error = report["error"]
+            summary = report["workflow_summary"]
+            
+            self.logger.error(f"❌ 工作流程执行失败")
+            self.logger.error(f"💥 失败原因: {error['message']}")
+            self.logger.error(f"💥 失败步骤: {error['failed_step']}")
+            self.logger.error(f"⏱️  执行耗时: {summary['total_duration']} 秒")
+            self.logger.error(f"✅ 已完成步骤: {', '.join(summary['completed_steps']) if summary['completed_steps'] else '无'}")
+        
+        self.logger.info("=" * 80)
+
+
+# 便捷的命令行接口
+def setup_argument_parser():
+    """设置命令行参数解析器"""
+    import argparse
+    
+    parser = argparse.ArgumentParser(
+        description="Schema工作流编排器 - 端到端的Schema处理流程",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例用法:
+  # 完整工作流程
+  python -m schema_tools.schema_workflow_orchestrator \\
+    --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+    --table-list tables.txt \\
+    --business-context "高速公路服务区管理系统" \\
+    --db-name highway_db \\
+    --output-dir ./output
+  
+  # 跳过SQL验证
+  python -m schema_tools.schema_workflow_orchestrator \\
+    --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+    --table-list tables.txt \\
+    --business-context "电商系统" \\
+    --db-name ecommerce_db \\
+    --skip-validation
+  
+  # 禁用LLM修复
+  python -m schema_tools.schema_workflow_orchestrator \\
+    --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+    --table-list tables.txt \\
+    --business-context "管理系统" \\
+    --db-name management_db \\
+    --disable-llm-repair
+        """
+    )
+    
+    # 必需参数
+    parser.add_argument(
+        "--db-connection",
+        required=True,
+        help="数据库连接字符串 (postgresql://user:pass@host:port/dbname)"
+    )
+    
+    parser.add_argument(
+        "--table-list",
+        required=True,
+        help="表清单文件路径"
+    )
+    
+    parser.add_argument(
+        "--business-context",
+        required=True,
+        help="业务上下文描述"
+    )
+    
+    parser.add_argument(
+        "--db-name",
+        required=True,
+        help="数据库名称(用于生成文件名)"
+    )
+    
+    # 可选参数
+    parser.add_argument(
+        "--output-dir",
+        default="./output",
+        help="输出目录(默认:./output)"
+    )
+    
+    parser.add_argument(
+        "--skip-validation",
+        action="store_true",
+        help="跳过SQL验证步骤"
+    )
+    
+    parser.add_argument(
+        "--disable-llm-repair",
+        action="store_true",
+        help="禁用LLM修复功能"
+    )
+    
+    parser.add_argument(
+        "--no-modify-file",
+        action="store_true",
+        help="不修改原始JSON文件(仅生成报告)"
+    )
+    
+    parser.add_argument(
+        "--verbose", "-v",
+        action="store_true",
+        help="启用详细日志输出"
+    )
+    
+    parser.add_argument(
+        "--log-file",
+        help="日志文件路径"
+    )
+    
+    return parser
+
+
+async def main():
+    """命令行入口点"""
+    import sys
+    import os
+    
+    parser = setup_argument_parser()
+    args = parser.parse_args()
+    
+    # 设置日志
+    setup_logging(
+        verbose=args.verbose,
+        log_file=args.log_file,
+        log_dir=os.path.join(args.output_dir, 'logs') if args.output_dir else None
+    )
+    
+    # 验证输入文件
+    if not os.path.exists(args.table_list):
+        print(f"错误: 表清单文件不存在: {args.table_list}")
+        sys.exit(1)
+    
+    try:
+        # 创建并执行工作流编排器
+        orchestrator = SchemaWorkflowOrchestrator(
+            db_connection=args.db_connection,
+            table_list_file=args.table_list,
+            business_context=args.business_context,
+            db_name=args.db_name,
+            output_dir=args.output_dir,
+            enable_sql_validation=not args.skip_validation,
+            enable_llm_repair=not args.disable_llm_repair,
+            modify_original_file=not args.no_modify_file
+        )
+        
+        # 显示启动信息
+        print(f"🚀 开始执行Schema工作流编排...")
+        print(f"📁 输出目录: {args.output_dir}")
+        print(f"📋 表清单: {args.table_list}")
+        print(f"🏢 业务背景: {args.business_context}")
+        print(f"💾 数据库: {args.db_name}")
+        print(f"🔍 SQL验证: {'启用' if not args.skip_validation else '禁用'}")
+        print(f"🔧 LLM修复: {'启用' if not args.disable_llm_repair else '禁用'}")
+        
+        # 执行完整工作流程
+        report = await orchestrator.execute_complete_workflow()
+        
+        # 打印详细摘要
+        orchestrator.print_final_summary(report)
+        
+        # 输出结果并设置退出码
+        if report["success"]:
+            if report["processing_results"].get("sql_validation", {}).get("success_rate", 1.0) >= 0.8:
+                print(f"\n🎉 工作流程执行成功!")
+                exit_code = 0  # 完全成功
+            else:
+                print(f"\n⚠️  工作流程执行完成,但SQL验证成功率较低")
+                exit_code = 1  # 部分成功
+        else:
+            print(f"\n❌ 工作流程执行失败")
+            exit_code = 2  # 失败
+        
+        print(f"📄 主要输出文件: {report['final_outputs']['primary_output_file']}")
+        sys.exit(exit_code)
+        
+    except KeyboardInterrupt:
+        print("\n\n⏹️  用户中断,程序退出")
+        sys.exit(130)
+    except Exception as e:
+        print(f"\n❌ 程序执行失败: {e}")
+        if args.verbose:
+            import traceback
+            traceback.print_exc()
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    asyncio.run(main()) 

+ 46 - 14
schema_tools/sql_validator.py

@@ -23,11 +23,11 @@ def setup_argument_parser():
   # 基本使用(仅验证,不修改文件)
   python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json
   
-  # 仅删除无效SQL,不进行LLM修复
-  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --modify-original-file
+  # 启用文件修改,但禁用LLM修复(仅删除无效SQL)
+  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --modify-original-file --disable-llm-repair
   
-  # 启用LLM修复功能(需要同时指定文件修改参数)
-  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --enable-llm-repair --modify-original-file
+  # 启用文件修改和LLM修复功能
+  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --modify-original-file
   
   # 指定输出目录
   python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --output-dir ./reports
@@ -97,16 +97,30 @@ def setup_argument_parser():
         help='同时保存详细的JSON报告'
     )
     
+    parser.add_argument(
+        '--disable-llm-repair',
+        action='store_true',
+        help='禁用LLM自动修复功能'
+    )
+    
+    # 向后兼容的别名参数
     parser.add_argument(
         '--enable-llm-repair',
         action='store_true',
-        help='启用LLM自动修复功能'
+        help='启用LLM自动修复功能(与--disable-llm-repair相反,保持向后兼容性)'
     )
     
+    parser.add_argument(
+        '--no-modify-file',
+        action='store_true',
+        help='不修改原始JSON文件(仅生成验证报告)'
+    )
+    
+    # 向后兼容的别名参数
     parser.add_argument(
         '--modify-original-file',
         action='store_true',
-        help='修改原始JSON文件(删除无效SQL,如果启用LLM修复则同时更新修复后的SQL)'
+        help='修改原始JSON文件(与--no-modify-file相反,保持向后兼容性)'
     )
     
     return parser
@@ -134,19 +148,37 @@ def apply_config_overrides(args):
         sql_config['save_detailed_json_report'] = True
         print(f"启用详细JSON报告保存")
     
-    if args.enable_llm_repair:
-        sql_config['enable_sql_repair'] = True
-        print(f"启用LLM自动修复功能")
-    else:
+    # 注意:现在是disable_llm_repair,逻辑反转,同时支持向后兼容的enable_llm_repair
+    if args.disable_llm_repair and args.enable_llm_repair:
+        print("警告: --disable-llm-repair 和 --enable-llm-repair 不能同时使用,优先使用 --disable-llm-repair")
         sql_config['enable_sql_repair'] = False
         print(f"LLM修复功能已禁用")
-    
-    if args.modify_original_file:
-        sql_config['modify_original_file'] = True
-        print(f"启用原文件修改功能")
+    elif args.disable_llm_repair:
+        sql_config['enable_sql_repair'] = False
+        print(f"LLM修复功能已禁用")
+    elif args.enable_llm_repair:
+        sql_config['enable_sql_repair'] = True
+        print(f"启用LLM自动修复功能(向后兼容参数)")
     else:
+        # 默认启用LLM修复功能
+        sql_config['enable_sql_repair'] = True
+        print(f"启用LLM自动修复功能(默认行为)")
+    
+    # 注意:现在是no_modify_file,逻辑反转,同时支持向后兼容的modify_original_file
+    if args.no_modify_file and args.modify_original_file:
+        print("警告: --no-modify-file 和 --modify-original-file 不能同时使用,优先使用 --no-modify-file")
+        sql_config['modify_original_file'] = False
+        print(f"不修改原文件")
+    elif args.no_modify_file:
         sql_config['modify_original_file'] = False
         print(f"不修改原文件")
+    elif args.modify_original_file:
+        sql_config['modify_original_file'] = True
+        print(f"启用原文件修改功能(向后兼容参数)")
+    else:
+        # 默认启用文件修改功能
+        sql_config['modify_original_file'] = True
+        print(f"启用原文件修改功能(默认行为)")
 
 
 async def main():

+ 313 - 0
schema_tools/workflow_example.py

@@ -0,0 +1,313 @@
+"""
+Schema工作流编排器使用示例
+演示如何使用SchemaWorkflowOrchestrator执行完整的工作流程
+"""
+
+import asyncio
+import sys
+import os
+from pathlib import Path
+
+# 添加项目根目录到Python路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
+from schema_tools.utils.logger import setup_logging
+
+
+async def example_complete_workflow():
+    """完整工作流程示例"""
+    print("=" * 60)
+    print("完整工作流程示例")
+    print("=" * 60)
+    
+    # 设置日志
+    setup_logging(verbose=True)
+    
+    # 配置参数
+    db_connection = "postgresql://user:password@localhost:5432/test_db"
+    table_list_file = "schema_tools/tables.txt"
+    business_context = "高速公路服务区管理系统"
+    db_name = "highway_db"
+    output_dir = "./example_output"
+    
+    try:
+        # 创建工作流编排器
+        orchestrator = SchemaWorkflowOrchestrator(
+            db_connection=db_connection,
+            table_list_file=table_list_file,
+            business_context=business_context,
+            db_name=db_name,
+            output_dir=output_dir,
+            enable_sql_validation=True,
+            enable_llm_repair=True,
+            modify_original_file=True
+        )
+        
+        print(f"🚀 开始执行完整工作流程...")
+        print(f"📁 输出目录: {output_dir}")
+        print(f"🏢 业务背景: {business_context}")
+        print(f"💾 数据库: {db_name}")
+        
+        # 执行完整工作流程
+        report = await orchestrator.execute_complete_workflow()
+        
+        # 打印详细摘要
+        orchestrator.print_final_summary(report)
+        
+        # 分析结果
+        if report["success"]:
+            print(f"\n🎉 工作流程执行成功!")
+            
+            # 显示各步骤详情
+            results = report["processing_results"]
+            
+            if "ddl_md_generation" in results:
+                ddl_md = results["ddl_md_generation"]
+                print(f"📋 步骤1 - DDL/MD生成:")
+                print(f"   处理表数: {ddl_md.get('processed_successfully', 0)}")
+                print(f"   生成文件: {ddl_md.get('files_generated', 0)}")
+                print(f"   耗时: {ddl_md.get('duration', 0):.2f}秒")
+            
+            if "question_sql_generation" in results:
+                qs = results["question_sql_generation"]
+                print(f"🤖 步骤2 - Question-SQL生成:")
+                print(f"   生成主题: {qs.get('total_themes', 0)}")
+                print(f"   成功主题: {qs.get('successful_themes', 0)}")
+                print(f"   问答对数: {qs.get('total_questions', 0)}")
+                print(f"   耗时: {qs.get('duration', 0):.2f}秒")
+            
+            if "sql_validation" in results:
+                validation = results["sql_validation"]
+                print(f"🔍 步骤3 - SQL验证:")
+                print(f"   原始SQL数: {validation.get('original_sql_count', 0)}")
+                print(f"   有效SQL数: {validation.get('valid_sql_count', 0)}")
+                print(f"   成功率: {validation.get('success_rate', 0):.1%}")
+                print(f"   耗时: {validation.get('duration', 0):.2f}秒")
+            
+            outputs = report["final_outputs"]
+            print(f"\n📄 最终输出:")
+            print(f"   主要文件: {outputs['primary_output_file']}")
+            print(f"   问题总数: {outputs['final_question_count']}")
+            
+        else:
+            print(f"\n❌ 工作流程执行失败:")
+            error = report["error"]
+            print(f"   失败步骤: {error['failed_step']}")
+            print(f"   错误信息: {error['message']}")
+            
+            # 显示已完成的步骤
+            completed = report["workflow_summary"]["completed_steps"]
+            if completed:
+                print(f"   已完成步骤: {', '.join(completed)}")
+        
+    except Exception as e:
+        print(f"\n❌ 示例执行失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+
+async def example_skip_validation():
+    """跳过验证的工作流程示例"""
+    print("=" * 60)
+    print("跳过验证的工作流程示例")
+    print("=" * 60)
+    
+    # 设置日志
+    setup_logging(verbose=True)
+    
+    # 配置参数(跳过SQL验证)
+    db_connection = "postgresql://user:password@localhost:5432/test_db"
+    table_list_file = "schema_tools/tables.txt"
+    business_context = "电商系统"
+    db_name = "ecommerce_db"
+    output_dir = "./example_output_no_validation"
+    
+    try:
+        # 创建工作流编排器(跳过验证)
+        orchestrator = SchemaWorkflowOrchestrator(
+            db_connection=db_connection,
+            table_list_file=table_list_file,
+            business_context=business_context,
+            db_name=db_name,
+            output_dir=output_dir,
+            enable_sql_validation=False,  # 跳过SQL验证
+            enable_llm_repair=False,
+            modify_original_file=False
+        )
+        
+        print(f"🚀 开始执行工作流程(跳过验证)...")
+        
+        # 执行工作流程
+        report = await orchestrator.execute_complete_workflow()
+        
+        # 打印摘要
+        orchestrator.print_final_summary(report)
+        
+        print(f"\n📊 执行结果:")
+        print(f"   成功: {'是' if report['success'] else '否'}")
+        print(f"   完成步骤数: {len(report['workflow_summary']['completed_steps'])}")
+        print(f"   总耗时: {report['workflow_summary']['total_duration']}秒")
+        
+    except Exception as e:
+        print(f"\n❌ 示例执行失败: {e}")
+
+
+async def example_error_handling():
+    """错误处理示例"""
+    print("=" * 60)
+    print("错误处理示例")
+    print("=" * 60)
+    
+    # 设置日志
+    setup_logging(verbose=True)
+    
+    # 故意使用错误的配置来演示错误处理
+    db_connection = "postgresql://invalid:invalid@localhost:5432/invalid_db"
+    table_list_file = "nonexistent_tables.txt"
+    business_context = "测试系统"
+    db_name = "test_db"
+    output_dir = "./example_error_output"
+    
+    try:
+        # 创建工作流编排器
+        orchestrator = SchemaWorkflowOrchestrator(
+            db_connection=db_connection,
+            table_list_file=table_list_file,
+            business_context=business_context,
+            db_name=db_name,
+            output_dir=output_dir
+        )
+        
+        print(f"🚀 开始执行工作流程(故意触发错误)...")
+        
+        # 执行工作流程
+        report = await orchestrator.execute_complete_workflow()
+        
+        # 分析错误报告
+        if not report["success"]:
+            print(f"\n🔍 错误分析:")
+            error = report["error"]
+            print(f"   错误类型: {error['type']}")
+            print(f"   错误信息: {error['message']}")
+            print(f"   失败步骤: {error['failed_step']}")
+            
+            # 显示部分结果
+            partial = report.get("partial_results", {})
+            if partial:
+                print(f"   部分结果: {list(partial.keys())}")
+        
+    except Exception as e:
+        print(f"\n❌ 预期的错误: {e}")
+        print("这是演示错误处理的正常情况")
+
+
+def show_usage_examples():
+    """显示使用示例"""
+    print("=" * 60)
+    print("SchemaWorkflowOrchestrator 使用示例")
+    print("=" * 60)
+    
+    examples = [
+        {
+            "title": "1. 编程方式 - 完整工作流程",
+            "code": """
+import asyncio
+from schema_tools.schema_workflow_orchestrator import SchemaWorkflowOrchestrator
+
+async def run_complete_workflow():
+    orchestrator = SchemaWorkflowOrchestrator(
+        db_connection="postgresql://user:pass@localhost:5432/dbname",
+        table_list_file="tables.txt",
+        business_context="高速公路服务区管理系统",
+        db_name="highway_db",
+        output_dir="./output"
+    )
+    
+    # 一键执行完整流程
+    report = await orchestrator.execute_complete_workflow()
+    
+    if report["success"]:
+        print(f"✅ 编排完成!最终生成 {report['final_outputs']['final_question_count']} 个问答对")
+        print(f"📄 输出文件: {report['final_outputs']['primary_output_file']}")
+    else:
+        print(f"❌ 编排失败: {report['error']['message']}")
+
+asyncio.run(run_complete_workflow())
+            """
+        },
+        {
+            "title": "2. 命令行方式 - 完整工作流程",
+            "code": """
+python -m schema_tools.schema_workflow_orchestrator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --table-list tables.txt \\
+  --business-context "高速公路服务区管理系统" \\
+  --db-name highway_db \\
+  --output-dir ./output
+            """
+        },
+        {
+            "title": "3. 命令行方式 - 跳过验证",
+            "code": """
+python -m schema_tools.schema_workflow_orchestrator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --table-list tables.txt \\
+  --business-context "电商系统" \\
+  --db-name ecommerce_db \\
+  --skip-validation
+            """
+        },
+        {
+            "title": "4. 命令行方式 - 禁用LLM修复",
+            "code": """
+python -m schema_tools.schema_workflow_orchestrator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --table-list tables.txt \\
+  --business-context "管理系统" \\
+  --db-name management_db \\
+  --disable-llm-repair \\
+  --verbose
+            """
+        }
+    ]
+    
+    for example in examples:
+        print(f"\n{example['title']}:")
+        print(example['code'])
+
+
+async def main():
+    """主函数"""
+    print("Schema工作流编排器使用示例")
+    print("请选择要运行的示例:")
+    print("1. 完整工作流程示例")
+    print("2. 跳过验证的工作流程示例")
+    print("3. 错误处理示例")
+    print("4. 显示使用示例代码")
+    print("0. 退出")
+    
+    try:
+        choice = input("\n请输入选择 (0-4): ").strip()
+        
+        if choice == "1":
+            await example_complete_workflow()
+        elif choice == "2":
+            await example_skip_validation()
+        elif choice == "3":
+            await example_error_handling()
+        elif choice == "4":
+            show_usage_examples()
+        elif choice == "0":
+            print("退出示例程序")
+        else:
+            print("无效选择")
+    
+    except KeyboardInterrupt:
+        print("\n\n用户中断,退出程序")
+    except Exception as e:
+        print(f"\n示例执行失败: {e}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())