Browse Source

Completed development of the automatic validation and automatic repair module for the SQL in pair.json.

wangxq 1 week ago
parent
commit
fddea86e86

+ 421 - 0
docs/sql_validation_guide.md

@@ -0,0 +1,421 @@
+# SQL Validator User Guide
+
+The SQL validator is a standalone module of Schema Tools that checks whether the SQL statements in Question-SQL pairs are valid. It detects syntax errors and schema problems by running each statement through `EXPLAIN`.
+
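+The core check is small. A minimal sketch (not the module's actual code; it assumes the `asyncpg` driver, whereas the real agent can reuse the project's connection pool):
+
+```python
+import asyncio
+import asyncpg  # assumed driver for this sketch
+
+async def explain_check(dsn: str, sql: str, timeout: float = 30.0):
+    """Return (is_valid, error) for one statement without executing it."""
+    conn = await asyncpg.connect(dsn)
+    try:
+        # A read-only transaction guarantees no writes; EXPLAIN only plans the query.
+        async with conn.transaction(readonly=True):
+            await asyncio.wait_for(conn.execute(f"EXPLAIN {sql}"), timeout)
+        return True, ""
+    except (asyncpg.PostgresError, asyncio.TimeoutError) as e:
+        return False, str(e)
+    finally:
+        await conn.close()
+```
+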
+## Features
+
+- 🔍 Validates SQL with PostgreSQL's EXPLAIN statement
+- ⚡ Concurrent validation for much higher throughput
+- 🔒 Runs in read-only mode, safe by design
+- 📊 Detailed validation reports and statistics
+- 🔄 Automatic retries for transient network problems
+- 📁 Batch processing, suitable for validating large SQL sets
+- 🔧 Flexible configuration options
+- 🤖 **New**: LLM auto-repair of failing SQL (optional)
+- 📝 **New**: updates the original JSON file with repaired SQL
+- 🗑️ **New**: removes entries whose SQL cannot be repaired
+- 💾 **New**: creates a backup before modifying the original data
+
+## Usage
+
+### 1. Command line
+
+#### Basic usage
+```bash
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./qs_highway_db_20240101_143052_pair.json
+```
+
+#### Specifying an output directory
+```bash
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --output-dir ./validation_reports
+```
+
+#### Tuning validation parameters
+```bash
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --max-concurrent 10 \
+  --batch-size 20 \
+  --timeout 60 \
+  --verbose
+```
+
+#### Dry-run mode
+```bash
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --dry-run
+```
+
+### Advanced options
+
+The tuning flags (`--max-concurrent`, `--batch-size`, `--timeout`) and `--dry-run` are shown above. The repair flags enable the optional LLM workflow, covered in detail under "Automatic File Modification" below:
+
+```bash
+# Enable LLM repair (pair it with --modify-original-file so repairs are written back)
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --enable-llm-repair \
+  --modify-original-file
+```
+
+### 2. Programmatic use
+
+```python
+import asyncio
+from schema_tools import SQLValidationAgent
+
+async def validate_sqls():
+    agent = SQLValidationAgent(
+        db_connection="postgresql://user:pass@localhost:5432/dbname",
+        input_file="./qs_highway_db_20240101_143052_pair.json",
+        output_dir="./validation_reports"
+    )
+    
+    report = await agent.validate()
+    
+    print(f"验证完成:")
+    print(f"  总SQL: {report['summary']['total_questions']}")
+    print(f"  有效: {report['summary']['valid_sqls']}")
+    print(f"  成功率: {report['summary']['success_rate']:.1%}")
+
+asyncio.run(validate_sqls())
+```
+
+## Automatic File Modification
+
+### What it does
+The SQL validator can now modify the original JSON file in place:
+
+1. **Successfully repaired SQL**: the SQL in the original file is updated
+2. **Unrepairable SQL**: the corresponding Question-SQL pair is removed from the file
+3. **Automatic backup**: a backup file is created before any modification (see the sketch below)
+
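+The essence of the rewrite step can be sketched as follows (illustrative only; `apply_repairs` and its arguments are hypothetical, not the module's API):
+
+```python
+import json
+import shutil
+from pathlib import Path
+
+def apply_repairs(path: str, repaired: dict, invalid: set) -> None:
+    """Back up the file, patch repaired SQL by index, drop invalid entries."""
+    src = Path(path)
+    # Backup first: data.json -> data.json.backup
+    shutil.copy2(src, src.with_name(src.name + ".backup"))
+    items = json.loads(src.read_text(encoding="utf-8"))
+    for idx, new_sql in repaired.items():   # e.g. {5: "SELECT ..."}
+        items[idx]["sql"] = new_sql
+    items = [it for i, it in enumerate(items) if i not in invalid]
+    src.write_text(json.dumps(items, ensure_ascii=False, indent=2),
+                   encoding="utf-8")
+```
+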
+### Default behavior
+```bash
+# By default the tool only validates; the original file is untouched
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json
+```
+
+After the run:
+- Generates the validation report: `sql_validation_<timestamp>_summary.txt`
+- Leaves the original file unmodified
+
+### Delete invalid SQL only
+
+```bash
+# Delete invalid SQL without attempting LLM repair
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --modify-original-file
+```
+
+After the run:
+- Creates a backup file: `data.json.backup`
+- Modifies the original `data.json` (entries whose SQL failed validation are removed)
+- Writes a modification log: `file_modifications_<timestamp>.log`
+- Writes the validation report: `sql_validation_<timestamp>_summary.txt`
+
+### Enable LLM repair
+
+```bash
+# Enable LLM repair (must be combined with the file modification flag)
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --enable-llm-repair \
+  --modify-original-file
+```
+
+After the run:
+- Creates a backup file: `data.json.backup`
+- Modifies the original `data.json` (successfully repaired SQL is updated, unrepairable SQL is removed)
+- Writes a modification log: `file_modifications_<timestamp>.log`
+- Writes the validation report: `sql_validation_<timestamp>_summary.txt`
+
+### Modification log
+Every modification writes a detailed log file:
+
+```
+Original JSON file modification log
+==================================================
+Modified at: 2024-01-01 15:30:45
+Original file: ./qs_highway_db_20240101_143052_pair.json
+Backup file: ./qs_highway_db_20240101_143052_pair.json.backup
+
+Updated SQL (3):
+----------------------------------------
+1. Index: 5
+   Question: 查询订单与车辆数据的相关性分析?
+   Old SQL: SELECT EXTRACT(year FROM oper_date) AS 年份, COUNT(*) FROM bss_business_day_data GROUP BY 年份;
+   New SQL: SELECT EXTRACT(year FROM oper_date)::integer AS 年份, COUNT(*) FROM bss_business_day_data GROUP BY EXTRACT(year FROM oper_date);
+
+Deleted invalid entries (2):
+----------------------------------------
+1. Index: 12
+   Question: 查询不存在的表数据?
+   SQL: SELECT * FROM non_existent_table;
+   Error: relation "non_existent_table" does not exist
+```
+
+## Input File Format
+
+The SQL validator accepts the standard Question-SQL pair JSON file:
+
+```json
+[
+  {
+    "question": "按服务区统计每日营收趋势(最近30天)?",
+    "sql": "SELECT service_name AS 服务区, oper_date AS 营业日期, SUM(pay_sum) AS 每日营收 FROM bss_business_day_data WHERE oper_date >= CURRENT_DATE - INTERVAL '30 day' AND delete_ts IS NULL GROUP BY service_name, oper_date ORDER BY 营业日期 ASC;"
+  },
+  {
+    "question": "查看车流量最大的前10个服务区",
+    "sql": "SELECT service_name AS 服务区, SUM(car_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 总车流量 DESC LIMIT 10;"
+  }
+]
+```
+
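+A quick format check along the lines of what `--dry-run` performs (a sketch; the agent's actual checks may differ in detail):
+
+```python
+import json
+
+def check_pairs(path: str) -> int:
+    """Parse the file and verify the expected shape; return the pair count."""
+    with open(path, encoding="utf-8") as f:
+        data = json.load(f)
+    if not isinstance(data, list):
+        raise ValueError("top level must be an array of Question-SQL pairs")
+    for i, item in enumerate(data):
+        if not {"question", "sql"} <= set(item):
+            raise ValueError(f"item {i} is missing 'question' or 'sql'")
+    return len(data)
+```
+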
+## Output Reports
+
+### Default output
+By default a single **text summary report** is generated:
+- File name pattern: `sql_validation_<timestamp>_summary.txt`
+- Contains the statistics and the **complete SQL** of every error
+- Easy to inspect and analyze directly
+
+### Optional JSON report
+To process results programmatically, enable the detailed JSON report:
+
+```bash
+# Also save the JSON report
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --save-json
+```
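+
+The JSON report can then be consumed programmatically, for example (the file name follows the `report_file_prefix` pattern shown in the README's output layout; only the `summary` keys used above are assumed):
+
+```python
+import glob
+import json
+
+# Pick the newest detailed report in the output directory
+path = sorted(glob.glob("output/sql_validation_*_report.json"))[-1]
+with open(path, encoding="utf-8") as f:
+    report = json.load(f)
+print(f"success rate: {report['summary']['success_rate']:.1%}")
+```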
+
+### Report contents
+
+#### The text summary report includes:
+1. **Validation statistics**: totals, success rate, average time, etc.
+2. **SQL repair statistics**: repairs attempted, succeeded, failed, and the repair success rate
+3. **Original file modification statistics**: SQL updated, invalid entries deleted, modification failures
+4. **Complete error details**:
+   - the question text
+   - the error message
+   - **the complete SQL statement** (not truncated)
+   - the LLM repair outcome
+   - the repaired SQL (if successful)
+   - the retry count
+5. **List of successfully repaired SQL**: the original error plus the repaired SQL
+
+#### The JSON report includes:
+- detailed data for every validation result
+- suitable for programmatic analysis and processing
+
+## Sample Output
+
+### Validation log
+```
+🚀 Starting SQL validation...
+📁 Input file: ./qs_highway_db_20240101_143052_pair.json
+🔗 Database: postgresql://***:***@localhost:5432/highway_db
+📦 Processing batch 1/5 (10 SQL)
+✅ Batch 1 done: 9/10 valid
+📊 Validation report saved: output/sql_validation_20240101_150000_summary.txt
+
+🎉 Validation finished, success rate: 90.0%
+📊 Details: 45/50 SQL valid
+```
+
+### Text summary report example
+```
+SQL Validation Report
+==================================================
+
+Input file: ./qs_highway_db_20240101_143052_pair.json
+Validated at: 2024-01-01T15:00:00
+Duration: 2.45s
+
+Summary:
+  Total SQL: 50
+  Valid SQL: 45
+  Invalid SQL: 5
+  Success rate: 90.00%
+  Average time: 0.049s
+  Retries: 0
+
+SQL repair statistics:
+  Repairs attempted: 5
+  Repairs succeeded: 3
+  Repairs failed: 2
+  Repair success rate: 60.00%
+
+Original file modification statistics:
+  SQL updated: 3
+  Invalid entries deleted: 2
+  Modification failures: 0
+
+Error details (2 total):
+==================================================
+
+1. Question: 查询订单与车辆数据的相关性分析?
+   Error: function round(double precision, integer) does not exist
+   LLM repair attempt: failed
+   Failure reason: could not determine the correct function syntax
+   Full SQL:
+   SELECT a.oper_date, a.order_sum AS 订单数量, b.customer_count AS 车辆数量, 
+          ROUND(CORR(a.order_sum, b.customer_count), 2) AS 相关系数
+   FROM bss_business_day_data a 
+   JOIN bss_car_day_count b ON a.oper_date = b.count_date;
+   ----------------------------------------
+
+Successfully repaired SQL (3 total):
+==================================================
+
+1. Question: 各区域管理公司单位能耗产出对比(需结合能耗表)
+   Original error: missing FROM-clause entry for table "e"
+   Repaired SQL:
+   SELECT c.company_name AS 公司名称,
+       ROUND(SUM(b.pay_sum)::numeric/SUM(b.order_sum)::numeric, 2) AS 单位订单产出
+   FROM bss_company c
+   JOIN bss_service_area sa ON c.id = sa.company_id
+   JOIN bss_business_day_data b ON sa.service_area_no = b.service_no
+   WHERE b.oper_date BETWEEN '2023-01-01' AND '2023-03-31'
+   GROUP BY c.company_name
+   ORDER BY 单位订单产出 DESC;
+   ----------------------------------------
+```
+
+## Configuration
+
+The SQL validator's configuration lives in `schema_tools/config.py`:
+
+```python
+"sql_validation": {
+    "reuse_connection_pool": True,       # reuse the existing connection pool
+    "max_concurrent_validations": 5,     # concurrent validations
+    "validation_timeout": 30,            # per-validation timeout (seconds)
+    "batch_size": 10,                    # batch size
+    "continue_on_error": True,           # keep going after individual failures
+    "save_validation_report": True,      # save the validation report
+    "readonly_mode": True,               # run in read-only mode
+    "max_retry_count": 2,                # retries for failed validations
+    "report_file_prefix": "sql_validation",  # report file name prefix
+}
+```
+
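+Since `SQLValidationAgent` reads this dictionary when it is constructed, entries can be overridden beforehand (a usage sketch):
+
+```python
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+# Tune validation before constructing SQLValidationAgent
+SCHEMA_TOOLS_CONFIG["sql_validation"]["max_concurrent_validations"] = 10
+SCHEMA_TOOLS_CONFIG["sql_validation"]["validation_timeout"] = 60
+```
+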
+## Command-Line Arguments
+
+| Argument | Type | Required | Description |
+|------|------|------|------|
+| `--db-connection` | string | yes | PostgreSQL database connection string |
+| `--input-file` | string | yes | path to the input JSON file |
+| `--output-dir` | string | no | output directory for validation reports |
+| `--max-concurrent` | int | no | maximum number of concurrent validations |
+| `--batch-size` | int | no | batch size |
+| `--timeout` | int | no | per-SQL validation timeout (seconds) |
+| `--enable-llm-repair` | flag | no | attempt LLM repair of failing SQL |
+| `--modify-original-file` | flag | no | write repairs and deletions back to the input file (a backup is created first) |
+| `--save-json` | flag | no | also save the detailed JSON report |
+| `--verbose` | flag | no | enable verbose logging |
+| `--log-file` | string | no | log file path |
+| `--dry-run` | flag | no | only read and parse the file, without validating |
+
+## Error Handling
+
+1. **Timeouts**: a validation that exceeds the configured timeout is marked as failed
+2. **Retries**: network-related errors are retried automatically, at most 2 times (see the sketch below)
+3. **Error tolerance**: a single failing SQL does not abort the whole run
+4. **Batch isolation**: batches are independent; one failing batch does not affect the others
+
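+The retry loop in point 2 can be sketched like this (the transient-error heuristic and the backoff are assumptions, not the module's exact logic):
+
+```python
+import asyncio
+
+async def validate_with_retry(check, sql: str, max_retry: int = 2):
+    """Retry transient failures up to max_retry times (cf. max_retry_count)."""
+    ok, err = await check(sql)
+    for attempt in range(max_retry):
+        if ok or "connection" not in err.lower():  # assumed transient-error test
+            break
+        await asyncio.sleep(2 ** attempt)          # simple backoff (an assumption)
+        ok, err = await check(sql)
+    return ok, err
+```
+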
+## Performance Tuning
+
+1. **Concurrency**: adjust `max_concurrent_validations` to the database's capacity (see the sketch below)
+2. **Batching**: tune `batch_size` for large files; larger batches reduce scheduling overhead, smaller batches use less memory
+3. **Timeouts**: raise `validation_timeout` for complex SQL
+4. **Connection pooling**: enable `reuse_connection_pool` to reduce connection overhead
+
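+The concurrency cap in point 1 amounts to a semaphore around the per-statement check (a sketch of the pattern, not the module's internals):
+
+```python
+import asyncio
+
+async def validate_batch(check, sqls, max_concurrent: int = 5):
+    """Run checks concurrently, capped at max_concurrent in-flight EXPLAINs."""
+    sem = asyncio.Semaphore(max_concurrent)
+
+    async def one(sql):
+        async with sem:
+            return await check(sql)
+
+    return await asyncio.gather(*(one(s) for s in sqls))
+```
+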
+## Security Notes
+
+- The SQL validator runs in read-only mode and does not modify the database
+- Validation uses EXPLAIN, which plans queries without executing any data operation
+- Sensitive information (such as passwords) is automatically masked in the logs
+
+## Troubleshooting
+
+### Common problems
+
+1. **Connection failure**
+   ```
+   Fix: check the database connection string and network connectivity
+   ```
+
+2. **Insufficient privileges**
+   ```
+   Fix: make sure the database user has SELECT privileges
+   ```
+
+3. **Table does not exist**
+   ```
+   Fix: check the table names in the SQL, including any schema prefix
+   ```
+
+4. **Syntax errors**
+   ```
+   Fix: check the SQL against PostgreSQL's syntax
+   ```
+
+### Debugging tips
+
+1. Use `--verbose` for detailed logs
+2. Use `--dry-run` to pre-check the file format
+3. Reduce `--batch-size` to isolate a problematic SQL
+4. Check the error details in the validation report
+
+## Integration with Other Modules
+
+The SQL validator integrates seamlessly with the other Schema Tools modules:
+
+```python
+# First generate the Question-SQL pairs
+from schema_tools import QuestionSQLGenerationAgent
+
+qs_agent = QuestionSQLGenerationAgent(...)
+qs_report = await qs_agent.generate()
+
+# Then validate the generated SQL
+from schema_tools import SQLValidationAgent
+
+sql_agent = SQLValidationAgent(
+    db_connection="...",
+    input_file=qs_report['output_file']
+)
+validation_report = await sql_agent.validate()
+``` 

+ 1 - 17
output/qs_highway_db_20250623_192120_pair.json

@@ -35,10 +35,6 @@
     "question": "庐山服务区各档口收入排名(按支付金额降序排列)",
     "sql": "SELECT branch_name AS 档口名称, SUM(pay_sum) AS 累计收入 FROM bss_business_day_data WHERE delete_ts IS NULL AND service_name = '庐山服务区' GROUP BY branch_name ORDER BY 累计收入 DESC;"
   },
-  {
-    "question": "车辆数量与订单量相关性分析(关联车流数据)",
-    "sql": "SELECT a.oper_date, a.order_sum AS 订单数量, b.customer_count AS 车辆数量, ROUND(CORR(a.order_sum, b.customer_count),2) AS 相关系数 FROM (SELECT oper_date, SUM(order_sum) AS order_sum FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY oper_date) a JOIN (SELECT count_date, SUM(customer_count) AS customer_count FROM bss_car_day_count GROUP BY count_date) b ON a.oper_date = b.count_date GROUP BY a.oper_date;"
-  },
   {
     "question": "分析不同车辆类型(危化品/城际/过境/其他)对应的服务区平均消费金额差异",
     "sql": "SELECT c.car_type AS 车辆类型, AVG(b.pay_sum) AS 平均消费金额 FROM bss_business_day_data b JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY c.car_type;"
@@ -67,18 +63,10 @@
     "question": "统计各服务区档口微信支付占比(微信金额/总支付金额)与车流高峰时段的关系",
     "sql": "SELECT m.service_name AS 服务区名称, b.branch_name AS 档口名称, (b.wx/SUM(b.pay_sum) OVER(PARTITION BY b.service_name, b.oper_date)) * 100 AS 微信占比, CASE WHEN c.customer_count > (SELECT PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY customer_count) FROM bss_car_day_count) THEN '高峰时段' ELSE '非高峰' END AS 车流时段 FROM bss_business_day_data b JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL;"
   },
-  {
-    "question": "预测未来一周各服务区基于历史车流与消费数据的消费趋势(使用线性回归)",
-    "sql": "SELECT service_area_id AS 服务区ID, date, REGR_INTERCEPT(pay_sum, day_num) + REGR_SLOPE(pay_sum, day_num) * EXTRACT(EPOCH FROM date)/86400 AS 预测消费 FROM (SELECT c.service_area_id, b.oper_date AS date, EXTRACT(EPOCH FROM b.oper_date - CURRENT_DATE + 7) AS day_num, b.pay_sum FROM bss_car_day_count c JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE b.oper_date BETWEEN CURRENT_DATE - 30 AND CURRENT_DATE AND c.delete_ts IS NULL AND b.delete_ts IS NULL) sub GROUP BY service_area_id, date;"
-  },
   {
     "question": "找出最近3天连续出现车流量下降超过15%且消费金额波动异常的服务区",
     "sql": "WITH daily_diff AS (SELECT service_area_id, count_date, customer_count - LAG(customer_count,1) OVER(PARTITION BY service_area_id ORDER BY count_date) AS flow_diff, (customer_count - LAG(customer_count,1) OVER(PARTITION BY service_area_id ORDER BY count_date))/NULLIF(LAG(customer_count,1) OVER(PARTITION BY service_area_id ORDER BY count_date),0) * 100 AS flow_rate FROM bss_car_day_count WHERE delete_ts IS NULL) SELECT DISTINCT service_area_id AS 服务区ID FROM daily_diff WHERE count_date >= CURRENT_DATE -3 AND flow_rate < -15 GROUP BY service_area_id HAVING COUNT(*) =3;"
   },
-  {
-    "question": "分析不同档口类型(餐饮/零售/其他)的消费转化率与车流密度(车流量/营业面积)的关系",
-    "sql": "SELECT b.branch_type AS 档口类型, c.customer_count / NULLIF(s.area,0) AS 车流密度, SUM(b.order_sum)/SUM(c.customer_count) AS 转化率 FROM bss_business_day_data b JOIN (SELECT branch_name, CASE WHEN branch_name LIKE '%餐饮%' THEN '餐饮' WHEN branch_name LIKE '%零售%' THEN '零售' ELSE '其他' END AS branch_type FROM bss_business_day_data GROUP BY branch_name) t ON b.branch_name = t.branch_name JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date JOIN (SELECT id, (RANDOM()*1000+500)::INT AS area FROM bss_service_area) s ON m.service_area_id = s.id WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY ROLLUP(branch_type);"
-  },
   {
     "question": "各管理公司最近一个月平均每车次收入及订单转化率排名",
     "sql": "SELECT c.company_name AS 公司名称, \n       ROUND(SUM(b.pay_sum)/SUM(car.customer_count), 2) AS 单位车次收入,\n       ROUND(SUM(b.order_sum)*100/SUM(car.customer_count), 2) AS 订单转化率\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id AND sa.delete_ts IS NULL\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nJOIN bss_car_day_count car ON sa.id = car.service_area_id\nWHERE b.oper_date BETWEEN '2023-03-01' AND '2023-03-31'\n  AND car.count_date = b.oper_date\nGROUP BY c.company_name\nORDER BY 单位车次收入 DESC\nLIMIT 10;"
@@ -107,10 +95,6 @@
     "question": "节假日与工作日运营效率差异对比(春节假期 vs 常规周)",
     "sql": "SELECT \n  CASE WHEN b.oper_date BETWEEN '2023-01-21' AND '2023-01-27' THEN '春节假期' ELSE '常规周' END AS 时段类型,\n  ROUND(AVG(b.pay_sum/car.customer_count), 2) AS 平均单车收入,\n  ROUND(AVG(b.order_sum/car.customer_count), 2) AS 平均转化率\nFROM bss_business_day_data b\nJOIN bss_car_day_count car ON b.service_no = car.service_area_id\nWHERE b.oper_date BETWEEN '2023-01-10' AND '2023-02-10'\n  AND car.count_date = b.oper_date\nGROUP BY \n  CASE WHEN b.oper_date BETWEEN '2023-01-21' AND '2023-01-27' THEN '春节假期' ELSE '常规周' END;"
   },
-  {
-    "question": "各区域管理公司单位能耗产出对比(需结合能耗表)",
-    "sql": "SELECT c.company_name AS 公司名称,\n       ROUND(SUM(b.pay_sum)/SUM(e.energy_consumption), 2) AS 单位能耗产出\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\n-- 假设存在能耗表 energy_consumption_table e\n-- JOIN energy_consumption_table e ON sa.id = e.service_area_id\nWHERE b.oper_date BETWEEN '2023-01-01' AND '2023-03-31'\nGROUP BY c.company_name\nORDER BY 单位能耗产出 DESC;"
-  },
   {
     "question": "新入驻公司首月运营效率达标情况检查",
     "sql": "SELECT c.company_name AS 公司名称,\n       sa.service_area_name AS 服务区,\n       ROUND(SUM(b.pay_sum)/COUNT(DISTINCT b.oper_date), 2) AS 日均营收,\n       ROUND(SUM(b.order_sum)/COUNT(DISTINCT b.oper_date), 2) AS 日均订单量\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nWHERE sa.create_ts BETWEEN '2023-01-01' AND '2023-03-31'\nGROUP BY c.company_name, sa.service_area_name\nHAVING SUM(b.pay_sum)/COUNT(DISTINCT b.oper_date) < 5000;"
@@ -189,7 +173,7 @@
   },
   {
     "question": "各服务区不同数据源支付总额差异分析",
-    "sql": "SELECT service_name AS 服务区名称, source_type AS 数据源类型, SUM(pay_sum) AS 支付总额 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY service_name, source_type HAVING SUM(pay_sum) > 10000 ORDER BY 支付总额 DESC LIMIT 20;"
+    "sql": "SELECT sa.service_name AS 服务区名称, bdd.source_type AS 数据源类型, SUM(bdd.pay_sum) AS 支付总额 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY sa.service_name, bdd.source_type HAVING SUM(bdd.pay_sum) > 10000 ORDER BY 支付总额 DESC LIMIT 20;"
   },
   {
     "question": "手工录入数据每月占比变化趋势",

+ 202 - 0
output/qs_highway_db_20250623_192120_pair.json.backup

@@ -0,0 +1,202 @@
+[
+  {
+    "question": "最近7天各服务区总收入趋势分析(按日期排序)",
+    "sql": "SELECT oper_date AS 统计日期, service_name AS 服务区名称, SUM(pay_sum) AS 日总收入 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - 7 GROUP BY oper_date, service_name ORDER BY oper_date;"
+  },
+  {
+    "question": "本月与上月收入对比分析(按月份分组)",
+    "sql": "SELECT DATE_TRUNC('month', oper_date) AS 月份, SUM(pay_sum) AS 月总收入 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= DATE_TRUNC('month', CURRENT_DATE) - INTERVAL '1 month' AND oper_date < DATE_TRUNC('month', CURRENT_DATE) + INTERVAL '1 month' GROUP BY DATE_TRUNC('month', oper_date) ORDER BY 月份;"
+  },
+  {
+    "question": "累计收入排名前5的服务区(按总支付金额排序)",
+    "sql": "SELECT service_name AS 服务区名称, SUM(pay_sum) AS 累计总收入 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 累计总收入 DESC LIMIT 5;"
+  },
+  {
+    "question": "各支付方式占比分析(微信/支付宝/现金/其他)",
+    "sql": "SELECT '微信' AS 支付方式, ROUND(SUM(wx)/SUM(pay_sum)*100,2) AS 占比百分比 FROM bss_business_day_data WHERE delete_ts IS NULL UNION ALL SELECT '支付宝', ROUND(SUM(zfb)/SUM(pay_sum)*100,2) FROM bss_business_day_data WHERE delete_ts IS NULL UNION ALL SELECT '现金', ROUND(SUM(rmb)/SUM(pay_sum)*100,2) FROM bss_business_day_data WHERE delete_ts IS NULL UNION ALL SELECT '其他支付', ROUND((SUM(xs)+SUM(jd))/SUM(pay_sum)*100,2) FROM bss_business_day_data WHERE delete_ts IS NULL ORDER BY 占比百分比 DESC;"
+  },
+  {
+    "question": "异常数据监控:单日收入突增超过平均值200%的服务区记录",
+    "sql": "WITH daily_avg AS (SELECT AVG(pay_sum) AS avg_income FROM bss_business_day_data WHERE delete_ts IS NULL) SELECT a.oper_date, a.service_name, a.pay_sum, ROUND(a.pay_sum/daily_avg.avg_income,2) AS 倍数 FROM bss_business_day_data a, daily_avg WHERE a.delete_ts IS NULL AND a.pay_sum > daily_avg.avg_income * 2 ORDER BY a.oper_date DESC;"
+  },
+  {
+    "question": "各档口客单价对比分析(按平均订单金额排序)",
+    "sql": "SELECT branch_name AS 档口名称, ROUND(SUM(pay_sum)/SUM(order_sum),2) AS 客单价 FROM bss_business_day_data WHERE delete_ts IS NULL AND order_sum > 0 GROUP BY branch_name ORDER BY 客单价 DESC;"
+  },
+  {
+    "question": "周末与工作日收入差异分析(统计星期几收入分布)",
+    "sql": "SELECT CASE WHEN EXTRACT(DOW FROM oper_date) IN (0,6) THEN '周末' ELSE '工作日' END AS 日期类型, AVG(pay_sum) AS 平均日收入 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY CASE WHEN EXTRACT(DOW FROM oper_date) IN (0,6) THEN '周末' ELSE '工作日' END;"
+  },
+  {
+    "question": "微信支付金额月环比增长趋势分析",
+    "sql": "SELECT DATE_TRUNC('month', oper_date) AS 月份, SUM(wx) AS 微信支付总额, ROUND((SUM(wx) - LAG(SUM(wx),1,0) OVER(ORDER BY DATE_TRUNC('month', oper_date)))/LAG(SUM(wx),1,0) OVER(ORDER BY DATE_TRUNC('month', oper_date))*100,2) AS 环比增长率 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY DATE_TRUNC('month', oper_date) ORDER BY 月份;"
+  },
+  {
+    "question": "庐山服务区各档口收入排名(按支付金额降序排列)",
+    "sql": "SELECT branch_name AS 档口名称, SUM(pay_sum) AS 累计收入 FROM bss_business_day_data WHERE delete_ts IS NULL AND service_name = '庐山服务区' GROUP BY branch_name ORDER BY 累计收入 DESC;"
+  },
+  {
+    "question": "车辆数量与订单量相关性分析(关联车流数据)",
+    "sql": "SELECT a.oper_date, a.order_sum AS 订单数量, b.customer_count AS 车辆数量, ROUND(CORR(a.order_sum, b.customer_count),2) AS 相关系数 FROM (SELECT oper_date, SUM(order_sum) AS order_sum FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY oper_date) a JOIN (SELECT count_date, SUM(customer_count) AS customer_count FROM bss_car_day_count GROUP BY count_date) b ON a.oper_date = b.count_date GROUP BY a.oper_date;"
+  },
+  {
+    "question": "分析不同车辆类型(危化品/城际/过境/其他)对应的服务区平均消费金额差异",
+    "sql": "SELECT c.car_type AS 车辆类型, AVG(b.pay_sum) AS 平均消费金额 FROM bss_business_day_data b JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY c.car_type;"
+  },
+  {
+    "question": "找出最近7天车流量排名前10但订单转化率(订单数/车流量)低于5%的服务区",
+    "sql": "SELECT c.service_area_id AS 服务区ID, SUM(c.customer_count) AS 总车流量, SUM(b.order_sum) AS 总订单数, (SUM(b.order_sum)/SUM(c.customer_count)*100)::numeric(5,2) AS 转化率 FROM bss_car_day_count c JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE c.count_date >= CURRENT_DATE - 7 AND c.delete_ts IS NULL AND b.delete_ts IS NULL GROUP BY c.service_area_id HAVING SUM(b.order_sum)/SUM(c.customer_count) < 0.05 ORDER BY 总车流量 DESC LIMIT 10;"
+  },
+  {
+    "question": "统计各服务区近一月日均车流量与日均消费金额的线性相关性",
+    "sql": "SELECT c.service_area_id AS 服务区ID, CORR(c.customer_count, b.pay_sum) AS 相关性系数 FROM bss_car_day_count c JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE c.count_date >= CURRENT_DATE - 30 AND c.delete_ts IS NULL AND b.delete_ts IS NULL GROUP BY c.service_area_id HAVING COUNT(*) > 20;"
+  },
+  {
+    "question": "对比各分公司管辖服务区的月均车流量和客单价(消费金额/订单数)",
+    "sql": "SELECT com.company_name AS 所属公司, AVG(c.customer_count) AS 月均车流量, AVG(b.pay_sum/b.order_sum) AS 客单价 FROM bss_car_day_count c JOIN bss_service_area s ON c.service_area_id = s.id AND s.delete_ts IS NULL JOIN bss_company com ON s.company_id = com.id AND com.delete_ts IS NULL JOIN bss_service_area_mapper m ON s.id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE EXTRACT(MONTH FROM c.count_date) = EXTRACT(MONTH FROM CURRENT_DATE) AND c.delete_ts IS NULL AND b.delete_ts IS NULL GROUP BY com.company_name;"
+  },
+  {
+    "question": "识别昨日车流量超过该服务区历史平均值200%且消费金额下降超过30%的异常服务区",
+    "sql": "WITH daily_avg AS (SELECT service_area_id, AVG(customer_count) AS avg_count FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id) SELECT c.service_area_id AS 服务区ID, c.customer_count AS 昨日车流, a.avg_count AS 历史均值, b.pay_sum AS 昨日消费 FROM bss_car_day_count c JOIN daily_avg a ON c.service_area_id = a.service_area_id JOIN bss_business_day_data b ON c.count_date = b.oper_date JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL AND m.service_name = b.service_name WHERE c.count_date = CURRENT_DATE - 1 AND c.customer_count > 2*a.avg_count AND b.pay_sum < 0.7*(SELECT AVG(pay_sum) FROM bss_business_day_data WHERE service_name = m.service_name AND delete_ts IS NULL);"
+  },
+  {
+    "question": "分析周末与工作日的车型分布变化及对应消费差异",
+    "sql": "SELECT CASE WHEN EXTRACT(ISODOW FROM c.count_date) IN (6,7) THEN '周末' ELSE '工作日' END AS 日期类型, c.car_type AS 车型, COUNT(*) AS 记录次数, AVG(c.customer_count) AS 平均车流, AVG(b.pay_sum) AS 平均消费 FROM bss_car_day_count c JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE c.count_date >= CURRENT_DATE - 30 AND c.delete_ts IS NULL AND b.delete_ts IS NULL GROUP BY ROLLUP(日期类型, 车型);"
+  },
+  {
+    "question": "统计各服务区档口微信支付占比(微信金额/总支付金额)与车流高峰时段的关系",
+    "sql": "SELECT m.service_name AS 服务区名称, b.branch_name AS 档口名称, (b.wx/SUM(b.pay_sum) OVER(PARTITION BY b.service_name, b.oper_date)) * 100 AS 微信占比, CASE WHEN c.customer_count > (SELECT PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY customer_count) FROM bss_car_day_count) THEN '高峰时段' ELSE '非高峰' END AS 车流时段 FROM bss_business_day_data b JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL;"
+  },
+  {
+    "question": "预测未来一周各服务区基于历史车流与消费数据的消费趋势(使用线性回归)",
+    "sql": "SELECT service_area_id AS 服务区ID, date, REGR_INTERCEPT(pay_sum, day_num) + REGR_SLOPE(pay_sum, day_num) * EXTRACT(EPOCH FROM date)/86400 AS 预测消费 FROM (SELECT c.service_area_id, b.oper_date AS date, EXTRACT(EPOCH FROM b.oper_date - CURRENT_DATE + 7) AS day_num, b.pay_sum FROM bss_car_day_count c JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE b.oper_date BETWEEN CURRENT_DATE - 30 AND CURRENT_DATE AND c.delete_ts IS NULL AND b.delete_ts IS NULL) sub GROUP BY service_area_id, date;"
+  },
+  {
+    "question": "找出最近3天连续出现车流量下降超过15%且消费金额波动异常的服务区",
+    "sql": "WITH daily_diff AS (SELECT service_area_id, count_date, customer_count - LAG(customer_count,1) OVER(PARTITION BY service_area_id ORDER BY count_date) AS flow_diff, (customer_count - LAG(customer_count,1) OVER(PARTITION BY service_area_id ORDER BY count_date))/NULLIF(LAG(customer_count,1) OVER(PARTITION BY service_area_id ORDER BY count_date),0) * 100 AS flow_rate FROM bss_car_day_count WHERE delete_ts IS NULL) SELECT DISTINCT service_area_id AS 服务区ID FROM daily_diff WHERE count_date >= CURRENT_DATE -3 AND flow_rate < -15 GROUP BY service_area_id HAVING COUNT(*) =3;"
+  },
+  {
+    "question": "分析不同档口类型(餐饮/零售/其他)的消费转化率与车流密度(车流量/营业面积)的关系",
+    "sql": "SELECT b.branch_type AS 档口类型, c.customer_count / NULLIF(s.area,0) AS 车流密度, SUM(b.order_sum)/SUM(c.customer_count) AS 转化率 FROM bss_business_day_data b JOIN (SELECT branch_name, CASE WHEN branch_name LIKE '%餐饮%' THEN '餐饮' WHEN branch_name LIKE '%零售%' THEN '零售' ELSE '其他' END AS branch_type FROM bss_business_day_data GROUP BY branch_name) t ON b.branch_name = t.branch_name JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date JOIN (SELECT id, (RANDOM()*1000+500)::INT AS area FROM bss_service_area) s ON m.service_area_id = s.id WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY ROLLUP(branch_type);"
+  },
+  {
+    "question": "各管理公司最近一个月平均每车次收入及订单转化率排名",
+    "sql": "SELECT c.company_name AS 公司名称, \n       ROUND(SUM(b.pay_sum)/SUM(car.customer_count), 2) AS 单位车次收入,\n       ROUND(SUM(b.order_sum)*100/SUM(car.customer_count), 2) AS 订单转化率\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id AND sa.delete_ts IS NULL\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nJOIN bss_car_day_count car ON sa.id = car.service_area_id\nWHERE b.oper_date BETWEEN '2023-03-01' AND '2023-03-31'\n  AND car.count_date = b.oper_date\nGROUP BY c.company_name\nORDER BY 单位车次收入 DESC\nLIMIT 10;"
+  },
+  {
+    "question": "不同季度各管理公司人均产出对比分析",
+    "sql": "SELECT c.company_name AS 公司名称,\n       DATE_TRUNC('quarter', b.oper_date) AS 季度,\n       ROUND(SUM(b.pay_sum)/COUNT(DISTINCT b.created_by), 2) AS 人均产出\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id AND sa.delete_ts IS NULL\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nWHERE b.oper_date BETWEEN '2022-01-01' AND '2023-12-31'\nGROUP BY c.company_name, DATE_TRUNC('quarter', b.oper_date)\nORDER BY 季度, 人均产出 DESC;"
+  },
+  {
+    "question": "危化品车辆占比超过10%的服务区运营效率分析",
+    "sql": "SELECT sa.service_area_name AS 服务区名称,\n       c.company_name AS 管理公司,\n       ROUND(SUM(b.pay_sum)/SUM(car.customer_count), 2) AS 单位车次收入,\n       ROUND(SUM(car_chem.customer_count)*100/SUM(car.customer_count), 2) AS 危化品占比\nFROM bss_service_area sa\nJOIN bss_company c ON sa.company_id = c.id\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nJOIN bss_car_day_count car ON sa.id = car.service_area_id\nJOIN bss_car_day_count car_chem ON sa.id = car_chem.service_area_id\nWHERE car.count_date = b.oper_date\n  AND car_chem.car_type = '危化品'\nGROUP BY sa.service_area_name, c.company_name\nHAVING SUM(car_chem.customer_count)*100/SUM(car.customer_count) > 10\nORDER BY 危化品占比 DESC;"
+  },
+  {
+    "question": "城际车辆流量与夜间收入占比关系分析(20:00-6:00时段)",
+    "sql": "SELECT c.company_name AS 管理公司,\n       SUM(car.customer_count) AS 城际车流量,\n       ROUND(SUM(CASE WHEN EXTRACT(HOUR FROM b.create_ts) BETWEEN 20 AND 23 OR EXTRACT(HOUR FROM b.create_ts) BETWEEN 0 AND 6 THEN b.pay_sum ELSE 0 END)*100/SUM(b.pay_sum), 2) AS 夜间收入占比\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nJOIN bss_car_day_count car ON sa.id = car.service_area_id\nWHERE car.car_type = '城际'\n  AND car.count_date = b.oper_date\nGROUP BY c.company_name\nORDER BY 城际车流量 DESC;"
+  },
+  {
+    "question": "各支付方式订单转化率区域分布热力图",
+    "sql": "SELECT sa.service_position AS 坐标,\n       ROUND(SUM(b.wx_order + b.zf_order + b.rmb_order)*100/SUM(car.customer_count), 2) AS 综合转化率,\n       ROUND(SUM(b.wx_order)*100/SUM(b.order_sum), 2) AS 微信占比\nFROM bss_service_area sa\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nJOIN bss_car_day_count car ON sa.id = car.service_area_id\nWHERE b.oper_date BETWEEN '2023-01-01' AND '2023-03-31'\n  AND car.count_date = b.oper_date\nGROUP BY sa.service_position;"
+  },
+  {
+    "question": "资源闲置率最高的5个服务区(连续3个月营收下降)",
+    "sql": "WITH monthly_revenue AS (\n  SELECT service_no,\n         DATE_TRUNC('month', oper_date) AS 月份,\n         SUM(pay_sum) AS 总营收\n  FROM bss_business_day_data\n  WHERE oper_date BETWEEN '2022-12-01' AND '2023-02-28'\n  GROUP BY service_no, DATE_TRUNC('month', oper_date)\n),\nrevenue_trend AS (\n  SELECT service_no,\n         ARRAY_AGG(总营收 ORDER BY 月份) AS 收益序列\n  FROM monthly_revenue\n  GROUP BY service_no\n)\nSELECT sa.service_area_name AS 服务区名称,\n       r.收益序列[3] - r.收益序列[1] AS 下降幅度\nFROM revenue_trend r\nJOIN bss_service_area sa ON r.service_no = sa.service_area_no\nWHERE r.收益序列[3] < r.收益序列[2] AND r.收益序列[2] < r.收益序列[1]\nORDER BY 下降幅度 ASC\nLIMIT 5;"
+  },
+  {
+    "question": "节假日与工作日运营效率差异对比(春节假期 vs 常规周)",
+    "sql": "SELECT \n  CASE WHEN b.oper_date BETWEEN '2023-01-21' AND '2023-01-27' THEN '春节假期' ELSE '常规周' END AS 时段类型,\n  ROUND(AVG(b.pay_sum/car.customer_count), 2) AS 平均单车收入,\n  ROUND(AVG(b.order_sum/car.customer_count), 2) AS 平均转化率\nFROM bss_business_day_data b\nJOIN bss_car_day_count car ON b.service_no = car.service_area_id\nWHERE b.oper_date BETWEEN '2023-01-10' AND '2023-02-10'\n  AND car.count_date = b.oper_date\nGROUP BY \n  CASE WHEN b.oper_date BETWEEN '2023-01-21' AND '2023-01-27' THEN '春节假期' ELSE '常规周' END;"
+  },
+  {
+    "question": "各区域管理公司单位能耗产出对比(需结合能耗表)",
+    "sql": "SELECT c.company_name AS 公司名称,\n       ROUND(SUM(b.pay_sum)/SUM(e.energy_consumption), 2) AS 单位能耗产出\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\n-- 假设存在能耗表 energy_consumption_table e\n-- JOIN energy_consumption_table e ON sa.id = e.service_area_id\nWHERE b.oper_date BETWEEN '2023-01-01' AND '2023-03-31'\nGROUP BY c.company_name\nORDER BY 单位能耗产出 DESC;"
+  },
+  {
+    "question": "新入驻公司首月运营效率达标情况检查",
+    "sql": "SELECT c.company_name AS 公司名称,\n       sa.service_area_name AS 服务区,\n       ROUND(SUM(b.pay_sum)/COUNT(DISTINCT b.oper_date), 2) AS 日均营收,\n       ROUND(SUM(b.order_sum)/COUNT(DISTINCT b.oper_date), 2) AS 日均订单量\nFROM bss_company c\nJOIN bss_service_area sa ON c.id = sa.company_id\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nWHERE sa.create_ts BETWEEN '2023-01-01' AND '2023-03-31'\nGROUP BY c.company_name, sa.service_area_name\nHAVING SUM(b.pay_sum)/COUNT(DISTINCT b.oper_date) < 5000;"
+  },
+  {
+    "question": "重点路线服务区运营效率矩阵分析(昌栗高速路段)",
+    "sql": "SELECT sa.service_area_name AS 服务区,\n       ROUND(AVG(b.pay_sum/car.customer_count), 2) AS 单位车次收入,\n       ROUND(AVG(b.order_sum/car.customer_count), 4) AS 转化率\nFROM bss_section_route sr\nJOIN bss_section_route_area_link link ON sr.id = link.section_route_id\nJOIN bss_service_area sa ON link.service_area_id = sa.id\nJOIN bss_business_day_data b ON sa.service_area_no = b.service_no\nJOIN bss_car_day_count car ON sa.id = car.service_area_id\nWHERE sr.section_name = '昌栗'\n  AND b.oper_date BETWEEN '2023-01-01' AND '2023-03-31'\n  AND car.count_date = b.oper_date\nGROUP BY sa.service_area_name\nORDER BY 单位车次收入 DESC, 转化率 DESC;"
+  },
+  {
+    "question": "各路段车流量分布情况?",
+    "sql": "SELECT section.section_name AS 路段名称, SUM(car.customer_count) AS 总车流量 FROM bss_section_route section JOIN bss_section_route_area_link link ON section.id = link.section_route_id JOIN bss_car_day_count car ON link.service_area_id = car.service_area_id WHERE section.delete_ts IS NULL AND car.delete_ts IS NULL GROUP BY section.section_name ORDER BY 总车流量 DESC;"
+  },
+  {
+    "question": "对比不同日期各路段的车流量变化趋势?",
+    "sql": "SELECT car.count_date AS 统计日期, section.section_name AS 路段名称, SUM(car.customer_count) AS 日车流量 FROM bss_section_route section JOIN bss_section_route_area_link link ON section.id = link.section_route_id JOIN bss_car_day_count car ON link.service_area_id = car.service_area_id WHERE section.delete_ts IS NULL AND car.delete_ts IS NULL GROUP BY car.count_date, section.section_name ORDER BY 统计日期 ASC;"
+  },
+  {
+    "question": "车流量最高的五个服务区?",
+    "sql": "SELECT area.service_area_name AS 服务区名称, SUM(car.customer_count) AS 总车流量 FROM bss_section_route_area_link link JOIN bss_car_day_count car ON link.service_area_id = car.service_area_id JOIN bss_service_area area ON link.service_area_id = area.id WHERE car.delete_ts IS NULL AND area.delete_ts IS NULL GROUP BY area.service_area_name ORDER BY 总车流量 DESC LIMIT 5;"
+  },
+  {
+    "question": "分析工作日与周末的平均车流量差异?",
+    "sql": "SELECT CASE WHEN EXTRACT(DOW FROM car.count_date) IN (0,6) THEN '周末' ELSE '工作日' END AS 日期类型, AVG(customer_count) AS 平均车流量 FROM bss_section_route_area_link link JOIN bss_car_day_count car ON link.service_area_id = car.service_area_id WHERE car.delete_ts IS NULL GROUP BY CASE WHEN EXTRACT(DOW FROM car.count_date) IN (0,6) THEN '周末' ELSE '工作日' END;"
+  },
+  {
+    "question": "危化品车辆较多的服务区分布?",
+    "sql": "SELECT area.service_area_name AS 服务区名称, SUM(car.customer_count) AS 危化品车流量 FROM bss_section_route_area_link link JOIN bss_car_day_count car ON link.service_area_id = car.service_area_id JOIN bss_service_area area ON link.service_area_id = area.id WHERE car.car_type = '危化品' AND car.delete_ts IS NULL AND area.delete_ts IS NULL GROUP BY area.service_area_name ORDER BY 危化品车流量 DESC;"
+  },
+  {
+    "question": "统计每个路段连接的服务区数量?",
+    "sql": "SELECT section.section_name AS 路段名称, COUNT(link.service_area_id) AS 服务区数量 FROM bss_section_route section JOIN bss_section_route_area_link link ON section.id = link.section_route_id WHERE section.delete_ts IS NULL GROUP BY section.section_name ORDER BY 服务区数量 DESC;"
+  },
+  {
+    "question": "分析特定路段(如昌栗)的车辆类型分布?",
+    "sql": "SELECT car.car_type AS 车辆类型, SUM(car.customer_count) AS 总车流量, ROUND(SUM(car.customer_count)*100.0/(SELECT SUM(customer_count) FROM bss_car_day_count car2 JOIN bss_section_route_area_link link2 ON car2.service_area_id = link2.service_area_id JOIN bss_section_route sec2 ON link2.section_route_id = sec2.id WHERE sec2.section_name = '昌栗' AND car2.delete_ts IS NULL AND sec2.delete_ts IS NULL),2) AS 占比百分比 FROM bss_section_route section JOIN bss_section_route_area_link link ON section.id = link.section_route_id JOIN bss_car_day_count car ON link.service_area_id = car.service_area_id WHERE section.section_name = '昌栗' AND section.delete_ts IS NULL AND car.delete_ts IS NULL GROUP BY car.car_type ORDER BY 总车流量 DESC;"
+  },
+  {
+    "question": "检查是否存在未关联任何路段的服务区?",
+    "sql": "SELECT area.service_area_name AS 服务区名称 FROM bss_service_area area LEFT JOIN bss_section_route_area_link link ON area.id = link.service_area_id WHERE link.section_route_id IS NULL AND area.delete_ts IS NULL;"
+  },
+  {
+    "question": "最近7天各路段的车流量统计?",
+    "sql": "SELECT section.section_name AS 路段名称, SUM(car.customer_count) AS 总车流量 FROM bss_section_route section JOIN bss_section_route_area_link link ON section.id = link.section_route_id JOIN bss_car_day_count car ON link.service_area_id = car.service_area_id WHERE section.delete_ts IS NULL AND car.delete_ts IS NULL AND car.count_date >= CURRENT_DATE - 7 GROUP BY section.section_name ORDER BY 总车流量 DESC;"
+  },
+  {
+    "question": "找出订单总数与车流量相关性高的服务区?",
+    "sql": "SELECT area.service_area_name AS 服务区名称, SUM(business.order_sum) AS 总订单数, SUM(car.customer_count) AS 总车流量, ROUND(SUM(business.order_sum)*1.0/SUM(car.customer_count),4) AS 订单车流比 FROM bss_service_area area JOIN bss_business_day_data business ON area.service_area_name = business.service_name JOIN bss_car_day_count car ON area.id = car.service_area_id WHERE area.delete_ts IS NULL AND business.delete_ts IS NULL AND car.delete_ts IS NULL GROUP BY area.service_area_name ORDER BY 订单车流比 DESC;"
+  },
+  {
+    "question": "不同数据来源类别的业务数据记录数量对比",
+    "sql": "SELECT sa.source_type AS 数据来源类别, COUNT(*) AS 记录数量 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY sa.source_type ORDER BY 记录数量 DESC;"
+  },
+  {
+    "question": "近30天各服务区微信支付金额波动趋势分析",
+    "sql": "SELECT oper_date AS 统计日期, service_name AS 服务区名称, SUM(wx) AS 微信支付总额 FROM bss_business_day_data WHERE oper_date >= CURRENT_DATE - 30 AND delete_ts IS NULL GROUP BY oper_date, service_name ORDER BY oper_date DESC LIMIT 100;"
+  },
+  {
+    "question": "版本变更次数超过5次的服务区映射信息",
+    "sql": "SELECT service_name AS 服务区名称, COUNT(version) AS 版本变更次数 FROM bss_service_area_mapper WHERE delete_ts IS NULL GROUP BY service_name HAVING COUNT(version) > 5 ORDER BY 版本变更次数 DESC;"
+  },
+  {
+    "question": "检查服务区编码与名称的匹配一致性",
+    "sql": "SELECT bdd.service_no AS 业务数据编码, bdd.service_name AS 业务数据名称, sa.service_name AS 映射表名称 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.service_name != sa.service_name AND bdd.delete_ts IS NULL LIMIT 50;"
+  },
+  {
+    "question": "最近7天每日新增业务数据量的时效性分析",
+    "sql": "SELECT DATE(create_ts) AS 创建日期, COUNT(*) AS 新增记录数 FROM bss_business_day_data WHERE create_ts >= CURRENT_DATE - 7 AND delete_ts IS NULL GROUP BY DATE(create_ts) ORDER BY 创建日期 DESC;"
+  },
+  {
+    "question": "不同支付方式订单占比分布统计",
+    "sql": "SELECT '微信' AS 支付方式, SUM(wx_order) AS 订单数 FROM bss_business_day_data WHERE delete_ts IS NULL UNION ALL SELECT '支付宝', SUM(zf_order) FROM bss_business_day_data WHERE delete_ts IS NULL UNION ALL SELECT '现金', SUM(rmb_order) FROM bss_business_day_data WHERE delete_ts IS NULL ORDER BY 订单数 DESC;"
+  },
+  {
+    "question": "检查超过30天未更新的业务数据记录",
+    "sql": "SELECT service_name AS 服务区名称, oper_date AS 统计日期, update_ts AS 最后更新时间 FROM bss_business_day_data WHERE update_ts < CURRENT_DATE - 30 AND delete_ts IS NULL ORDER BY update_ts ASC LIMIT 50;"
+  },
+  {
+    "question": "各服务区不同数据源支付总额差异分析",
+    "sql": "SELECT service_name AS 服务区名称, source_type AS 数据源类型, SUM(pay_sum) AS 支付总额 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY service_name, source_type HAVING SUM(pay_sum) > 10000 ORDER BY 支付总额 DESC LIMIT 20;"
+  },
+  {
+    "question": "手工录入数据每月占比变化趋势",
+    "sql": "SELECT DATE_TRUNC('month', bdd.create_ts) AS 月份, COUNT(CASE WHEN sa.source_system_type = '手工录入' THEN 1 END) * 100.0 / COUNT(*) AS 手工录入占比 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY DATE_TRUNC('month', bdd.create_ts) ORDER BY 月份 DESC;"
+  },
+  {
+    "question": "检查同一天同一服务区的重复数据记录",
+    "sql": "SELECT oper_date AS 统计日期, service_area_id AS 服务区ID, COUNT(*) AS 重复次数 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY oper_date, service_area_id HAVING COUNT(*) > 1 ORDER BY 重复次数 DESC LIMIT 30;"
+  }
+]

+ 74 - 0
output/sql_validation_20250623_220723_summary.txt

@@ -0,0 +1,74 @@
+SQL Validation Report
+==================================================
+
+Input file: output\qs_highway_db_20250623_192120_pair.json
+Validated at: 2025-06-23T22:07:23.116709
+Duration: 0.65s
+
+Summary:
+  Total SQL: 50
+  Valid SQL: 45
+  Invalid SQL: 5
+  Success rate: 90.00%
+  Average time: 0.057s
+  Retries: 0
+
+SQL repair statistics:
+  Repairs attempted: 0
+  Repairs succeeded: 0
+  Repairs failed: 0
+
+Original file modification statistics:
+  SQL updated: 0
+  Invalid entries deleted: 0
+  Modification failures: 0
+
+Error details (5 total):
+==================================================
+
+1. Question: 车辆数量与订单量相关性分析(关联车流数据)
+   Error: function round(double precision, integer) does not exist
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+   LLM repair attempt: not attempted
+   Full SQL:
+   SELECT a.oper_date, a.order_sum AS 订单数量, b.customer_count AS 车辆数量, ROUND(CORR(a.order_sum, b.customer_count),2) AS 相关系数 FROM (SELECT oper_date, SUM(order_sum) AS order_sum FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY oper_date) a JOIN (SELECT count_date, SUM(customer_count) AS customer_count FROM bss_car_day_count GROUP BY count_date) b ON a.oper_date = b.count_date GROUP BY a.oper_date;
+   ----------------------------------------
+
+2. Question: 预测未来一周各服务区基于历史车流与消费数据的消费趋势(使用线性回归)
+   Error: function pg_catalog.extract(unknown, integer) does not exist
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+   LLM repair attempt: not attempted
+   Full SQL:
+   SELECT service_area_id AS 服务区ID, date, REGR_INTERCEPT(pay_sum, day_num) + REGR_SLOPE(pay_sum, day_num) * EXTRACT(EPOCH FROM date)/86400 AS 预测消费 FROM (SELECT c.service_area_id, b.oper_date AS date, EXTRACT(EPOCH FROM b.oper_date - CURRENT_DATE + 7) AS day_num, b.pay_sum FROM bss_car_day_count c JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE b.oper_date BETWEEN CURRENT_DATE - 30 AND CURRENT_DATE AND c.delete_ts IS NULL AND b.delete_ts IS NULL) sub GROUP BY service_area_id, date;
+   ----------------------------------------
+
+3. Question: 分析不同档口类型(餐饮/零售/其他)的消费转化率与车流密度(车流量/营业面积)的关系
+   Error: column b.branch_type does not exist
+HINT:  Perhaps you meant to reference the column "t.branch_type".
+   LLM repair attempt: not attempted
+   Full SQL:
+   SELECT b.branch_type AS 档口类型, c.customer_count / NULLIF(s.area,0) AS 车流密度, SUM(b.order_sum)/SUM(c.customer_count) AS 转化率 FROM bss_business_day_data b JOIN (SELECT branch_name, CASE WHEN branch_name LIKE '%餐饮%' THEN '餐饮' WHEN branch_name LIKE '%零售%' THEN '零售' ELSE '其他' END AS branch_type FROM bss_business_day_data GROUP BY branch_name) t ON b.branch_name = t.branch_name JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date JOIN (SELECT id, (RANDOM()*1000+500)::INT AS area FROM bss_service_area) s ON m.service_area_id = s.id WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY ROLLUP(branch_type);
+   ----------------------------------------
+
+4. Question: 各区域管理公司单位能耗产出对比(需结合能耗表)
+   Error: missing FROM-clause entry for table "e"
+   LLM repair attempt: not attempted
+   Full SQL:
+   SELECT c.company_name AS 公司名称,
+       ROUND(SUM(b.pay_sum)/SUM(e.energy_consumption), 2) AS 单位能耗产出
+FROM bss_company c
+JOIN bss_service_area sa ON c.id = sa.company_id
+JOIN bss_business_day_data b ON sa.service_area_no = b.service_no
+-- 假设存在能耗表 energy_consumption_table e
+-- JOIN energy_consumption_table e ON sa.id = e.service_area_id
+WHERE b.oper_date BETWEEN '2023-01-01' AND '2023-03-31'
+GROUP BY c.company_name
+ORDER BY 单位能耗产出 DESC;
+   ----------------------------------------
+
+5. Question: 各服务区不同数据源支付总额差异分析
+   Error: column reference "service_name" is ambiguous
+   LLM repair attempt: not attempted
+   Full SQL:
+   SELECT service_name AS 服务区名称, source_type AS 数据源类型, SUM(pay_sum) AS 支付总额 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY service_name, source_type HAVING SUM(pay_sum) > 10000 ORDER BY 支付总额 DESC LIMIT 20;
+   ----------------------------------------

+ 81 - 0
output/sql_validation_20250623_221222_summary.txt

@@ -0,0 +1,81 @@
+SQL Validation Report
+==================================================
+
+Input file: output\qs_highway_db_20250623_192120_pair.json
+Validated at: 2025-06-23T22:12:22.207844
+Duration: 213.11s
+
+Summary:
+  Total SQL: 50
+  Valid SQL: 46
+  Invalid SQL: 4
+  Success rate: 92.00%
+  Average time: 0.075s
+  Retries: 0
+
+SQL repair statistics:
+  Repairs attempted: 5
+  Repairs succeeded: 1
+  Repairs failed: 4
+  Repair success rate: 20.00%
+
+Original file modification statistics:
+  SQL updated: 1
+  Invalid entries deleted: 4
+  Modification failures: 0
+
+Error details (4 total):
+==================================================
+
+1. Question: 车辆数量与订单量相关性分析(关联车流数据)
+   Error: function round(double precision, integer) does not exist
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+   LLM repair attempt: failed
+   Failure reason: LLM repair failed or returned an empty result
+   Full SQL:
+   SELECT a.oper_date, a.order_sum AS 订单数量, b.customer_count AS 车辆数量, ROUND(CORR(a.order_sum, b.customer_count),2) AS 相关系数 FROM (SELECT oper_date, SUM(order_sum) AS order_sum FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY oper_date) a JOIN (SELECT count_date, SUM(customer_count) AS customer_count FROM bss_car_day_count GROUP BY count_date) b ON a.oper_date = b.count_date GROUP BY a.oper_date;
+   ----------------------------------------
+
+2. Question: 预测未来一周各服务区基于历史车流与消费数据的消费趋势(使用线性回归)
+   Error: function pg_catalog.extract(unknown, integer) does not exist
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+   LLM repair attempt: failed
+   Failure reason: LLM repair failed or returned an empty result
+   Full SQL:
+   SELECT service_area_id AS 服务区ID, date, REGR_INTERCEPT(pay_sum, day_num) + REGR_SLOPE(pay_sum, day_num) * EXTRACT(EPOCH FROM date)/86400 AS 预测消费 FROM (SELECT c.service_area_id, b.oper_date AS date, EXTRACT(EPOCH FROM b.oper_date - CURRENT_DATE + 7) AS day_num, b.pay_sum FROM bss_car_day_count c JOIN bss_service_area_mapper m ON c.service_area_id = m.service_area_id AND m.delete_ts IS NULL JOIN bss_business_day_data b ON m.service_name = b.service_name AND c.count_date = b.oper_date WHERE b.oper_date BETWEEN CURRENT_DATE - 30 AND CURRENT_DATE AND c.delete_ts IS NULL AND b.delete_ts IS NULL) sub GROUP BY service_area_id, date;
+   ----------------------------------------
+
+3. Question: 分析不同档口类型(餐饮/零售/其他)的消费转化率与车流密度(车流量/营业面积)的关系
+   Error: column b.branch_type does not exist
+HINT:  Perhaps you meant to reference the column "t.branch_type".
+   LLM repair attempt: failed
+   Failure reason: column "c.customer_count" must appear in the GROUP BY clause or be used in an aggregate function
+   Full SQL:
+   SELECT b.branch_type AS 档口类型, c.customer_count / NULLIF(s.area,0) AS 车流密度, SUM(b.order_sum)/SUM(c.customer_count) AS 转化率 FROM bss_business_day_data b JOIN (SELECT branch_name, CASE WHEN branch_name LIKE '%餐饮%' THEN '餐饮' WHEN branch_name LIKE '%零售%' THEN '零售' ELSE '其他' END AS branch_type FROM bss_business_day_data GROUP BY branch_name) t ON b.branch_name = t.branch_name JOIN bss_service_area_mapper m ON b.service_name = m.service_name AND m.delete_ts IS NULL JOIN bss_car_day_count c ON m.service_area_id = c.service_area_id AND b.oper_date = c.count_date JOIN (SELECT id, (RANDOM()*1000+500)::INT AS area FROM bss_service_area) s ON m.service_area_id = s.id WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY ROLLUP(branch_type);
+   ----------------------------------------
+
+4. Question: 各区域管理公司单位能耗产出对比(需结合能耗表)
+   Error: missing FROM-clause entry for table "e"
+   LLM repair attempt: failed
+   Failure reason: relation "energy_consumption_table" does not exist
+   Full SQL:
+   SELECT c.company_name AS 公司名称,
+       ROUND(SUM(b.pay_sum)/SUM(e.energy_consumption), 2) AS 单位能耗产出
+FROM bss_company c
+JOIN bss_service_area sa ON c.id = sa.company_id
+JOIN bss_business_day_data b ON sa.service_area_no = b.service_no
+-- 假设存在能耗表 energy_consumption_table e
+-- JOIN energy_consumption_table e ON sa.id = e.service_area_id
+WHERE b.oper_date BETWEEN '2023-01-01' AND '2023-03-31'
+GROUP BY c.company_name
+ORDER BY 单位能耗产出 DESC;
+   ----------------------------------------
+
+Successfully repaired SQL (1 total):
+==================================================
+
+1. Question: 各服务区不同数据源支付总额差异分析
+   Original error: column reference "service_name" is ambiguous
+   Repaired SQL:
+   SELECT sa.service_name AS 服务区名称, bdd.source_type AS 数据源类型, SUM(bdd.pay_sum) AS 支付总额 FROM bss_business_day_data bdd JOIN bss_service_area_mapper sa ON bdd.service_no = sa.service_no WHERE bdd.delete_ts IS NULL GROUP BY sa.service_name, bdd.source_type HAVING SUM(bdd.pay_sum) > 10000 ORDER BY 支付总额 DESC LIMIT 20;
+   ----------------------------------------

+ 111 - 4
schema_tools/README.md

@@ -12,6 +12,7 @@
 - 📁 Generates standardized DDL and MD documentation
 - 🛡️ Complete error handling and logging
 - 🎯 **New**: Question-SQL training data generation
+- ✅ **New**: SQL statement validation
 
 ## Installing Dependencies
 
@@ -41,7 +42,7 @@ python -m schema_tools \
   --pipeline full
 ```
 
-### 2. Generating Question-SQL Training Data (new)
+### 2. Generating Question-SQL Training Data
 
 After generating the DDL and MD files, you can use the new Question-SQL generation feature:
 
@@ -60,7 +61,54 @@ python -m schema_tools.qs_generator \
 4. Generates 10 Question-SQL pairs for each theme
 5. Writes the output to the file `qs_highway_db_<timestamp>_pair.json`
 
-### 3. Programmatic Use
+### 3. Validating SQL Statements (new)
+
+After generating Question-SQL pairs, validate the SQL they contain:
+
+```bash
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./qs_highway_db_20240101_143052_pair.json \
+  --output-dir ./validation_reports
+```
+
+This will:
+1. Read the Question-SQL pair file
+2. Validate each SQL with PostgreSQL's EXPLAIN statement
+3. Generate a detailed validation report
+4. Compute the success rate and performance metrics
+
+#### SQL validation advanced options
+```bash
+# Basic validation (report only)
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json
+
+# Delete invalid SQL (without LLM repair)
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --modify-original-file
+
+# Enable LLM repair
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --enable-llm-repair \
+  --modify-original-file
+
+# Performance tuning
+python -m schema_tools.sql_validator \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --input-file ./data.json \
+  --max-concurrent 10 \
+  --batch-size 20 \
+  --timeout 60 \
+  --verbose
+```
+
+### 4. Programmatic Use
 
 #### Generating DDL/MD documentation
 ```python
@@ -101,6 +149,24 @@ async def generate_qs_data():
 asyncio.run(generate_qs_data())
 ```
 
+#### Validating SQL statements
+```python
+import asyncio
+from schema_tools import SQLValidationAgent
+
+async def validate_sqls():
+    agent = SQLValidationAgent(
+        db_connection="postgresql://user:pass@localhost:5432/dbname",
+        input_file="./qs_highway_db_20240101_143052_pair.json",
+        output_dir="./validation_reports"
+    )
+    
+    report = await agent.validate()
+    print(f"验证完成: {report['summary']['success_rate']:.1%} 成功率")
+
+asyncio.run(validate_sqls())
+```
+
 ## Output File Layout
 
 ```
@@ -110,7 +176,11 @@ output/
 ├── logs/                         # log directory
 │   └── schema_tools_20240101_120000.log
 ├── filename_mapping.txt          # file name mapping report
-└── qs_highway_db_20240101_143052_pair.json  # Question-SQL training data
+├── qs_highway_db_20240101_143052_pair.json  # Question-SQL training data
+├── metadata.txt                  # theme metadata (INSERT statements)
+└── validation_reports/           # SQL validation reports
+    ├── sql_validation_20240101_150000_report.json
+    └── sql_validation_20240101_150000_summary.txt
 ```
 
 Note: the configuration no longer creates ddl/ and docs/ subdirectories; all files are written directly to the output directory.
@@ -136,6 +206,17 @@ SCHEMA_TOOLS_CONFIG = {
         "theme_count": 5,                 # 生成主题数量
         "questions_per_theme": 10,        # 每主题问题数
         "max_concurrent_themes": 3,       # 并行处理主题数
+    },
+    
+    # SQL验证配置
+    "sql_validation": {
+        "reuse_connection_pool": True,    # 复用现有连接池
+        "max_concurrent_validations": 5,  # 并发验证数
+        "validation_timeout": 30,         # 单个验证超时(秒)
+        "batch_size": 10,                 # 批处理大小
+        "continue_on_error": True,        # 错误时是否继续
+        "save_validation_report": True,   # 保存验证报告
+        "readonly_mode": True,            # 启用只读模式
     }
 }
 ```
@@ -175,10 +256,32 @@ SCHEMA_TOOLS_CONFIG = {
 ]
 ```
 
+## SQL Validation Features
+
+### Highlights
+- ✅ Validates SQL with the EXPLAIN statement
+- ⚡ Concurrent validation for higher throughput
+- 🔒 Runs in read-only mode, safe by design
+- 📊 Detailed validation reports and statistics
+- 🔄 Automatic retries for transient network problems
+
+### Validation flow
+1. Read the Question-SQL pair JSON file
+2. Extract the SQL statements and split them into batches
+3. Validate syntax and schema with PostgreSQL's EXPLAIN
+4. Generate detailed validation reports (JSON and text)
+5. Compute the success rate and performance metrics
+
+### Report contents
+- Overall statistics (success rate, average time, etc.)
+- Error details and retry information
+- Per-SQL validation results
+- Configuration and metadata (these can be consumed programmatically, as in the snippet below)
+
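+For example, a CI step could gate on the success rate inside the async function shown earlier (only the `summary.success_rate` key is assumed; the 0.95 threshold is illustrative):
+
+```python
+report = await agent.validate()
+if report["summary"]["success_rate"] < 0.95:
+    raise SystemExit("SQL validation success rate below threshold")
+```
+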
 ## FAQ
 
 ### Q: How are read-only databases handled?
-A: The tool detects read-only databases automatically and never attempts write operations.
+A: The tool detects read-only databases automatically and never attempts write operations. The SQL validator is designed to run read-only.
 
 ### Q: How are duplicate table names handled?
 A: Unique file names are generated automatically, e.g. `hr__users.ddl` and `sales__users.ddl`.
@@ -189,12 +292,16 @@ A: Comment the line out in the table list (prefix it with #) or delete it.
 ### Q: What if an LLM call fails?
 A: It is retried 3 times automatically; after that the original comment or a default value is used.
 
+### Q: What if the SQL validation failure rate is high?
+A: Check the SQL syntax and table names, and use `--verbose` for detailed error messages.
+
 ## Notes
 
 1. **Database privileges**: at least SELECT is required
 2. **LLM configuration**: reuses the project's vanna instance configuration
 3. **Concurrency control**: at most 3 tables processed in parallel by default, adjustable
 4. **Memory usage**: sampling caps the amount of data read from large tables
+5. **SQL validation**: requires SELECT on all tables referenced by the SQL
 
 ## Development and Extension
 

+ 2 - 0
schema_tools/__init__.py

@@ -5,12 +5,14 @@ Schema Tools - automated database reverse-engineering toolkit
 
 from .training_data_agent import SchemaTrainingDataAgent
 from .qs_agent import QuestionSQLGenerationAgent
+from .sql_validation_agent import SQLValidationAgent
 from .config import SCHEMA_TOOLS_CONFIG, get_config, update_config
 
 __version__ = "1.0.0"
 __all__ = [
     "SchemaTrainingDataAgent",
     "QuestionSQLGenerationAgent",
+    "SQLValidationAgent",
     "SCHEMA_TOOLS_CONFIG", 
     "get_config",
     "update_config"

+ 22 - 0
schema_tools/config.py

@@ -98,6 +98,28 @@ SCHEMA_TOOLS_CONFIG = {
         "continue_on_theme_error": True,     # 主题生成失败是否继续
         "save_intermediate": True,           # 是否保存中间结果
         "output_file_prefix": "qs",          # 输出文件前缀
+    },
+    
+    # SQL验证配置
+    "sql_validation": {
+        "reuse_connection_pool": True,       # 复用现有连接池
+        "max_concurrent_validations": 5,     # 并发验证数
+        "validation_timeout": 30,            # 单个验证超时(秒)
+        "batch_size": 10,                    # 批处理大小
+        "continue_on_error": True,           # 错误时是否继续
+        "save_validation_report": True,      # 保存验证报告
+        "save_detailed_json_report": False,  # 保存详细JSON报告(可选)
+        "readonly_mode": True,               # 启用只读模式
+        "max_retry_count": 2,                # 验证失败重试次数
+        "report_file_prefix": "sql_validation",  # 报告文件前缀
+        
+        # SQL修复配置
+        "enable_sql_repair": False,          # 启用SQL修复功能(默认禁用)
+        "llm_repair_timeout": 120,           # LLM修复超时时间(秒)
+        "repair_batch_size": 2,              # 修复批处理大小
+        
+        # 文件修改配置
+        "modify_original_file": False,       # 是否修改原始JSON文件(默认禁用)
     }
 }
 

+ 8 - 1
schema_tools/qs_agent.py

@@ -269,6 +269,7 @@ class QuestionSQLGenerationAgent:
 5. 问题应该多样化,覆盖不同的分析角度
 6. 包含时间筛选、分组统计、排序、限制等不同类型的查询
 7. SQL语句末尾必须以分号结束
+8. **重要:问题和SQL都必须是单行文本,不能包含换行符**
 
 输出JSON格式(注意SQL中的双引号需要转义):
 ```json
@@ -295,7 +296,7 @@ class QuestionSQLGenerationAgent:
             response = await asyncio.to_thread(
                 self.vn.chat_with_llm,
                 question=prompt,
-                system_prompt="你是一个专业的数据分析师,精通PostgreSQL语法,擅长设计有业务价值的数据查询。请严格按照JSON格式输出。"
+                system_prompt="你是一个专业的数据分析师,精通PostgreSQL语法,擅长设计有业务价值的数据查询。请严格按照JSON格式输出。特别注意:生成的问题和SQL都必须是单行文本,不能包含换行符。"
             )
             
             if not response or not response.strip():
@@ -347,6 +348,12 @@ class QuestionSQLGenerationAgent:
                 self.logger.warning(f"Skipping item {i+1} with an empty question or SQL")
                 continue
             
+            # Strip newlines from the question, collapsing whitespace into single spaces
+            question = ' '.join(question.split())
+            
+            # Strip newlines and redundant spaces from the SQL, compressing it into a single line
+            sql = ' '.join(sql.split())
+            
             # Make sure the SQL ends with a semicolon
             if not sql.endswith(';'):
                 sql += ';'

+ 730 - 0
schema_tools/sql_validation_agent.py

@@ -0,0 +1,730 @@
+import asyncio
+import json
+import logging
+import time
+from datetime import datetime
+from pathlib import Path
+from typing import List, Dict, Any, Optional
+
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+from schema_tools.validators import SQLValidator, SQLValidationResult, ValidationStats
+from schema_tools.utils.logger import setup_logging
+
+
+class SQLValidationAgent:
+    """SQL验证Agent - 管理SQL验证的完整流程"""
+    
+    def __init__(self, 
+                 db_connection: str,
+                 input_file: str,
+                 output_dir: str = None):
+        """
+        初始化SQL验证Agent
+        
+        Args:
+            db_connection: 数据库连接字符串
+            input_file: 输入的JSON文件路径(包含Question-SQL对)
+            output_dir: 输出目录(默认为输入文件同目录)
+        """
+        self.db_connection = db_connection
+        self.input_file = Path(input_file)
+        self.output_dir = Path(output_dir) if output_dir else self.input_file.parent
+        
+        self.config = SCHEMA_TOOLS_CONFIG['sql_validation']
+        self.logger = logging.getLogger("schema_tools.SQLValidationAgent")
+        
+        # 初始化验证器
+        self.validator = SQLValidator(db_connection)
+        
+        # 初始化LLM实例(用于SQL修复)
+        self.vn = None
+        if self.config.get('enable_sql_repair', False):  # 默认值与config.py保持一致(默认禁用)
+            self._initialize_llm()
+        
+        # 统计信息
+        self.total_questions = 0
+        self.validation_start_time = None
+        
+    async def validate(self) -> Dict[str, Any]:
+        """
+        执行SQL验证流程
+        
+        Returns:
+            验证结果报告
+        """
+        try:
+            self.validation_start_time = time.time()
+            self.logger.info("🚀 开始SQL验证流程")
+            
+            # 1. 读取输入文件
+            self.logger.info(f"📖 读取输入文件: {self.input_file}")
+            questions_sqls = await self._load_questions_sqls()
+            self.total_questions = len(questions_sqls)
+            
+            if not questions_sqls:
+                raise ValueError("输入文件中没有找到有效的Question-SQL对")
+            
+            self.logger.info(f"✅ 成功读取 {self.total_questions} 个Question-SQL对")
+            
+            # 2. 提取SQL语句
+            sqls = [item['sql'] for item in questions_sqls]
+            
+            # 3. 执行验证
+            self.logger.info("🔍 开始SQL验证...")
+            validation_results = await self._validate_sqls_with_batching(sqls)
+            
+            # 4. 计算统计信息
+            stats = self.validator.calculate_stats(validation_results)
+            
+            # 5. 尝试修复失败的SQL(如果启用LLM修复)
+            file_modification_stats = {'modified': 0, 'deleted': 0, 'failed_modifications': 0}
+            if self.config.get('enable_sql_repair', False) and self.vn:
+                self.logger.info("🔧 启用LLM修复功能,开始修复失败的SQL...")
+                validation_results = await self._attempt_sql_repair(questions_sqls, validation_results)
+                # 重新计算统计信息(包含修复结果)
+                stats = self.validator.calculate_stats(validation_results)
+            
+            # 6. 修改原始JSON文件(如果启用文件修改)
+            if self.config.get('modify_original_file', False):
+                self.logger.info("📝 启用文件修改功能,开始修改原始JSON文件...")
+                file_modification_stats = await self._modify_original_json_file(questions_sqls, validation_results)
+            else:
+                self.logger.info("📋 不修改原始文件")
+            
+            # 7. 生成详细报告
+            report = await self._generate_report(questions_sqls, validation_results, stats, file_modification_stats)
+            
+            # 8. 保存验证报告
+            if self.config['save_validation_report']:
+                await self._save_validation_report(report)
+            
+            # 9. 输出结果摘要
+            self._print_summary(stats, validation_results, file_modification_stats)
+            
+            return report
+            
+        except Exception as e:
+            self.logger.exception("❌ SQL验证流程失败")
+            raise
+    
+    async def _load_questions_sqls(self) -> List[Dict[str, str]]:
+        """读取Question-SQL对"""
+        try:
+            with open(self.input_file, 'r', encoding='utf-8') as f:
+                data = json.load(f)
+            
+            # 验证数据格式
+            if not isinstance(data, list):
+                raise ValueError("输入文件应包含Question-SQL对的数组")
+            
+            questions_sqls = []
+            for i, item in enumerate(data):
+                if not isinstance(item, dict):
+                    self.logger.warning(f"跳过第 {i+1} 项:格式不正确")
+                    continue
+                
+                if 'question' not in item or 'sql' not in item:
+                    self.logger.warning(f"跳过第 {i+1} 项:缺少question或sql字段")
+                    continue
+                
+                questions_sqls.append({
+                    'index': i,
+                    'question': item['question'],
+                    'sql': item['sql'].strip()
+                })
+            
+            return questions_sqls
+            
+        except json.JSONDecodeError as e:
+            raise ValueError(f"输入文件不是有效的JSON格式: {e}")
+        except Exception as e:
+            raise ValueError(f"读取输入文件失败: {e}")
+    
+    async def _validate_sqls_with_batching(self, sqls: List[str]) -> List[SQLValidationResult]:
+        """使用批处理方式验证SQL"""
+        batch_size = self.config['batch_size']
+        all_results = []
+        
+        # 分批处理
+        for i in range(0, len(sqls), batch_size):
+            batch = sqls[i:i + batch_size]
+            batch_num = i // batch_size + 1
+            total_batches = (len(sqls) + batch_size - 1) // batch_size
+            
+            self.logger.info(f"📦 处理批次 {batch_num}/{total_batches} ({len(batch)} 个SQL)")
+            
+            batch_results = await self.validator.validate_sqls_batch(batch)
+            all_results.extend(batch_results)
+            
+            # 显示批次进度
+            valid_count = sum(1 for r in batch_results if r.valid)
+            self.logger.info(f"✅ 批次 {batch_num} 完成: {valid_count}/{len(batch)} 有效")
+        
+        return all_results
+    
+    async def _generate_report(self, 
+                              questions_sqls: List[Dict], 
+                              validation_results: List[SQLValidationResult],
+                              stats: ValidationStats,
+                              file_modification_stats: Dict[str, int] = None) -> Dict[str, Any]:
+        """生成详细验证报告"""
+        
+        validation_time = time.time() - self.validation_start_time
+        
+        # 合并问题和验证结果
+        detailed_results = []
+        for i, (qs, result) in enumerate(zip(questions_sqls, validation_results)):
+            detailed_results.append({
+                'index': i + 1,
+                'question': qs['question'],
+                'sql': qs['sql'],
+                'valid': result.valid,
+                'error_message': result.error_message,
+                'execution_time': result.execution_time,
+                'retry_count': result.retry_count,
+                
+                # 添加修复信息
+                'repair_attempted': result.repair_attempted,
+                'repair_successful': result.repair_successful,
+                'repaired_sql': result.repaired_sql,
+                'repair_error': result.repair_error
+            })
+        
+        # 生成报告
+        report = {
+            'metadata': {
+                'input_file': str(self.input_file),
+                'validation_time': datetime.now().isoformat(),
+                'total_validation_time': validation_time,
+                'database_connection': self._mask_connection_string(self.db_connection),
+                'config': self.config.copy()
+            },
+            'summary': {
+                'total_questions': stats.total_sqls,
+                'valid_sqls': stats.valid_sqls,
+                'invalid_sqls': stats.invalid_sqls,
+                'success_rate': stats.valid_sqls / stats.total_sqls if stats.total_sqls > 0 else 0.0,
+                'average_execution_time': stats.avg_time_per_sql,
+                'total_retries': stats.retry_count,
+                
+                # 添加修复统计
+                'repair_stats': {
+                    'attempted': stats.repair_attempted,
+                    'successful': stats.repair_successful,
+                    'failed': stats.repair_failed
+                },
+                
+                # 添加文件修改统计
+                'file_modification_stats': file_modification_stats or {
+                    'modified': 0, 'deleted': 0, 'failed_modifications': 0
+                }
+            },
+            'detailed_results': detailed_results
+        }
+        
+        return report
+    
+    async def _save_validation_report(self, report: Dict[str, Any]):
+        """保存验证报告"""
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        
+        # 只保存文本格式摘要(便于查看)
+        txt_file = self.output_dir / f"{self.config['report_file_prefix']}_{timestamp}_summary.txt"
+        with open(txt_file, 'w', encoding='utf-8') as f:
+            f.write(f"SQL验证报告\n")
+            f.write(f"=" * 50 + "\n\n")
+            f.write(f"输入文件: {self.input_file}\n")
+            f.write(f"验证时间: {report['metadata']['validation_time']}\n")
+            f.write(f"验证耗时: {report['metadata']['total_validation_time']:.2f}秒\n\n")
+            
+            f.write(f"验证结果摘要:\n")
+            f.write(f"  总SQL数量: {report['summary']['total_questions']}\n")
+            f.write(f"  有效SQL: {report['summary']['valid_sqls']}\n")
+            f.write(f"  无效SQL: {report['summary']['invalid_sqls']}\n")
+            f.write(f"  成功率: {report['summary']['success_rate']:.2%}\n")
+            f.write(f"  平均耗时: {report['summary']['average_execution_time']:.3f}秒\n")
+            f.write(f"  重试次数: {report['summary']['total_retries']}\n\n")
+            
+            # 添加修复统计
+            if 'repair_stats' in report['summary']:
+                repair_stats = report['summary']['repair_stats']
+                f.write(f"SQL修复统计:\n")
+                f.write(f"  尝试修复: {repair_stats['attempted']}\n")
+                f.write(f"  修复成功: {repair_stats['successful']}\n")
+                f.write(f"  修复失败: {repair_stats['failed']}\n")
+                if repair_stats['attempted'] > 0:
+                    f.write(f"  修复成功率: {repair_stats['successful'] / repair_stats['attempted']:.2%}\n")
+                f.write(f"\n")
+            
+            # 添加文件修改统计
+            if 'file_modification_stats' in report['summary']:
+                file_stats = report['summary']['file_modification_stats']
+                f.write(f"原始文件修改统计:\n")
+                f.write(f"  修改的SQL: {file_stats['modified']}\n")
+                f.write(f"  删除的无效项: {file_stats['deleted']}\n")
+                f.write(f"  修改失败: {file_stats['failed_modifications']}\n")
+                f.write(f"\n")
+            
+            # 提取错误详情(显示完整SQL)
+            error_results = [r for r in report['detailed_results'] if not r['valid'] and not r.get('repair_successful', False)]
+            if error_results:
+                f.write(f"错误详情(共{len(error_results)}个):\n")
+                f.write(f"=" * 50 + "\n")
+                for i, error_result in enumerate(error_results, 1):
+                    f.write(f"\n{i}. 问题: {error_result['question']}\n")
+                    f.write(f"   错误: {error_result['error_message']}\n")
+                    if error_result['retry_count'] > 0:
+                        f.write(f"   重试: {error_result['retry_count']}次\n")
+                    
+                    # 显示修复尝试信息(修复成功的项已在上方过滤掉,进入这里的只有失败或未尝试)
+                    if error_result.get('repair_attempted', False):
+                        f.write(f"   LLM修复尝试: 失败\n")
+                        repair_error = error_result.get('repair_error', '未知错误')
+                        f.write(f"   修复失败原因: {repair_error}\n")
+                    else:
+                        f.write(f"   LLM修复尝试: 未尝试\n")
+                    
+                    f.write(f"   完整SQL:\n")
+                    f.write(f"   {error_result['sql']}\n")
+                    f.write(f"   {'-' * 40}\n")
+            
+            # 显示成功修复的SQL
+            repaired_results = [r for r in report['detailed_results'] if r.get('repair_successful', False)]
+            if repaired_results:
+                f.write(f"\n成功修复的SQL(共{len(repaired_results)}个):\n")
+                f.write(f"=" * 50 + "\n")
+                for i, repaired_result in enumerate(repaired_results, 1):
+                    f.write(f"\n{i}. 问题: {repaired_result['question']}\n")
+                    f.write(f"   原始错误: {repaired_result['error_message']}\n")
+                    f.write(f"   修复后SQL:\n")
+                    f.write(f"   {repaired_result.get('repaired_sql', '')}\n")
+                    f.write(f"   {'-' * 40}\n")
+        
+        self.logger.info(f"📊 验证报告已保存: {txt_file}")
+        
+        # 如果配置允许,也可以保存JSON格式的详细报告(可选)
+        if self.config.get('save_detailed_json_report', False):
+            json_file = self.output_dir / f"{self.config['report_file_prefix']}_{timestamp}_report.json"
+            with open(json_file, 'w', encoding='utf-8') as f:
+                json.dump(report, f, ensure_ascii=False, indent=2)
+            self.logger.info(f"📊 详细JSON报告已保存: {json_file}")
+    
+    def _mask_connection_string(self, conn_str: str) -> str:
+        """隐藏连接字符串中的敏感信息"""
+        import re
+        # 隐藏密码
+        return re.sub(r':[^:@]+@', ':***@', conn_str)
+    
+    def _print_summary(self, stats: ValidationStats, validation_results: List[SQLValidationResult] = None, file_modification_stats: Dict[str, int] = None):
+        """打印验证结果摘要"""
+        validation_time = time.time() - self.validation_start_time
+        
+        self.logger.info("=" * 60)
+        self.logger.info("📊 SQL验证结果摘要")
+        self.logger.info(f"  📝 总SQL数量: {stats.total_sqls}")
+        self.logger.info(f"  ✅ 有效SQL: {stats.valid_sqls}")
+        self.logger.info(f"  ❌ 无效SQL: {stats.invalid_sqls}")
+        self.logger.info(f"  📈 成功率: {stats.valid_sqls / stats.total_sqls:.2%}")
+        self.logger.info(f"  ⏱️  平均耗时: {stats.avg_time_per_sql:.3f}秒/SQL")
+        self.logger.info(f"  🔄 重试次数: {stats.retry_count}")
+        self.logger.info(f"  ⏰ 总耗时: {validation_time:.2f}秒")
+        
+        # 添加修复统计
+        if stats.repair_attempted > 0:
+            self.logger.info(f"  🔧 修复尝试: {stats.repair_attempted}")
+            self.logger.info(f"  ✅ 修复成功: {stats.repair_successful}")
+            self.logger.info(f"  ❌ 修复失败: {stats.repair_failed}")
+            repair_rate = stats.repair_successful / stats.repair_attempted if stats.repair_attempted > 0 else 0.0
+            self.logger.info(f"  📈 修复成功率: {repair_rate:.2%}")
+        
+        # 添加文件修改统计
+        if file_modification_stats:
+            self.logger.info(f"  📝 文件修改: {file_modification_stats['modified']} 个SQL")
+            self.logger.info(f"  🗑️  删除无效项: {file_modification_stats['deleted']} 个")
+            if file_modification_stats['failed_modifications'] > 0:
+                self.logger.info(f"  ⚠️  修改失败: {file_modification_stats['failed_modifications']} 个")
+        
+        self.logger.info("=" * 60)
+        
+        # 显示部分错误信息
+        if validation_results:
+            error_results = [r for r in validation_results if not r.valid]
+            if error_results:
+                self.logger.info(f"⚠️  前5个错误示例:")
+                for i, error_result in enumerate(error_results[:5], 1):
+                    self.logger.info(f"  {i}. {error_result.error_message}")
+                    # 显示SQL的前80个字符
+                    sql_preview = error_result.sql[:80] + '...' if len(error_result.sql) > 80 else error_result.sql
+                    self.logger.info(f"     SQL: {sql_preview}")
+    
+    def _initialize_llm(self):
+        """初始化LLM实例"""
+        try:
+            from core.vanna_llm_factory import create_vanna_instance
+            self.vn = create_vanna_instance()
+            self.logger.info("✅ LLM实例初始化成功,SQL修复功能已启用")
+        except Exception as e:
+            self.logger.warning(f"⚠️  LLM初始化失败,SQL修复功能将被禁用: {e}")
+            self.vn = None 
+    
+    async def _attempt_sql_repair(self, questions_sqls: List[Dict], validation_results: List[SQLValidationResult]) -> List[SQLValidationResult]:
+        """
+        尝试修复失败的SQL
+        
+        Args:
+            questions_sqls: 问题SQL对列表
+            validation_results: 验证结果列表
+            
+        Returns:
+            更新后的验证结果列表
+        """
+        # 找出需要修复的SQL
+        failed_indices = []
+        for i, result in enumerate(validation_results):
+            if not result.valid:
+                failed_indices.append(i)
+        
+        if not failed_indices:
+            self.logger.info("🎉 所有SQL都有效,无需修复")
+            return validation_results
+        
+        self.logger.info(f"🔧 开始修复 {len(failed_indices)} 个失败的SQL...")
+        
+        # 批量修复
+        batch_size = self.config.get('repair_batch_size', 2)  # 默认值与config.py保持一致
+        updated_results = validation_results.copy()
+        
+        for i in range(0, len(failed_indices), batch_size):
+            batch_indices = failed_indices[i:i + batch_size]
+            self.logger.info(f"📦 修复批次 {i//batch_size + 1}/{(len(failed_indices) + batch_size - 1)//batch_size} ({len(batch_indices)} 个SQL)")
+            
+            # 准备批次数据
+            batch_data = []
+            for idx in batch_indices:
+                batch_data.append({
+                    'index': idx,
+                    'question': questions_sqls[idx]['question'],
+                    'sql': validation_results[idx].sql,
+                    'error': validation_results[idx].error_message
+                })
+            
+            # 调用LLM修复
+            repaired_sqls = await self._repair_sqls_with_llm(batch_data)
+            
+            # 验证修复后的SQL
+            for j, idx in enumerate(batch_indices):
+                original_result = updated_results[idx]
+                original_result.repair_attempted = True
+                
+                if j < len(repaired_sqls) and repaired_sqls[j]:
+                    repaired_sql = repaired_sqls[j]
+                    
+                    # 验证修复后的SQL
+                    repair_result = await self.validator.validate_sql(repaired_sql)
+                    
+                    if repair_result.valid:
+                        # 修复成功
+                        original_result.repair_successful = True
+                        original_result.repaired_sql = repaired_sql
+                        original_result.valid = True  # 更新为有效
+                        self.logger.info(f"✅ SQL修复成功 (索引 {idx})")
+                    else:
+                        # 修复失败
+                        original_result.repair_successful = False
+                        original_result.repair_error = repair_result.error_message
+                        self.logger.warning(f"❌ SQL修复失败 (索引 {idx}): {repair_result.error_message}")
+                else:
+                    # LLM修复失败
+                    original_result.repair_successful = False
+                    original_result.repair_error = "LLM修复失败或返回空结果"
+                    self.logger.warning(f"❌ LLM修复失败 (索引 {idx})")
+        
+        # 统计修复结果
+        repair_attempted = sum(1 for r in updated_results if r.repair_attempted)
+        repair_successful = sum(1 for r in updated_results if r.repair_successful)
+        
+        self.logger.info(f"🔧 修复完成: {repair_successful}/{repair_attempted} 成功")
+        
+        return updated_results 
+    
+    async def _modify_original_json_file(self, questions_sqls: List[Dict], validation_results: List[SQLValidationResult]) -> Dict[str, int]:
+        """
+        修改原始JSON文件:
+        1. 对于修复成功的SQL,更新原始文件中的SQL内容
+        2. 对于无法修复的SQL,从原始文件中删除对应的键值对
+        
+        Returns:
+            修改统计信息
+        """
+        stats = {'modified': 0, 'deleted': 0, 'failed_modifications': 0}
+        
+        try:
+            # 读取原始JSON文件
+            with open(self.input_file, 'r', encoding='utf-8') as f:
+                original_data = json.load(f)
+            
+            if not isinstance(original_data, list):
+                self.logger.error("原始JSON文件格式不正确,无法修改")
+                stats['failed_modifications'] = 1
+                return stats
+            
+            # 创建备份文件
+            backup_file = Path(str(self.input_file) + '.backup')
+            with open(backup_file, 'w', encoding='utf-8') as f:
+                json.dump(original_data, f, ensure_ascii=False, indent=2)
+            self.logger.info(f"已创建备份文件: {backup_file}")
+            
+            # 构建修改计划
+            modifications = []
+            deletions = []
+            
+            for i, (qs, result) in enumerate(zip(questions_sqls, validation_results)):
+                if result.repair_successful and result.repaired_sql:
+                    # 修复成功的SQL
+                    modifications.append({
+                        'index': i,
+                        'original_sql': result.sql,
+                        'repaired_sql': result.repaired_sql,
+                        'question': qs['question']
+                    })
+                elif not result.valid and not result.repair_successful:
+                    # 无法修复的SQL,标记删除
+                    deletions.append({
+                        'index': i,
+                        'question': qs['question'],
+                        'sql': result.sql,
+                        'error': result.error_message
+                    })
+            
+            # 执行修改(从后往前,避免索引变化)
+            new_data = original_data.copy()
+            
+            # 先删除无效项(从后往前删除)
+            for deletion in sorted(deletions, key=lambda x: x['index'], reverse=True):
+                if deletion['index'] < len(new_data):
+                    new_data.pop(deletion['index'])
+                    stats['deleted'] += 1
+                    self.logger.info(f"删除无效项 {deletion['index']}: {deletion['question'][:50]}...")
+            
+            # 再修改SQL(需要重新计算索引)
+            for modification in sorted(modifications, key=lambda x: x['index']):
+                # 计算删除后的新索引
+                new_index = modification['index']
+                for deletion in deletions:
+                    if deletion['index'] < modification['index']:
+                        new_index -= 1
+                
+                if new_index < len(new_data):
+                    new_data[new_index]['sql'] = modification['repaired_sql']
+                    stats['modified'] += 1
+                    self.logger.info(f"修改SQL {new_index}: {modification['question'][:50]}...")
+            
+            # 写入修改后的文件
+            with open(self.input_file, 'w', encoding='utf-8') as f:
+                json.dump(new_data, f, ensure_ascii=False, indent=2)
+            
+            self.logger.info(f"✅ 原始文件修改完成: 修改{stats['modified']}个SQL,删除{stats['deleted']}个无效项")
+            
+            # 记录详细修改信息到日志文件
+            await self._write_modification_log(modifications, deletions)
+            
+        except Exception as e:
+            self.logger.error(f"修改原始JSON文件失败: {e}")
+            stats['failed_modifications'] = 1
+        
+        return stats
+    
+    async def _write_modification_log(self, modifications: List[Dict], deletions: List[Dict]):
+        """写入详细的修改日志"""
+        try:
+            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+            log_file = self.output_dir / f"file_modifications_{timestamp}.log"
+            
+            with open(log_file, 'w', encoding='utf-8') as f:
+                f.write(f"原始JSON文件修改日志\n")
+                f.write(f"=" * 50 + "\n")
+                f.write(f"修改时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
+                f.write(f"原始文件: {self.input_file}\n")
+                f.write(f"备份文件: {str(self.input_file)}.backup\n")
+                f.write(f"\n")
+                
+                if modifications:
+                    f.write(f"修改的SQL ({len(modifications)}个):\n")
+                    f.write(f"-" * 40 + "\n")
+                    for i, mod in enumerate(modifications, 1):
+                        f.write(f"{i}. 索引: {mod['index']}\n")
+                        f.write(f"   问题: {mod['question']}\n")
+                        f.write(f"   原SQL: {mod['original_sql']}\n")
+                        f.write(f"   新SQL: {mod['repaired_sql']}\n\n")
+                
+                if deletions:
+                    f.write(f"删除的无效项 ({len(deletions)}个):\n")
+                    f.write(f"-" * 40 + "\n")
+                    for i, del_item in enumerate(deletions, 1):
+                        f.write(f"{i}. 索引: {del_item['index']}\n")
+                        f.write(f"   问题: {del_item['question']}\n")
+                        f.write(f"   SQL: {del_item['sql']}\n")
+                        f.write(f"   错误: {del_item['error']}\n\n")
+            
+            self.logger.info(f"详细修改日志已保存: {log_file}")
+            
+        except Exception as e:
+            self.logger.warning(f"写入修改日志失败: {e}")
+    
+    async def _repair_sqls_with_llm(self, batch_data: List[Dict]) -> List[str]:
+        """
+        使用LLM修复SQL批次
+        
+        Args:
+            batch_data: 批次数据,包含question, sql, error
+            
+        Returns:
+            修复后的SQL列表
+        """
+        try:
+            # 构建修复提示词
+            prompt = self._build_repair_prompt(batch_data)
+            
+            # 调用LLM
+            response = await self._call_llm_for_repair(prompt)
+            
+            # 解析响应
+            repaired_sqls = self._parse_repair_response(response, len(batch_data))
+            
+            return repaired_sqls
+            
+        except Exception as e:
+            self.logger.error(f"LLM修复批次失败: {e}")
+            return [""] * len(batch_data)  # 返回空字符串列表 
+    
+    def _build_repair_prompt(self, batch_data: List[Dict]) -> str:
+        """构建SQL修复提示词"""
+        
+        # 提取数据库类型
+        db_type = "PostgreSQL"  # 从连接字符串可以确定是PostgreSQL
+        
+        prompt = f"""你是一个SQL专家,专门修复PostgreSQL数据库的SQL语句错误。
+
+数据库类型: {db_type}
+
+请修复以下SQL语句中的错误。对于每个SQL,我会提供问题描述、错误信息和完整的SQL语句。
+
+修复要求:
+1. 只修复语法错误和表结构错误
+2. 保持SQL的原始业务逻辑不变
+3. 使用PostgreSQL标准语法
+4. 确保修复后的SQL语法正确
+
+需要修复的SQL:
+
+"""
+        
+        # 添加每个SQL的详细信息
+        for i, data in enumerate(batch_data, 1):
+            prompt += f"""
+{i}. 问题: {data['question']}
+   错误: {data['error']}
+   完整SQL:
+   {data['sql']}
+
+"""
+        
+        prompt += f"""
+请按以下JSON格式输出修复后的SQL:
+```json
+{{
+  "repaired_sqls": [
+    "修复后的SQL1",
+    "修复后的SQL2",
+    "修复后的SQL3"
+  ]
+}}
+```
+
+注意:
+- 必须输出 {len(batch_data)} 个修复后的SQL
+- 如果某个SQL无法修复,请输出原始SQL
+- SQL语句必须以分号结束
+- 保持原始的中文别名和业务逻辑"""
+        
+        return prompt 
+    
+    async def _call_llm_for_repair(self, prompt: str) -> str:
+        """调用LLM进行修复"""
+        
+        try:
+            timeout = self.config.get('llm_repair_timeout', 120)  # 默认值与config.py保持一致
+            
+            response = await asyncio.wait_for(
+                asyncio.to_thread(
+                    self.vn.chat_with_llm,
+                    question=prompt,
+                    system_prompt="你是一个专业的PostgreSQL SQL专家,专门负责修复SQL语句中的语法错误和表结构错误。请严格按照JSON格式输出修复结果。"
+                ),
+                timeout=timeout
+            )
+            
+            if not response or not response.strip():
+                raise ValueError("LLM返回空响应")
+            
+            return response.strip()
+            
+        except asyncio.TimeoutError:
+            raise Exception(f"LLM调用超时({timeout}秒)")
+        except Exception as e:
+            raise Exception(f"LLM调用失败: {e}") 
+    
+    def _parse_repair_response(self, response: str, expected_count: int) -> List[str]:
+        """解析LLM修复响应"""
+        import re
+        
+        try:
+            # 尝试提取JSON部分
+            json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
+            if json_match:
+                json_str = json_match.group(1)
+            else:
+                # 如果没有代码块,尝试直接解析
+                json_str = response
+            
+            # 解析JSON
+            parsed_data = json.loads(json_str)
+            repaired_sqls = parsed_data.get('repaired_sqls', [])
+            
+            # 验证数量
+            if len(repaired_sqls) != expected_count:
+                self.logger.warning(f"LLM返回的SQL数量不匹配:期望{expected_count},实际{len(repaired_sqls)}")
+                # 补齐或截断
+                while len(repaired_sqls) < expected_count:
+                    repaired_sqls.append("")
+                repaired_sqls = repaired_sqls[:expected_count]
+            
+            # 清理SQL语句
+            cleaned_sqls = []
+            for sql in repaired_sqls:
+                if sql and isinstance(sql, str):
+                    cleaned_sql = sql.strip()
+                    # 确保以分号结束
+                    if cleaned_sql and not cleaned_sql.endswith(';'):
+                        cleaned_sql += ';'
+                    cleaned_sqls.append(cleaned_sql)
+                else:
+                    cleaned_sqls.append("")
+            
+            return cleaned_sqls
+            
+        except json.JSONDecodeError as e:
+            self.logger.error(f"解析LLM修复响应失败: {e}")
+            self.logger.debug(f"原始响应: {response}")
+            return [""] * expected_count
+        except Exception as e:
+            self.logger.error(f"处理修复响应失败: {e}")
+            return [""] * expected_count 

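`_modify_original_json_file` 中"先从后往前删除、再为修改项重算索引"的逻辑,可以用下面这个独立小例子验证(数据为虚构,仅为说明索引重算):

```python
data = ['A', 'B', 'C', 'D', 'E']
deletions = [1, 3]         # 待删除的原始索引(对应 'B' 和 'D')
modifications = {4: 'E2'}  # 原始索引 -> 修复后的新值

# 先从后往前删除,避免删除过程中索引位移
for idx in sorted(deletions, reverse=True):
    data.pop(idx)          # 删除后 data == ['A', 'C', 'E']

# 再统计"排在该项前面的删除数",得到删除后的新索引
for orig_idx, new_value in modifications.items():
    new_idx = orig_idx - sum(1 for d in deletions if d < orig_idx)
    data[new_idx] = new_value

print(data)  # ['A', 'C', 'E2']
```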
+ 251 - 0
schema_tools/sql_validation_example.py

@@ -0,0 +1,251 @@
+"""
+SQL验证器使用示例
+演示如何使用SQL验证功能
+"""
+
+import asyncio
+import json
+import sys
+import os
+from pathlib import Path
+
+# 添加项目根目录到Python路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from schema_tools import SQLValidationAgent
+from schema_tools.utils.logger import setup_logging
+
+
+async def example_basic_validation():
+    """基本SQL验证示例"""
+    print("=" * 60)
+    print("基本SQL验证示例")
+    print("=" * 60)
+    
+    # 创建测试数据
+    test_data = [
+        {
+            "question": "查询所有用户",
+            "sql": "SELECT * FROM users;"
+        },
+        {
+            "question": "按年龄分组统计用户数",
+            "sql": "SELECT age, COUNT(*) as user_count FROM users GROUP BY age ORDER BY age;"
+        },
+        {
+            "question": "查询不存在的表",
+            "sql": "SELECT * FROM non_existent_table;"
+        },
+        {
+            "question": "语法错误的SQL",
+            "sql": "SELECT * FORM users;"  # FORM而不是FROM
+        }
+    ]
+    
+    # 保存测试数据到文件
+    test_file = Path("test_sql_data.json")
+    with open(test_file, 'w', encoding='utf-8') as f:
+        json.dump(test_data, f, ensure_ascii=False, indent=2)
+    
+    print(f"创建测试文件: {test_file}")
+    print(f"包含 {len(test_data)} 个Question-SQL对")
+    
+    # 这里需要实际的数据库连接字符串
+    # 请根据实际情况修改
+    db_connection = "postgresql://user:password@localhost:5432/test_db"
+    
+    try:
+        # 创建SQL验证Agent
+        agent = SQLValidationAgent(
+            db_connection=db_connection,
+            input_file=str(test_file),
+            output_dir="./validation_example_output"
+        )
+        
+        print(f"\n开始验证...")
+        
+        # 执行验证
+        report = await agent.validate()
+        
+        print(f"\n验证完成!")
+        print(f"成功率: {report['summary']['success_rate']:.1%}")
+        print(f"有效SQL: {report['summary']['valid_sqls']}/{report['summary']['total_questions']}")
+        
+        # 显示错误详情(报告中没有独立的errors键,需从detailed_results中筛选无效项)
+        error_results = [r for r in report['detailed_results'] if not r['valid']]
+        if error_results:
+            print(f"\n错误详情:")
+            for i, error in enumerate(error_results, 1):
+                print(f"  {i}. {error['error_message']}")
+                print(f"     SQL: {error['sql'][:100]}...")
+        
+    except Exception as e:
+        print(f"验证失败: {e}")
+        print("请检查数据库连接字符串和数据库权限")
+    
+    finally:
+        # 清理测试文件
+        if test_file.exists():
+            test_file.unlink()
+            print(f"\n清理测试文件: {test_file}")
+
+
+async def example_with_real_data():
+    """使用真实数据的SQL验证示例"""
+    print("=" * 60)
+    print("真实数据SQL验证示例")
+    print("=" * 60)
+    
+    # 检查是否有现有的Question-SQL文件
+    possible_files = list(Path(".").glob("qs_*_pair.json"))
+    
+    if not possible_files:
+        print("未找到现有的Question-SQL文件")
+        print("请先运行 qs_generator 生成Question-SQL对,或使用基本示例")
+        return
+    
+    input_file = possible_files[0]
+    print(f"找到文件: {input_file}")
+    
+    # 读取文件内容预览
+    with open(input_file, 'r', encoding='utf-8') as f:
+        data = json.load(f)
+    
+    print(f"文件包含 {len(data)} 个Question-SQL对")
+    print(f"前3个问题预览:")
+    for i, item in enumerate(data[:3], 1):
+        print(f"  {i}. {item['question']}")
+    
+    # 数据库连接(需要根据实际情况修改)
+    db_connection = "postgresql://user:password@localhost:5432/your_db"
+    
+    try:
+        agent = SQLValidationAgent(
+            db_connection=db_connection,
+            input_file=str(input_file),
+            output_dir="./validation_real_output"
+        )
+        
+        print(f"\n开始验证...")
+        report = await agent.validate()
+        
+        print(f"\n验证结果:")
+        print(f"  总SQL数: {report['summary']['total_questions']}")
+        print(f"  有效SQL: {report['summary']['valid_sqls']}")
+        print(f"  无效SQL: {report['summary']['invalid_sqls']}")
+        print(f"  成功率: {report['summary']['success_rate']:.1%}")
+        print(f"  平均耗时: {report['summary']['average_execution_time']:.3f}秒")
+        
+    except Exception as e:
+        print(f"验证失败: {e}")
+
+
+async def example_configuration_demo():
+    """配置演示示例"""
+    print("=" * 60)
+    print("配置选项演示")
+    print("=" * 60)
+    
+    from schema_tools.config import SCHEMA_TOOLS_CONFIG
+    
+    print("当前SQL验证配置:")
+    sql_config = SCHEMA_TOOLS_CONFIG['sql_validation']
+    for key, value in sql_config.items():
+        print(f"  {key}: {value}")
+    
+    print("\n可以通过命令行参数覆盖配置:")
+    print("  --max-concurrent 10    # 最大并发数")
+    print("  --batch-size 20        # 批处理大小")
+    print("  --timeout 60           # 验证超时时间")
+    
+    print("\n或者在代码中修改配置:")
+    print("  SCHEMA_TOOLS_CONFIG['sql_validation']['max_concurrent_validations'] = 10")
+
+
+def print_usage_examples():
+    """打印使用示例"""
+    print("=" * 60)
+    print("SQL验证器命令行使用示例")
+    print("=" * 60)
+    
+    examples = [
+        {
+            "title": "基本验证",
+            "command": """python -m schema_tools.sql_validator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --input-file ./qs_data.json"""
+        },
+        {
+            "title": "指定输出目录",
+            "command": """python -m schema_tools.sql_validator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --input-file ./qs_data.json \\
+  --output-dir ./reports"""
+        },
+        {
+            "title": "调整性能参数",
+            "command": """python -m schema_tools.sql_validator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --input-file ./qs_data.json \\
+  --max-concurrent 10 \\
+  --batch-size 20 \\
+  --timeout 60"""
+        },
+        {
+            "title": "预检查模式",
+            "command": """python -m schema_tools.sql_validator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --input-file ./qs_data.json \\
+  --dry-run"""
+        },
+        {
+            "title": "详细日志",
+            "command": """python -m schema_tools.sql_validator \\
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \\
+  --input-file ./qs_data.json \\
+  --verbose \\
+  --log-file validation.log"""
+        }
+    ]
+    
+    for example in examples:
+        print(f"\n{example['title']}:")
+        print(example['command'])
+
+
+async def main():
+    """主函数"""
+    # 设置日志
+    setup_logging(verbose=True)
+    
+    print("Schema Tools SQL验证器示例")
+    print("请选择要运行的示例:")
+    print("1. 基本SQL验证示例")
+    print("2. 真实数据验证示例")
+    print("3. 配置选项演示")
+    print("4. 命令行使用示例")
+    print("0. 退出")
+    
+    try:
+        choice = input("\n请输入选择 (0-4): ").strip()
+        
+        if choice == "1":
+            await example_basic_validation()
+        elif choice == "2":
+            await example_with_real_data()
+        elif choice == "3":
+            await example_configuration_demo()
+        elif choice == "4":
+            print_usage_examples()
+        elif choice == "0":
+            print("退出示例程序")
+        else:
+            print("无效选择")
+    
+    except KeyboardInterrupt:
+        print("\n\n用户中断,退出程序")
+    except Exception as e:
+        print(f"\n示例执行失败: {e}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main()) 

+ 241 - 0
schema_tools/sql_validator.py

@@ -0,0 +1,241 @@
+"""
+SQL验证器命令行入口
+用于验证Question-SQL对中的SQL语句是否有效
+"""
+
+import argparse
+import asyncio
+import sys
+import os
+from pathlib import Path
+
+from schema_tools.sql_validation_agent import SQLValidationAgent
+from schema_tools.utils.logger import setup_logging
+
+
+def setup_argument_parser():
+    """设置命令行参数解析器"""
+    parser = argparse.ArgumentParser(
+        description='SQL Validator - 验证Question-SQL对中的SQL语句',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例用法:
+  # 基本使用(仅验证,不修改文件)
+  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json
+  
+  # 仅删除无效SQL,不进行LLM修复
+  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --modify-original-file
+  
+  # 启用LLM修复功能(需要同时指定文件修改参数)
+  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --enable-llm-repair --modify-original-file
+  
+  # 指定输出目录
+  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --output-dir ./reports
+  
+  # 启用详细日志
+  python -m schema_tools.sql_validator --db-connection "postgresql://user:pass@localhost:5432/dbname" --input-file ./data.json --verbose
+        """
+    )
+    
+    # 必需参数
+    parser.add_argument(
+        '--db-connection',
+        required=True,
+        help='数据库连接字符串 (postgresql://user:pass@host:port/dbname)'
+    )
+    
+    parser.add_argument(
+        '--input-file',
+        required=True,
+        help='输入的JSON文件路径(包含Question-SQL对)'
+    )
+    
+    # 可选参数
+    parser.add_argument(
+        '--output-dir',
+        help='验证报告输出目录(默认为输入文件同目录)'
+    )
+    
+    parser.add_argument(
+        '--max-concurrent',
+        type=int,
+        help='最大并发验证数(覆盖配置文件设置)'
+    )
+    
+    parser.add_argument(
+        '--batch-size',
+        type=int,
+        help='批处理大小(覆盖配置文件设置)'
+    )
+    
+    parser.add_argument(
+        '--timeout',
+        type=int,
+        help='单个SQL验证超时时间(秒)'
+    )
+    
+    parser.add_argument(
+        '--verbose', '-v',
+        action='store_true',
+        help='启用详细日志输出'
+    )
+    
+    parser.add_argument(
+        '--log-file',
+        help='日志文件路径'
+    )
+    
+    parser.add_argument(
+        '--dry-run',
+        action='store_true',
+        help='仅读取和解析文件,不执行验证'
+    )
+    
+    parser.add_argument(
+        '--save-json',
+        action='store_true',
+        help='同时保存详细的JSON报告'
+    )
+    
+    parser.add_argument(
+        '--enable-llm-repair',
+        action='store_true',
+        help='启用LLM自动修复功能'
+    )
+    
+    parser.add_argument(
+        '--modify-original-file',
+        action='store_true',
+        help='修改原始JSON文件(删除无效SQL,如果启用LLM修复则同时更新修复后的SQL)'
+    )
+    
+    return parser
+
+
+def apply_config_overrides(args):
+    """应用命令行参数覆盖配置"""
+    from schema_tools.config import SCHEMA_TOOLS_CONFIG
+    
+    sql_config = SCHEMA_TOOLS_CONFIG['sql_validation']
+    
+    if args.max_concurrent:
+        sql_config['max_concurrent_validations'] = args.max_concurrent
+        print(f"覆盖并发数配置: {args.max_concurrent}")
+    
+    if args.batch_size:
+        sql_config['batch_size'] = args.batch_size
+        print(f"覆盖批处理大小: {args.batch_size}")
+    
+    if args.timeout:
+        sql_config['validation_timeout'] = args.timeout
+        print(f"覆盖超时配置: {args.timeout}秒")
+    
+    if args.save_json:
+        sql_config['save_detailed_json_report'] = True
+        print(f"启用详细JSON报告保存")
+    
+    if args.enable_llm_repair:
+        sql_config['enable_sql_repair'] = True
+        print(f"启用LLM自动修复功能")
+    else:
+        sql_config['enable_sql_repair'] = False
+        print(f"LLM修复功能已禁用")
+    
+    if args.modify_original_file:
+        sql_config['modify_original_file'] = True
+        print(f"启用原文件修改功能")
+    else:
+        sql_config['modify_original_file'] = False
+        print(f"不修改原文件")
+
+
+async def main():
+    """主入口函数"""
+    parser = setup_argument_parser()
+    args = parser.parse_args()
+    
+    # 设置日志
+    setup_logging(
+        verbose=args.verbose,
+        log_file=args.log_file,
+        log_dir=os.path.join(args.output_dir, 'logs') if args.output_dir else None
+    )
+    
+    # 验证参数
+    if not os.path.exists(args.input_file):
+        print(f"错误: 输入文件不存在: {args.input_file}")
+        sys.exit(1)
+    
+    input_path = Path(args.input_file)
+    if input_path.suffix.lower() != '.json':
+        print(f"警告: 输入文件可能不是JSON格式: {args.input_file}")
+    
+    # 应用配置覆盖
+    apply_config_overrides(args)
+    
+    try:
+        # 创建SQL验证Agent
+        agent = SQLValidationAgent(
+            db_connection=args.db_connection,
+            input_file=args.input_file,
+            output_dir=args.output_dir
+        )
+        
+        # 显示运行信息
+        print(f"🚀 开始SQL验证...")
+        print(f"📁 输入文件: {args.input_file}")
+        if args.output_dir:
+            print(f"📁 输出目录: {args.output_dir}")
+        print(f"🔗 数据库: {_mask_db_connection(args.db_connection)}")
+        
+        if args.dry_run:
+            print("\n🔍 执行预检查模式...")
+            # 仅读取和验证文件格式
+            questions_sqls = await agent._load_questions_sqls()
+            print(f"✅ 成功读取 {len(questions_sqls)} 个Question-SQL对")
+            print("📊 SQL样例:")
+            for i, qs in enumerate(questions_sqls[:3], 1):
+                print(f"  {i}. {qs['question']}")
+                print(f"     SQL: {qs['sql'][:100]}{'...' if len(qs['sql']) > 100 else ''}")
+                print()
+            sys.exit(0)
+        
+        # 执行验证
+        report = await agent.validate()
+        
+        # 输出结果
+        success_rate = report['summary']['success_rate']
+        
+        if success_rate >= 0.9:  # 90%以上成功率
+            print(f"\n🎉 验证完成,成功率: {success_rate:.1%}")
+            exit_code = 0
+        elif success_rate >= 0.7:  # 70%-90%成功率
+            print(f"\n⚠️  验证完成,成功率较低: {success_rate:.1%}")
+            exit_code = 1
+        else:  # 70%以下成功率
+            print(f"\n❌ 验证完成,成功率过低: {success_rate:.1%}")
+            exit_code = 2
+        
+        print(f"📊 详细结果: {report['summary']['valid_sqls']}/{report['summary']['total_questions']} SQL有效")
+        
+        sys.exit(exit_code)
+        
+    except KeyboardInterrupt:
+        print("\n\n⏹️  用户中断,程序退出")
+        sys.exit(130)
+    except Exception as e:
+        print(f"\n❌ 程序执行失败: {e}")
+        if args.verbose:
+            import traceback
+            traceback.print_exc()
+        sys.exit(1)
+
+
+def _mask_db_connection(conn_str: str) -> str:
+    """隐藏数据库连接字符串中的敏感信息"""
+    import re
+    return re.sub(r'://[^:]+:[^@]+@', '://***:***@', conn_str)
+
+
+if __name__ == "__main__":
+    asyncio.run(main()) 

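上面的 CLI 按成功率返回不同退出码(≥90% 为 0,70%~90% 为 1,<70% 为 2),便于在 CI 中做阈值控制。下面是一个假设性的 shell 用法示意(连接串为占位符):

```bash
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./data.json
case $? in
  0) echo "验证通过(成功率 >= 90%)" ;;
  1) echo "成功率偏低(70%-90%),请人工复查" ;;
  2) echo "成功率过低(< 70%),终止流程"; exit 1 ;;
esac
```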
+ 5 - 1
schema_tools/validators/__init__.py

@@ -3,7 +3,11 @@
 """
 
 from .file_count_validator import FileCountValidator
+from .sql_validator import SQLValidator, SQLValidationResult, ValidationStats
 
 __all__ = [
-    "FileCountValidator"
+    "FileCountValidator",
+    "SQLValidator",
+    "SQLValidationResult", 
+    "ValidationStats"
 ] 

+ 253 - 0
schema_tools/validators/sql_validator.py

@@ -0,0 +1,253 @@
+import asyncio
+import logging
+import time
+from typing import Dict, Any, List, Optional
+from dataclasses import dataclass, field
+
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+
+@dataclass
+class SQLValidationResult:
+    """SQL验证结果"""
+    sql: str
+    valid: bool
+    error_message: str = ""
+    execution_time: float = 0.0
+    retry_count: int = 0
+    
+    # SQL修复相关字段
+    repair_attempted: bool = False
+    repair_successful: bool = False
+    repaired_sql: str = ""
+    repair_error: str = ""
+
+
+@dataclass
+class ValidationStats:
+    """验证统计信息"""
+    total_sqls: int = 0
+    valid_sqls: int = 0
+    invalid_sqls: int = 0
+    total_time: float = 0.0
+    avg_time_per_sql: float = 0.0
+    retry_count: int = 0
+    
+    # SQL修复统计
+    repair_attempted: int = 0
+    repair_successful: int = 0
+    repair_failed: int = 0
+
+
+class SQLValidator:
+    """SQL验证器"""
+    
+    def __init__(self, db_connection: str = None):
+        """
+        初始化SQL验证器
+        
+        Args:
+            db_connection: 数据库连接字符串(可选,用于复用连接池)
+        """
+        self.db_connection = db_connection
+        self.connection_pool = None
+        self.config = SCHEMA_TOOLS_CONFIG['sql_validation']
+        self.logger = logging.getLogger("schema_tools.SQLValidator")
+        
+    async def _get_connection_pool(self):
+        """获取或复用现有连接池"""
+        if not self.connection_pool:
+            if self.config['reuse_connection_pool'] and self.db_connection:
+                # 复用现有的DatabaseInspector连接池
+                from schema_tools.tools.base import ToolRegistry
+                
+                db_tool = ToolRegistry.get_tool("database_inspector", 
+                                               db_connection=self.db_connection)
+                
+                # 如果连接池不存在,则创建
+                if not db_tool.connection_pool:
+                    await db_tool._create_connection_pool()
+                
+                # 复用连接池
+                self.connection_pool = db_tool.connection_pool
+                self.logger.info("复用现有数据库连接池进行SQL验证")
+            else:
+                raise ValueError("需要提供数据库连接字符串或启用连接池复用")
+        
+        return self.connection_pool
+    
+    async def validate_sql(self, sql: str, retry_count: int = 0) -> SQLValidationResult:
+        """
+        验证单个SQL语句
+        
+        Args:
+            sql: 要验证的SQL语句
+            retry_count: 当前重试次数
+            
+        Returns:
+            SQLValidationResult: 验证结果
+        """
+        start_time = time.time()
+        
+        try:
+            pool = await self._get_connection_pool()
+            
+            async with pool.acquire() as conn:
+                # 设置超时
+                timeout = self.config['validation_timeout']
+                
+                # 设置只读模式(安全考虑)
+                if self.config['readonly_mode']:
+                    await asyncio.wait_for(
+                        conn.execute("SET default_transaction_read_only = on"),
+                        timeout=timeout
+                    )
+                
+                # 执行EXPLAIN验证SQL
+                await asyncio.wait_for(
+                    conn.execute(f"EXPLAIN {sql}"),
+                    timeout=timeout
+                )
+                
+                execution_time = time.time() - start_time
+                
+                self.logger.debug(f"SQL验证成功: {sql[:50]}... ({execution_time:.3f}s)")
+                
+                return SQLValidationResult(
+                    sql=sql,
+                    valid=True,
+                    execution_time=execution_time,
+                    retry_count=retry_count
+                )
+                
+        except asyncio.TimeoutError:
+            execution_time = time.time() - start_time
+            error_msg = f"验证超时({self.config['validation_timeout']}秒)"
+            
+            self.logger.warning(f"SQL验证超时: {sql[:50]}...")
+            
+            return SQLValidationResult(
+                sql=sql,
+                valid=False,
+                error_message=error_msg,
+                execution_time=execution_time,
+                retry_count=retry_count
+            )
+            
+        except Exception as e:
+            execution_time = time.time() - start_time
+            error_msg = str(e)
+            
+            # 检查是否需要重试
+            max_retries = self.config['max_retry_count']
+            if retry_count < max_retries and self._should_retry(e):
+                self.logger.debug(f"SQL验证失败,重试 {retry_count + 1}/{max_retries}: {error_msg}")
+                await asyncio.sleep(0.5)  # 短暂等待后重试
+                return await self.validate_sql(sql, retry_count + 1)
+            
+            self.logger.debug(f"SQL验证失败: {sql[:50]}... - {error_msg}")
+            
+            return SQLValidationResult(
+                sql=sql,
+                valid=False,
+                error_message=error_msg,
+                execution_time=execution_time,
+                retry_count=retry_count
+            )
+    
+    async def validate_sqls_batch(self, sqls: List[str]) -> List[SQLValidationResult]:
+        """
+        批量验证SQL语句
+        
+        Args:
+            sqls: SQL语句列表
+            
+        Returns:
+            验证结果列表
+        """
+        if not sqls:
+            return []
+        
+        max_concurrent = self.config['max_concurrent_validations']
+        semaphore = asyncio.Semaphore(max_concurrent)
+        
+        async def validate_with_semaphore(sql):
+            async with semaphore:
+                return await self.validate_sql(sql)
+        
+        self.logger.info(f"开始批量验证 {len(sqls)} 个SQL语句 (并发度: {max_concurrent})")
+        
+        # 并发执行验证
+        tasks = [validate_with_semaphore(sql) for sql in sqls]
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+        
+        # 处理异常结果
+        validated_results = []
+        for i, result in enumerate(results):
+            if isinstance(result, Exception):
+                self.logger.error(f"SQL验证任务异常: {sqls[i][:50]}... - {result}")
+                validated_results.append(SQLValidationResult(
+                    sql=sqls[i],
+                    valid=False,
+                    error_message=f"验证任务异常: {str(result)}"
+                ))
+            else:
+                validated_results.append(result)
+        
+        return validated_results
+    
+    def _should_retry(self, error: Exception) -> bool:
+        """
+        判断是否应该重试
+        
+        Args:
+            error: 异常对象
+            
+        Returns:
+            是否应该重试
+        """
+        # 一般网络或连接相关的错误可以重试
+        retry_indicators = [
+            "connection",
+            "network",
+            "timeout",
+            "server closed",
+            "pool",
+        ]
+        
+        error_str = str(error).lower()
+        return any(indicator in error_str for indicator in retry_indicators)
+    
+    def calculate_stats(self, results: List[SQLValidationResult]) -> ValidationStats:
+        """
+        计算验证统计信息
+        
+        Args:
+            results: 验证结果列表
+            
+        Returns:
+            ValidationStats: 统计信息
+        """
+        total_sqls = len(results)
+        valid_sqls = sum(1 for r in results if r.valid)
+        invalid_sqls = total_sqls - valid_sqls
+        total_time = sum(r.execution_time for r in results)
+        avg_time = total_time / total_sqls if total_sqls > 0 else 0.0
+        total_retries = sum(r.retry_count for r in results)
+        
+        # 计算修复统计
+        repair_attempted = sum(1 for r in results if r.repair_attempted)
+        repair_successful = sum(1 for r in results if r.repair_successful)
+        repair_failed = repair_attempted - repair_successful
+        
+        return ValidationStats(
+            total_sqls=total_sqls,
+            valid_sqls=valid_sqls,
+            invalid_sqls=invalid_sqls,
+            total_time=total_time,
+            avg_time_per_sql=avg_time,
+            retry_count=total_retries,
+            repair_attempted=repair_attempted,
+            repair_successful=repair_successful,
+            repair_failed=repair_failed
+        ) 

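`SQLValidator` 也可以脱离 Agent 单独使用(前提是 `database_inspector` 工具已注册、连接池可复用);下面是一个最小用法示意,连接串为占位符:

```python
import asyncio
from schema_tools.validators import SQLValidator

async def demo():
    # 连接串为占位符,请替换为实际数据库
    validator = SQLValidator("postgresql://user:pass@localhost:5432/dbname")
    results = await validator.validate_sqls_batch([
        "SELECT 1;",
        "SELECT * FROM non_existent_table;",
    ])
    stats = validator.calculate_stats(results)
    print(f"有效: {stats.valid_sqls}/{stats.total_sqls}, 重试: {stats.retry_count}")

asyncio.run(demo())
```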
+ 10 - 0
test_file_modification.json

@@ -0,0 +1,10 @@
+[
+  {
+    "question": "查询所有服务区名称",
+    "sql": "SELECT service_area_name FROM bss_service_area WHERE delete_ts IS NULL;"
+  },
+  {
+    "question": "测试无效SQL",
+    "sql": "SELECT * FROM non_existent_table WHERE id = 1;"
+  }
+]
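
这个测试文件包含一条有效SQL和一条引用不存在表的无效SQL,可用于验证文件修改功能;按上文 epilog 中的用法(连接串为占位符),预期第二条会被删除,并生成 `.backup` 备份:

```bash
python -m schema_tools.sql_validator \
  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
  --input-file ./test_file_modification.json \
  --modify-original-file
```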