
Fixed the data_pipeline logging module so that it is now standalone; next, the monitoring of the job execution process will be reworked.

wangxq, 1 week ago
Parent
Current commit
e606828120
42 files changed: 1152 insertions, 83 deletions
  1. .claude/settings.local.json (+6 -1)
  2. data_pipeline/analyzers/md_analyzer.py (+2 -2)
  3. data_pipeline/analyzers/theme_extractor.py (+2 -2)
  4. data_pipeline/api/simple_db_manager.py (+4 -2)
  5. data_pipeline/api/simple_file_manager.py (+4 -2)
  6. data_pipeline/api/simple_workflow.py (+6 -3)
  7. data_pipeline/ddl_generation/training_data_agent.py (+12 -2)
  8. data_pipeline/dp_logging/__init__.py (+29 -0)
  9. data_pipeline/dp_logging/manager.py (+154 -0)
  10. data_pipeline/metadata_only_generator.py (+7 -3)
  11. data_pipeline/qa_generation/qs_agent.py (+14 -3)
  12. data_pipeline/schema_workflow.py (+39 -9)
  13. data_pipeline/tables.txt (+5 -5)
  14. data_pipeline/task_executor.py (+2 -3)
  15. data_pipeline/tools/base.py (+5 -5)
  16. data_pipeline/trainer/run_training.py (+39 -19)
  17. data_pipeline/trainer/vanna_trainer.py (+2 -2)
  18. data_pipeline/training_data/task_20250701_175640/bss_car_day_count.ddl (+17 -0)
  19. data_pipeline/training_data/task_20250701_175640/bss_car_day_count_detail.md (+18 -0)
  20. data_pipeline/training_data/task_20250701_175640/task_config.json (+14 -0)
  21. data_pipeline/training_data/task_20250701_180014/task_config.json (+14 -0)
  22. data_pipeline/training_data/task_20250701_184430/bss_business_day_data.ddl (+31 -0)
  23. data_pipeline/training_data/task_20250701_184430/bss_business_day_data_detail.md (+32 -0)
  24. data_pipeline/training_data/task_20250701_184430/bss_car_day_count.ddl (+17 -0)
  25. data_pipeline/training_data/task_20250701_184430/bss_car_day_count_detail.md (+18 -0)
  26. data_pipeline/training_data/task_20250701_184430/db_query_decision_prompt.txt (+38 -0)
  27. data_pipeline/training_data/task_20250701_184430/filename_mapping.txt (+5 -0)
  28. data_pipeline/training_data/task_20250701_184430/metadata.txt (+62 -0)
  29. data_pipeline/training_data/task_20250701_184430/metadata_detail.md (+20 -0)
  30. data_pipeline/training_data/task_20250701_184430/qs_highway_db_20250701_185822_pair.json (+198 -0)
  31. data_pipeline/training_data/task_20250701_184430/qs_highway_db_20250701_185822_pair.json.backup (+202 -0)
  32. data_pipeline/training_data/task_20250701_184430/task_config.json (+14 -0)
  33. data_pipeline/training_data/task_20250701_184430/task_result.json (+88 -0)
  34. data_pipeline/utils/file_manager.py (+2 -2)
  35. data_pipeline/utils/large_table_handler.py (+2 -2)
  36. data_pipeline/utils/logger.py (+4 -4)
  37. data_pipeline/utils/permission_checker.py (+2 -2)
  38. data_pipeline/utils/system_filter.py (+2 -2)
  39. data_pipeline/utils/table_parser.py (+2 -2)
  40. data_pipeline/validators/file_count_validator.py (+2 -2)
  41. data_pipeline/validators/sql_validation_agent.py (+14 -2)
  42. data_pipeline/validators/sql_validator.py (+2 -2)

+ 6 - 1
.claude/settings.local.json

@@ -18,7 +18,12 @@
       "Bash(rm:*)",
       "Bash(.venv/bin/python:*)",
       "Bash(./.venv/Scripts/python.exe:*)",
-      "Bash(sed:*)"
+      "Bash(sed:*)",
+      "Bash(\".venv/Scripts/python.exe\" -c \"import sys; sys.path.append('.'); from data_pipeline.logging import get_logger; print('独立日志系统导入成功')\")",
+      "Bash(\".venv/Scripts/python.exe\" -c \"import sys; sys.path.append('.'); from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator; print('SchemaWorkflowOrchestrator导入成功')\")",
+      "Bash(\".venv/Scripts/python.exe\" -c \"import sys; sys.path.append('.'); from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor; print('SimpleWorkflowExecutor导入成功')\")",
+      "Bash(\".venv/Scripts/python.exe\":*)",
+      "Bash(curl:*)"
     ],
     "deny": []
   }

+ 2 - 2
data_pipeline/analyzers/md_analyzer.py

@@ -1,6 +1,6 @@
 from pathlib import Path
 from typing import List, Dict, Any
-from core.logging import get_data_pipeline_logger
+import logging
 
 
 class MDFileAnalyzer:
@@ -8,7 +8,7 @@ class MDFileAnalyzer:
     
     def __init__(self, output_dir: str):
         self.output_dir = Path(output_dir)
-        self.logger = get_data_pipeline_logger("MDFileAnalyzer")
+        self.logger = logging.getLogger("MDFileAnalyzer")
         
     async def read_all_md_files(self) -> str:
         """

+ 2 - 2
data_pipeline/analyzers/theme_extractor.py

@@ -3,7 +3,7 @@ import json
 from typing import List, Dict, Any
 
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
-from core.logging import get_data_pipeline_logger
+import logging
 
 
 class ThemeExtractor:
@@ -19,7 +19,7 @@ class ThemeExtractor:
         """
         self.vn = vn
         self.business_context = business_context
-        self.logger = get_data_pipeline_logger("ThemeExtractor")
+        self.logger = logging.getLogger("ThemeExtractor")
         self.config = SCHEMA_TOOLS_CONFIG
         
     async def extract_themes(self, md_contents: str) -> List[Dict[str, Any]]:

+ 4 - 2
data_pipeline/api/simple_db_manager.py

@@ -12,7 +12,7 @@ import psycopg2
 from psycopg2.extras import RealDictCursor, Json
 
 from app_config import PGVECTOR_CONFIG
-from core.logging import get_data_pipeline_logger
+import logging
 
 
 class SimpleTaskManager:
@@ -20,7 +20,9 @@ class SimpleTaskManager:
     
     def __init__(self):
         """初始化任务管理器"""
-        self.logger = get_data_pipeline_logger("SimpleTaskManager")
+        # 使用简单的控制台日志,不使用文件日志
+        self.logger = logging.getLogger("SimpleTaskManager")
+        self.logger.setLevel(logging.INFO)
         self._connection = None
     
     def _get_connection(self):

+ 4 - 2
data_pipeline/api/simple_file_manager.py

@@ -9,7 +9,7 @@ from pathlib import Path
 from typing import Dict, Any, List
 from datetime import datetime
 
-from core.logging import get_data_pipeline_logger
+import logging
 
 
 class SimpleFileManager:
@@ -23,7 +23,9 @@ class SimpleFileManager:
             base_output_dir: 基础输出目录
         """
         self.base_output_dir = Path(base_output_dir)
-        self.logger = get_data_pipeline_logger("SimpleFileManager")
+        # 使用简单的控制台日志,不使用文件日志
+        self.logger = logging.getLogger("SimpleFileManager")
+        self.logger.setLevel(logging.INFO)
         
         # 确保基础目录存在
         self.base_output_dir.mkdir(parents=True, exist_ok=True)

+ 6 - 3
data_pipeline/api/simple_workflow.py

@@ -16,7 +16,7 @@ from contextlib import contextmanager
 from data_pipeline.schema_workflow import SchemaWorkflowOrchestrator
 from data_pipeline.api.simple_db_manager import SimpleTaskManager
 from data_pipeline.api.simple_file_manager import SimpleFileManager
-from core.logging import get_data_pipeline_logger
+from data_pipeline.dp_logging import get_logger
 
 
 class SimpleWorkflowExecutor:
@@ -30,7 +30,7 @@ class SimpleWorkflowExecutor:
             task_id: 任务ID
         """
         self.task_id = task_id
-        self.logger = get_data_pipeline_logger("SimpleWorkflowExecutor")
+        self.logger = get_logger("SimpleWorkflowExecutor", task_id)
         
         # 初始化管理器
         self.task_manager = SimpleTaskManager()
@@ -144,6 +144,7 @@ class SimpleWorkflowExecutor:
             table_list_file=self.task_params['table_list_file'],
             business_context=self.task_params['business_context'],
             output_dir=str(task_dir),
+            task_id=self.task_id,  # 传递task_id给编排器
             enable_sql_validation=self.task_params.get('enable_sql_validation', True),
             enable_llm_repair=self.task_params.get('enable_llm_repair', True),
             modify_original_file=self.task_params.get('modify_original_file', True),
@@ -444,7 +445,9 @@ class SimpleWorkflowManager:
         """初始化工作流管理器"""
         self.task_manager = SimpleTaskManager()
         self.file_manager = SimpleFileManager()
-        self.logger = get_data_pipeline_logger("SimpleWorkflowManager")
+        # 使用简单的控制台日志,不使用文件日志
+        self.logger = logging.getLogger("SimpleWorkflowManager")
+        self.logger.setLevel(logging.INFO)
     
     def create_task(self, 
                    table_list_file: str,

+ 12 - 2
data_pipeline/ddl_generation/training_data_agent.py

@@ -11,7 +11,7 @@ from data_pipeline.utils.system_filter import SystemTableFilter
 from data_pipeline.utils.permission_checker import DatabasePermissionChecker
 from data_pipeline.utils.table_parser import TableListParser
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
-from core.logging import get_data_pipeline_logger
+from data_pipeline.dp_logging import get_logger
 
 class SchemaTrainingDataAgent:
     """Schema训练数据生成AI Agent"""
@@ -21,6 +21,7 @@ class SchemaTrainingDataAgent:
                  table_list_file: str,
                  business_context: str = None,
                  output_dir: str = None,
+                 task_id: str = None,
                  pipeline: str = "full"):
         
         self.db_connection = db_connection
@@ -49,7 +50,16 @@ class SchemaTrainingDataAgent:
         }
         
         self.failed_tables = []
-        self.logger = get_data_pipeline_logger("SchemaTrainingDataAgent")
+        self.task_id = task_id
+        
+        # 初始化独立日志系统
+        if task_id:
+            self.logger = get_logger("SchemaTrainingDataAgent", task_id)
+        else:
+            # 脚本模式下,如果没有传递task_id,生成一个
+            from datetime import datetime
+            self.task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+            self.logger = get_logger("SchemaTrainingDataAgent", self.task_id)
     
     async def generate_training_data(self) -> Dict[str, Any]:
         """主入口:生成训练数据"""

+ 29 - 0
data_pipeline/dp_logging/__init__.py

@@ -0,0 +1,29 @@
+"""
+Data Pipeline 独立日志管理系统
+
+完全脱离主项目的日志管理,专门为data_pipeline模块设计
+支持任务级别的日志文件管理,同时支持API调用和脚本调用
+"""
+
+from .manager import DataPipelineLogManager
+
+# 对外接口
+def get_logger(name: str, task_id: str):
+    """
+    获取data_pipeline专用logger
+    
+    Args:
+        name: logger名称 (如: "SchemaWorkflowOrchestrator", "DDLGenerator")
+        task_id: 任务ID,必须提供
+                API模式: task_YYYYMMDD_HHMMSS
+                脚本模式: manual_YYYYMMDD_HHMMSS
+    
+    Returns:
+        配置好的logger,输出到 ./data_pipeline/training_data/{task_id}/data_pipeline.log
+    """
+    return DataPipelineLogManager.get_logger(name, task_id)
+
+# 便捷方法(保持接口一致性)
+def get_data_pipeline_logger(name: str, task_id: str):
+    """便捷方法,与get_logger功能相同"""
+    return get_logger(name, task_id)
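
Quick usage sketch of the new entry point (a minimal example run from the repository root; the logger name, task_id and message below are only illustrative):

    from data_pipeline.dp_logging import get_logger

    # API mode: the caller supplies a task_id like task_YYYYMMDD_HHMMSS
    logger = get_logger("SchemaWorkflowOrchestrator", "task_20250701_184430")
    # goes to the console and to data_pipeline/training_data/task_20250701_184430/data_pipeline.log
    logger.info("step started")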

+ 154 - 0
data_pipeline/dp_logging/manager.py

@@ -0,0 +1,154 @@
+"""
+Data Pipeline 独立日志管理器
+
+专门为data_pipeline模块设计的日志管理器,完全独立于主项目的日志系统
+"""
+
+import os
+from pathlib import Path
+from typing import Dict
+
+# 明确导入Python内置logging模块
+import logging as std_logging
+
+
+class DataPipelineLogManager:
+    """Data Pipeline 专用日志管理器"""
+    
+    _loggers: Dict[str, std_logging.Logger] = {}
+    _file_handlers: Dict[str, std_logging.FileHandler] = {}
+    
+    @classmethod
+    def get_logger(cls, name: str, task_id: str) -> std_logging.Logger:
+        """
+        获取或创建logger
+        
+        Args:
+            name: logger名称
+            task_id: 任务ID,用于确定日志文件位置
+        
+        Returns:
+            配置好的logger实例
+        """
+        logger_key = f"data_pipeline.{name}.{task_id}"
+        
+        if logger_key not in cls._loggers:
+            logger = cls._create_logger(name, task_id)
+            cls._loggers[logger_key] = logger
+        
+        return cls._loggers[logger_key]
+    
+    @classmethod
+    def _create_logger(cls, name: str, task_id: str) -> std_logging.Logger:
+        """创建新的logger实例"""
+        # 创建logger
+        logger_name = f"data_pipeline.{name}"
+        logger = std_logging.getLogger(logger_name)
+        
+        # 设置日志级别
+        logger.setLevel(std_logging.DEBUG)
+        
+        # 防止日志重复(清除已有处理器)
+        logger.handlers.clear()
+        logger.propagate = False
+        
+        # 添加控制台处理器
+        console_handler = cls._create_console_handler()
+        logger.addHandler(console_handler)
+        
+        # 添加文件处理器
+        file_handler = cls._create_file_handler(task_id)
+        if file_handler:
+            logger.addHandler(file_handler)
+        
+        return logger
+    
+    @classmethod
+    def _create_console_handler(cls) -> std_logging.StreamHandler:
+        """创建控制台处理器"""
+        handler = std_logging.StreamHandler()
+        handler.setLevel(std_logging.INFO)
+        
+        formatter = std_logging.Formatter(
+            '%(asctime)s [%(levelname)s] Pipeline: %(message)s',
+            datefmt='%Y-%m-%d %H:%M:%S'
+        )
+        handler.setFormatter(formatter)
+        
+        return handler
+    
+    @classmethod
+    def _create_file_handler(cls, task_id: str) -> std_logging.FileHandler:
+        """创建文件处理器"""
+        try:
+            # 确定日志文件路径
+            task_dir = Path("data_pipeline/training_data") / task_id
+            task_dir.mkdir(parents=True, exist_ok=True)
+            
+            log_file = task_dir / "data_pipeline.log"
+            
+            # 为每个任务创建独立的文件处理器
+            handler_key = f"file_handler_{task_id}"
+            
+            if handler_key not in cls._file_handlers:
+                handler = std_logging.FileHandler(log_file, encoding='utf-8')
+                handler.setLevel(std_logging.DEBUG)
+                
+                formatter = std_logging.Formatter(
+                    '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s',
+                    datefmt='%Y-%m-%d %H:%M:%S'
+                )
+                handler.setFormatter(formatter)
+                
+                cls._file_handlers[handler_key] = handler
+            
+            return cls._file_handlers[handler_key]
+            
+        except Exception as e:
+            # 如果文件处理器创建失败,记录到stderr但不影响程序运行
+            import sys
+            sys.stderr.write(f"[WARNING] 无法创建data_pipeline日志文件处理器: {e}\n")
+            return None
+    
+    @classmethod
+    def cleanup_logger(cls, task_id: str):
+        """清理指定任务的logger和文件处理器"""
+        try:
+            # 关闭文件处理器
+            handler_key = f"file_handler_{task_id}"
+            if handler_key in cls._file_handlers:
+                cls._file_handlers[handler_key].close()
+                del cls._file_handlers[handler_key]
+            
+            # 清理相关的logger
+            keys_to_remove = [key for key in cls._loggers.keys() if task_id in key]
+            for key in keys_to_remove:
+                logger = cls._loggers[key]
+                for handler in logger.handlers:
+                    handler.close()
+                logger.handlers.clear()
+                del cls._loggers[key]
+                
+        except Exception as e:
+            import sys
+            sys.stderr.write(f"[WARNING] 清理data_pipeline日志资源失败: {e}\n")
+    
+    @classmethod
+    def cleanup_all(cls):
+        """清理所有logger和文件处理器"""
+        try:
+            # 关闭所有文件处理器
+            for handler in cls._file_handlers.values():
+                handler.close()
+            cls._file_handlers.clear()
+            
+            # 清理所有logger
+            for logger in cls._loggers.values():
+                for handler in logger.handlers:
+                    handler.close()
+                logger.handlers.clear()
+            cls._loggers.clear()
+            
+        except Exception as e:
+            import sys
+            sys.stderr.write(f"[WARNING] 清理所有data_pipeline日志资源失败: {e}\n")

+ 7 - 3
data_pipeline/metadata_only_generator.py

@@ -15,7 +15,7 @@ from data_pipeline.analyzers import MDFileAnalyzer, ThemeExtractor
 from data_pipeline.validators import FileCountValidator
 from data_pipeline.utils.logger import setup_logging
 from core.vanna_llm_factory import create_vanna_instance
-from core.logging import get_data_pipeline_logger
+import logging
 
 
 class MetadataOnlyGenerator:
@@ -47,7 +47,7 @@ class MetadataOnlyGenerator:
         self.theme_extractor = None
         
         # 初始化logger
-        self.logger = get_data_pipeline_logger("MetadataOnlyGenerator")
+        self.logger = logging.getLogger("MetadataOnlyGenerator")
         
         self.logger.info(f"🎯 元数据生成器初始化完成")
         self.logger.info(f"📁 输出目录: {output_dir}")
@@ -492,8 +492,12 @@ async def main():
     
     # 验证参数
     output_path = Path(args.output_dir)
+    # 为脚本模式生成task_id
+    from datetime import datetime
+    script_task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
     # 初始化logger用于参数验证
-    logger = get_data_pipeline_logger("MetadataGeneratorMain")
+    from data_pipeline.dp_logging import get_logger
+    logger = get_logger("MetadataGeneratorMain", script_task_id)
     
     if not output_path.exists():
         logger.error(f"错误: 输出目录不存在: {args.output_dir}")

+ 14 - 3
data_pipeline/qa_generation/qs_agent.py

@@ -9,7 +9,7 @@ from typing import List, Dict, Any, Optional
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
 from data_pipeline.validators import FileCountValidator
 from data_pipeline.analyzers import MDFileAnalyzer, ThemeExtractor
-from core.logging import get_data_pipeline_logger
+from data_pipeline.dp_logging import get_logger
 from core.vanna_llm_factory import create_vanna_instance
 
 
@@ -20,7 +20,8 @@ class QuestionSQLGenerationAgent:
                  output_dir: str,
                  table_list_file: str,
                  business_context: str,
-                 db_name: str = None):
+                 db_name: str = None,
+                 task_id: str = None):
         """
         初始化Agent
         
@@ -29,6 +30,7 @@ class QuestionSQLGenerationAgent:
             table_list_file: 表清单文件路径
             business_context: 业务上下文
             db_name: 数据库名称(用于输出文件命名)
+            task_id: 任务ID
         """
         self.output_dir = Path(output_dir)
         self.table_list_file = table_list_file
@@ -36,7 +38,16 @@ class QuestionSQLGenerationAgent:
         self.db_name = db_name or "db"
         
         self.config = SCHEMA_TOOLS_CONFIG
-        self.logger = get_data_pipeline_logger("QSAgent")
+        self.task_id = task_id
+        
+        # 初始化独立日志系统
+        if task_id:
+            self.logger = get_logger("QuestionSQLGenerationAgent", task_id)
+        else:
+            # 脚本模式下,如果没有传递task_id,生成一个
+            from datetime import datetime
+            self.task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+            self.logger = get_logger("QuestionSQLGenerationAgent", self.task_id)
         
         # 初始化组件
         self.validator = FileCountValidator()

+ 39 - 9
data_pipeline/schema_workflow.py

@@ -14,7 +14,7 @@ from data_pipeline.ddl_generation.training_data_agent import SchemaTrainingDataA
 from data_pipeline.qa_generation.qs_agent import QuestionSQLGenerationAgent
 from data_pipeline.validators.sql_validation_agent import SQLValidationAgent
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
-from core.logging import get_data_pipeline_logger
+from data_pipeline.dp_logging import get_logger
 
 
 class SchemaWorkflowOrchestrator:
@@ -25,6 +25,7 @@ class SchemaWorkflowOrchestrator:
                  table_list_file: str,
                  business_context: str,
                  output_dir: str = None,
+                 task_id: str = None,
                  enable_sql_validation: bool = True,
                  enable_llm_repair: bool = True,
                  modify_original_file: bool = True,
@@ -37,6 +38,7 @@ class SchemaWorkflowOrchestrator:
             table_list_file: 表清单文件路径
             business_context: 业务上下文描述
             output_dir: 输出目录
+            task_id: 任务ID (API模式传递,脚本模式自动生成)
             enable_sql_validation: 是否启用SQL验证
             enable_llm_repair: 是否启用LLM修复功能
             modify_original_file: 是否修改原始JSON文件
@@ -46,17 +48,32 @@ class SchemaWorkflowOrchestrator:
         self.table_list_file = table_list_file
         self.business_context = business_context
         self.db_name = self._extract_db_name_from_connection(db_connection)
-        self.output_dir = Path(output_dir) if output_dir else Path("./output")
         self.enable_sql_validation = enable_sql_validation
         self.enable_llm_repair = enable_llm_repair
         self.modify_original_file = modify_original_file
         self.enable_training_data_load = enable_training_data_load
         
+        # 处理task_id
+        if task_id is None:
+            # 脚本模式:自动生成manual开头的task_id
+            self.task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+        else:
+            # API模式:使用传递的task_id
+            self.task_id = task_id
+        
+        # 设置输出目录
+        if output_dir is None:
+            # 脚本模式或未指定输出目录时,使用任务目录
+            self.output_dir = Path("data_pipeline/training_data") / self.task_id
+        else:
+            # API模式或明确指定输出目录时,使用指定的目录
+            self.output_dir = Path(output_dir)
+            
         # 确保输出目录存在
         self.output_dir.mkdir(parents=True, exist_ok=True)
-        
-        # 初始化日志
-        self.logger = get_data_pipeline_logger("SchemaWorkflow")
+            
+        # 初始化独立日志系统
+        self.logger = get_logger("SchemaWorkflowOrchestrator", self.task_id)
         
         # 工作流程状态
         self.workflow_state = {
@@ -158,6 +175,7 @@ class SchemaWorkflowOrchestrator:
                 table_list_file=self.table_list_file,
                 business_context=self.business_context,
                 output_dir=str(self.output_dir),
+                task_id=self.task_id,  # 传递task_id
                 pipeline="full"
             )
             
@@ -200,7 +218,8 @@ class SchemaWorkflowOrchestrator:
                 output_dir=str(self.output_dir),
                 table_list_file=self.table_list_file,
                 business_context=self.business_context,
-                db_name=self.db_name
+                db_name=self.db_name,
+                task_id=self.task_id  # 传递task_id
             )
             
             # 执行Question-SQL生成
@@ -252,6 +271,7 @@ class SchemaWorkflowOrchestrator:
                 db_connection=self.db_connection,
                 input_file=str(qs_file),
                 output_dir=str(self.output_dir),
+                task_id=self.task_id,  # 传递task_id
                 enable_sql_repair=self.enable_llm_repair,
                 modify_original_file=self.modify_original_file
             )
@@ -322,7 +342,7 @@ class SchemaWorkflowOrchestrator:
             
             # 执行训练数据加载
             self.logger.info("🔄 开始处理训练文件...")
-            load_successful = process_training_files(training_data_dir)
+            load_successful = process_training_files(training_data_dir, self.task_id)
             
             step_duration = time.time() - step_start_time
             
@@ -645,7 +665,12 @@ async def main():
     
     # 验证输入文件
     if not os.path.exists(args.table_list):
-        logger = get_data_pipeline_logger("SchemaWorkflow")
+        # 为脚本模式生成task_id
+        from datetime import datetime
+        script_task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+        # 使用独立日志系统
+        from data_pipeline.dp_logging import get_logger
+        logger = get_logger("SchemaWorkflow", script_task_id)
         logger.error(f"错误: 表清单文件不存在: {args.table_list}")
         sys.exit(1)
     
@@ -663,7 +688,12 @@ async def main():
         )
         
         # 获取logger用于启动信息
-        logger = get_data_pipeline_logger("SchemaWorkflow")
+        # 为脚本模式生成task_id
+        from datetime import datetime
+        script_task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+        # 使用独立日志系统
+        from data_pipeline.dp_logging import get_logger
+        logger = get_logger("SchemaWorkflow", script_task_id)
         logger.info(f"🚀 开始执行Schema工作流编排...")
         logger.info(f"📁 输出目录: {args.output_dir}")
         logger.info(f"📋 表清单: {args.table_list}")

+ 5 - 5
data_pipeline/tables.txt

@@ -5,9 +5,9 @@
 # 服务区相关表
 bss_car_day_count
 bss_business_day_data
-bss_company
-bss_section_route
-bss_section_route_area_link
-bss_service_area
-bss_service_area_mapper
+#bss_company
+#bss_section_route
+#bss_section_route_area_link
+#bss_service_area
+#bss_service_area_mapper
 

+ 2 - 3
data_pipeline/task_executor.py

@@ -15,7 +15,6 @@ from pathlib import Path
 sys.path.insert(0, str(Path(__file__).parent.parent))
 
 from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
-from core.logging import initialize_logging
 
 
 def main():
@@ -27,8 +26,8 @@ def main():
     
     args = parser.parse_args()
     
-    # 初始化日志系统
-    initialize_logging()
+    # 初始化日志系统(不需要,使用独立的日志系统)
+    pass
     
     # 验证参数
     if args.execution_mode == 'step' and not args.step_name:

+ 5 - 5
data_pipeline/tools/base.py

@@ -1,7 +1,7 @@
 import asyncio
 import time
 from abc import ABC, abstractmethod
-from core.logging import get_data_pipeline_logger
+import logging
 from typing import Dict, Any, Optional, Type, List
 from data_pipeline.utils.data_structures import ProcessingResult, TableProcessingContext
 
@@ -15,7 +15,7 @@ class ToolRegistry:
         """装饰器:注册工具"""
         def decorator(tool_class: Type['BaseTool']):
             cls._tools[name] = tool_class
-            logger = get_data_pipeline_logger("ToolRegistry")
+            logger = logging.getLogger("ToolRegistry")
             logger.debug(f"注册工具: {name} -> {tool_class.__name__}")
             return tool_class
         return decorator
@@ -33,7 +33,7 @@ class ToolRegistry:
             if hasattr(tool_class, 'needs_llm') and tool_class.needs_llm:
                 from core.vanna_llm_factory import create_vanna_instance
                 kwargs['vn'] = create_vanna_instance()
-                logger = get_data_pipeline_logger("ToolRegistry")
+                logger = logging.getLogger("ToolRegistry")
                 logger.debug(f"为工具 {name} 注入LLM实例")
             
             cls._instances[name] = tool_class(**kwargs)
@@ -57,7 +57,7 @@ class BaseTool(ABC):
     tool_name: str = ""      # 工具名称
     
     def __init__(self, **kwargs):
-        self.logger = get_data_pipeline_logger(f"tools.{self.__class__.__name__}")
+        self.logger = logging.getLogger(f"tools.{self.__class__.__name__}")
         
         # 如果工具需要LLM,检查是否已注入
         if self.needs_llm and 'vn' not in kwargs:
@@ -115,7 +115,7 @@ class PipelineExecutor:
     
     def __init__(self, pipeline_config: Dict[str, List[str]]):
         self.pipeline_config = pipeline_config
-        self.logger = get_data_pipeline_logger("tools.PipelineExecutor")
+        self.logger = logging.getLogger("tools.PipelineExecutor")
     
     async def execute_pipeline(self, pipeline_name: str, context: TableProcessingContext) -> Dict[str, ProcessingResult]:
         """执行指定的处理链"""

+ 39 - 19
data_pipeline/trainer/run_training.py

@@ -308,19 +308,39 @@ def train_json_question_sql_pairs(json_file):
     except Exception as e:
         print(f" 错误:处理JSON问答训练 - {e}")
 
-def process_training_files(data_path):
+def process_training_files(data_path, task_id=None):
     """处理指定路径下的所有训练文件
     
     Args:
         data_path (str): 训练数据目录路径
+        task_id (str): 任务ID,用于日志记录
     """
-    print(f"\n===== 扫描训练数据目录: {os.path.abspath(data_path)} =====")
+    # 初始化日志
+    if task_id:
+        from data_pipeline.dp_logging import get_logger
+        logger = get_logger("TrainingDataLoader", task_id)
+        logger.info(f"扫描训练数据目录: {os.path.abspath(data_path)}")
+    else:
+        # 兼容原有调用方式
+        print(f"\n===== 扫描训练数据目录: {os.path.abspath(data_path)} =====")
+        logger = None
     
     # 检查目录是否存在
     if not os.path.exists(data_path):
-        print(f"错误: 训练数据目录不存在: {data_path}")
+        error_msg = f"错误: 训练数据目录不存在: {data_path}"
+        if logger:
+            logger.error(error_msg)
+        else:
+            print(error_msg)
         return False
     
+    # 日志输出辅助函数
+    def log_message(message, level="info"):
+        if logger:
+            getattr(logger, level)(message)
+        else:
+            print(message)
+    
     # 初始化统计计数器
     stats = {
         "ddl": 0,
@@ -338,7 +358,7 @@ def process_training_files(data_path):
             
             # 只处理文件,跳过目录
             if not os.path.isfile(item_path):
-                print(f"跳过子目录: {item}")
+                log_message(f"跳过子目录: {item}")
                 continue
                 
             file_lower = item.lower()
@@ -346,49 +366,49 @@ def process_training_files(data_path):
             # 根据文件类型调用相应的处理函数
             try:
                 if file_lower.endswith(".ddl"):
-                    print(f"\n处理DDL文件: {item_path}")
+                    log_message(f"处理DDL文件: {item_path}")
                     train_ddl_statements(item_path)
                     stats["ddl"] += 1
                     
                 elif file_lower.endswith(".md") or file_lower.endswith(".markdown"):
-                    print(f"\n处理文档文件: {item_path}")
+                    log_message(f"处理文档文件: {item_path}")
                     train_documentation_blocks(item_path)
                     stats["documentation"] += 1
                     
                 elif file_lower.endswith("_pair.json") or file_lower.endswith("_pairs.json"):
-                    print(f"\n处理JSON问答对文件: {item_path}")
+                    log_message(f"处理JSON问答对文件: {item_path}")
                     train_json_question_sql_pairs(item_path)
                     stats["question_sql_json"] += 1
                     
                 elif file_lower.endswith("_pair.sql") or file_lower.endswith("_pairs.sql"):
-                    print(f"\n处理格式化问答对文件: {item_path}")
+                    log_message(f"处理格式化问答对文件: {item_path}")
                     train_formatted_question_sql_pairs(item_path)
                     stats["question_sql_formatted"] += 1
                     
                 elif file_lower.endswith(".sql") and not (file_lower.endswith("_pair.sql") or file_lower.endswith("_pairs.sql")):
-                    print(f"\n处理SQL示例文件: {item_path}")
+                    log_message(f"处理SQL示例文件: {item_path}")
                     train_sql_examples(item_path)
                     stats["sql_example"] += 1
                 else:
-                    print(f"跳过不支持的文件类型: {item}")
+                    log_message(f"跳过不支持的文件类型: {item}")
             except Exception as e:
-                print(f"处理文件 {item_path} 时出错: {e}")
+                log_message(f"处理文件 {item_path} 时出错: {e}", "error")
                 
     except OSError as e:
-        print(f"读取目录失败: {e}")
+        log_message(f"读取目录失败: {e}", "error")
         return False
     
     # 打印处理统计
-    print("\n===== 训练文件处理统计 =====")
-    print(f"DDL文件: {stats['ddl']}个")
-    print(f"文档文件: {stats['documentation']}个")
-    print(f"SQL示例文件: {stats['sql_example']}个")
-    print(f"格式化问答对文件: {stats['question_sql_formatted']}个")
-    print(f"JSON问答对文件: {stats['question_sql_json']}个")
+    log_message("训练文件处理统计:")
+    log_message(f"DDL文件: {stats['ddl']}个")
+    log_message(f"文档文件: {stats['documentation']}个")
+    log_message(f"SQL示例文件: {stats['sql_example']}个")
+    log_message(f"格式化问答对文件: {stats['question_sql_formatted']}个")
+    log_message(f"JSON问答对文件: {stats['question_sql_json']}个")
     
     total_files = sum(stats.values())
     if total_files == 0:
-        print(f"警告: 在目录 {data_path} 中未找到任何可训练的文件")
+        log_message(f"警告: 在目录 {data_path} 中未找到任何可训练的文件", "warning")
         return False
         
     return True
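
The task_id argument stays optional, so existing print-based callers keep working; when it is given, messages are routed through the task logger instead. A minimal call sketch (the directory matches the sample task included in this commit):

    from data_pipeline.trainer.run_training import process_training_files

    ok = process_training_files(
        "data_pipeline/training_data/task_20250701_184430",
        task_id="task_20250701_184430",
    )
    if not ok:
        print("no trainable files found or directory missing")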

+ 2 - 2
data_pipeline/trainer/vanna_trainer.py

@@ -11,10 +11,10 @@ import sys
 import os
 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 import app_config
-from core.logging import get_data_pipeline_logger
+import logging
 
 # 初始化日志
-logger = get_data_pipeline_logger("VannaTrainer")
+logger = logging.getLogger("VannaTrainer")
 
 # 设置正确的项目根目录路径
 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

+ 17 - 0
data_pipeline/training_data/task_20250701_175640/bss_car_day_count.ddl

@@ -0,0 +1,17 @@
+-- 中文名: 服务区车辆日统计表
+-- 描述: 服务区车辆日统计表,按车型统计每日车辆数量及类型,用于交通流量分析与资源调度。
+create table public.bss_car_day_count (
+  id varchar(32) not null     -- 主键ID,主键,
+  version integer not null    -- 版本号,
+  create_ts timestamp         -- 创建时间,
+  created_by varchar(50)      -- 创建人ID,
+  update_ts timestamp         -- 更新时间,
+  updated_by varchar(50)      -- 更新人ID,
+  delete_ts timestamp         -- 删除时间,
+  deleted_by varchar(50)      -- 删除人ID,
+  customer_count bigint       -- 车辆数量,
+  car_type varchar(100)       -- 车辆类别,
+  count_date date             -- 统计日期,
+  service_area_id varchar(32) -- 服务区ID,
+  primary key (id)
+);

+ 18 - 0
data_pipeline/training_data/task_20250701_175640/bss_car_day_count_detail.md

@@ -0,0 +1,18 @@
+## bss_car_day_count(服务区车辆日统计表)
+bss_car_day_count 表服务区车辆日统计表,按车型统计每日车辆数量及类型,用于交通流量分析与资源调度。
+字段列表:
+- id (varchar(32)) - 主键ID [主键, 非空] [示例: 00022c1c99ff11ec86d4fa163ec0f8fc, 00022caa99ff11ec86d4fa163ec0f8fc]
+- version (integer) - 版本号 [非空] [示例: 1]
+- create_ts (timestamp) - 创建时间 [示例: 2022-03-02 16:01:43, 2022-02-02 14:18:55]
+- created_by (varchar(50)) - 创建人ID
+- update_ts (timestamp) - 更新时间 [示例: 2022-03-02 16:01:43, 2022-02-02 14:18:55]
+- updated_by (varchar(50)) - 更新人ID
+- delete_ts (timestamp) - 删除时间
+- deleted_by (varchar(50)) - 删除人ID
+- customer_count (bigint) - 车辆数量 [示例: 1114, 295]
+- car_type (varchar(100)) - 车辆类别 [示例: 其他]
+- count_date (date) - 统计日期 [示例: 2022-03-02, 2022-02-02]
+- service_area_id (varchar(32)) - 服务区ID [示例: 17461166e7fa3ecda03534a5795ce985, 81f4eb731fb0728aef17ae61f1f1daef]
+字段补充说明:
+- id 为主键
+- car_type 为枚举字段,包含取值:其他、危化品、城际、过境

+ 14 - 0
data_pipeline/training_data/task_20250701_175640/task_config.json

@@ -0,0 +1,14 @@
+{
+  "task_id": "task_20250701_175640",
+  "created_at": "2025-07-01T09:56:40.836065",
+  "parameters": {
+    "db_connection": "postgresql://postgres:postgres@192.168.67.1:6432/highway_db",
+    "table_list_file": "./data_pipeline/tables.txt",
+    "business_context": "高速公路服务区管理系统测试",
+    "enable_llm_repair": true,
+    "modify_original_file": true,
+    "enable_sql_validation": true,
+    "enable_training_data_load": true
+  },
+  "output_directory": "data_pipeline\\training_data\\task_20250701_175640"
+}

+ 14 - 0
data_pipeline/training_data/task_20250701_180014/task_config.json

@@ -0,0 +1,14 @@
+{
+  "task_id": "task_20250701_180014",
+  "created_at": "2025-07-01T10:00:14.816750",
+  "parameters": {
+    "db_connection": "postgresql://postgres:postgres@192.168.67.1:6432/highway_db",
+    "table_list_file": "data_pipeline/tables.txt",
+    "business_context": "高速公路服务区管理系统",
+    "enable_llm_repair": true,
+    "modify_original_file": true,
+    "enable_sql_validation": true,
+    "enable_training_data_load": true
+  },
+  "output_directory": "data_pipeline\\training_data\\task_20250701_180014"
+}

+ 31 - 0
data_pipeline/training_data/task_20250701_184430/bss_business_day_data.ddl

@@ -0,0 +1,31 @@
+-- 中文名: 服务区每日业务统计表(记录各SA运营数据)
+-- 描述: 服务区每日业务统计表(记录各SA运营数据)
+create table public.bss_business_day_data (
+  id varchar(32) not null     -- 主键ID,主键,
+  version integer not null    -- 版本号,
+  create_ts timestamp         -- 创建时间,
+  created_by varchar(50)      -- 创建人,
+  update_ts timestamp         -- 更新时间,
+  updated_by varchar(50)      -- 更新人,
+  delete_ts timestamp         -- 删除时间,
+  deleted_by varchar(50)      -- 删除人,
+  oper_date date              -- 统计日期,
+  service_no varchar(255)     -- 服务区编码,
+  service_name varchar(255)   -- 服务区名称,
+  branch_no varchar(255)      -- 档口编码,
+  branch_name varchar(255)    -- 档口名称,
+  wx numeric(19,4)            -- 微信支付金额,
+  wx_order integer            -- 微信订单数量,
+  zfb numeric(19,4)           -- 支付宝支付金额,
+  zf_order integer            -- 支付宝订单数量,
+  rmb numeric(19,4)           -- 现金支付金额,
+  rmb_order integer           -- 现金订单数量,
+  xs numeric(19,4)            -- 行吧支付金额,
+  xs_order integer            -- 行吧订单数量,
+  jd numeric(19,4)            -- 金豆支付金额,
+  jd_order integer            -- 金豆订单数量,
+  order_sum integer           -- 订单总数,
+  pay_sum numeric(19,4)       -- 总支付金额,
+  source_type integer         -- 数据来源类别,
+  primary key (id)
+);

+ 32 - 0
data_pipeline/training_data/task_20250701_184430/bss_business_day_data_detail.md

@@ -0,0 +1,32 @@
+## bss_business_day_data(服务区每日业务统计表(记录各SA运营数据))
+bss_business_day_data 表服务区每日业务统计表(记录各SA运营数据)
+字段列表:
+- id (varchar(32)) - 主键ID [主键, 非空] [示例: 00827DFF993D415488EA1F07CAE6C440, 00e799048b8cbb8ee758eac9c8b4b820]
+- version (integer) - 版本号 [非空] [示例: 1]
+- create_ts (timestamp) - 创建时间 [示例: 2023-04-02 08:31:51, 2023-04-02 02:30:08]
+- created_by (varchar(50)) - 创建人 [示例: xingba]
+- update_ts (timestamp) - 更新时间 [示例: 2023-04-02 08:31:51, 2023-04-02 02:30:08]
+- updated_by (varchar(50)) - 更新人
+- delete_ts (timestamp) - 删除时间
+- deleted_by (varchar(50)) - 删除人
+- oper_date (date) - 统计日期 [示例: 2023-04-01]
+- service_no (varchar(255)) - 服务区编码 [示例: 1028, H0501]
+- service_name (varchar(255)) - 服务区名称 [示例: 宜春服务区, 庐山服务区]
+- branch_no (varchar(255)) - 档口编码 [示例: 1, H05016]
+- branch_name (varchar(255)) - 档口名称 [示例: 宜春南区, 庐山鲜徕客东区]
+- wx (numeric(19,4)) - 微信支付金额 [示例: 4790.0000, 2523.0000]
+- wx_order (integer) - 微信订单数量 [示例: 253, 133]
+- zfb (numeric(19,4)) - 支付宝支付金额 [示例: 229.0000, 0.0000]
+- zf_order (integer) - 支付宝订单数量 [示例: 15, 0]
+- rmb (numeric(19,4)) - 现金支付金额 [示例: 1058.5000, 124.0000]
+- rmb_order (integer) - 现金订单数量 [示例: 56, 12]
+- xs (numeric(19,4)) - 行吧支付金额 [示例: 0.0000, 40.0000]
+- xs_order (integer) - 行吧订单数量 [示例: 0, 1]
+- jd (numeric(19,4)) - 金豆支付金额 [示例: 0.0000]
+- jd_order (integer) - 金豆订单数量 [示例: 0]
+- order_sum (integer) - 订单总数 [示例: 324, 146]
+- pay_sum (numeric(19,4)) - 总支付金额 [示例: 6077.5000, 2687.0000]
+- source_type (integer) - 数据来源类别 [示例: 1, 0, 4]
+字段补充说明:
+- id 为主键
+- source_type 为枚举字段,包含取值:0、4、1、2、3

+ 17 - 0
data_pipeline/training_data/task_20250701_184430/bss_car_day_count.ddl

@@ -0,0 +1,17 @@
+-- 中文名: 高速公路服务区每日车辆统计表
+-- 描述: 高速公路服务区每日车辆统计表,记录各类型车辆流量数据,支撑交通管理与资源调度分析。
+create table public.bss_car_day_count (
+  id varchar(32) not null     -- 主键ID,主键,
+  version integer not null    -- 版本号,
+  create_ts timestamp         -- 创建时间,
+  created_by varchar(50)      -- 创建人,
+  update_ts timestamp         -- 更新时间,
+  updated_by varchar(50)      -- 更新人,
+  delete_ts timestamp         -- 删除时间,
+  deleted_by varchar(50)      -- 删除人,
+  customer_count bigint       -- 车辆数量,
+  car_type varchar(100)       -- 车辆类别,
+  count_date date             -- 统计日期,
+  service_area_id varchar(32) -- 服务区ID,
+  primary key (id)
+);

+ 18 - 0
data_pipeline/training_data/task_20250701_184430/bss_car_day_count_detail.md

@@ -0,0 +1,18 @@
+## bss_car_day_count(高速公路服务区每日车辆统计表)
+bss_car_day_count 表高速公路服务区每日车辆统计表,记录各类型车辆流量数据,支撑交通管理与资源调度分析。
+字段列表:
+- id (varchar(32)) - 主键ID [主键, 非空] [示例: 00022c1c99ff11ec86d4fa163ec0f8fc, 00022caa99ff11ec86d4fa163ec0f8fc]
+- version (integer) - 版本号 [非空] [示例: 1]
+- create_ts (timestamp) - 创建时间 [示例: 2022-03-02 16:01:43, 2022-02-02 14:18:55]
+- created_by (varchar(50)) - 创建人
+- update_ts (timestamp) - 更新时间 [示例: 2022-03-02 16:01:43, 2022-02-02 14:18:55]
+- updated_by (varchar(50)) - 更新人
+- delete_ts (timestamp) - 删除时间
+- deleted_by (varchar(50)) - 删除人
+- customer_count (bigint) - 车辆数量 [示例: 1114, 295]
+- car_type (varchar(100)) - 车辆类别 [示例: 其他]
+- count_date (date) - 统计日期 [示例: 2022-03-02, 2022-02-02]
+- service_area_id (varchar(32)) - 服务区ID [示例: 17461166e7fa3ecda03534a5795ce985, 81f4eb731fb0728aef17ae61f1f1daef]
+字段补充说明:
+- id 为主键
+- car_type 为枚举字段,包含取值:其他、危化品、城际、过境

+ 38 - 0
data_pipeline/training_data/task_20250701_184430/db_query_decision_prompt.txt

@@ -0,0 +1,38 @@
+{
+  "数据库业务范围": "当前数据库存储的是高速公路服务区运营管理与车辆流量分析的相关数据,主要涉及运营交易数据与车辆通行数据,包含以下业务数据:",
+  "核心业务实体": [
+    {
+      "实体类型": "服务区",
+      "详细描述": "高速公路沿线提供停车休憩的场所,记录其每日运营数据与车辆流量统计",
+      "主要字段": "oper_date, service_no, service_name, service_area_id"
+    },
+    {
+      "实体类型": "档口",
+      "详细描述": "服务区内的商业经营单元,记录其每日交易明细",
+      "主要字段": "branch_no, branch_name"
+    },
+    {
+      "实体类型": "车辆类型",
+      "详细描述": "按车辆属性分类的通行记录,用于分析交通流量结构",
+      "主要字段": "car_type"
+    }
+  ],
+  "关键业务指标": [
+    {
+      "指标类型": "支付金额与订单数量",
+      "详细描述": "按支付渠道(微信/支付宝/现金/行吧/金豆)统计的交易金额与订单数,反映消费行为分布"
+    },
+    {
+      "指标类型": "车辆流量统计",
+      "详细描述": "按车辆类型分类的通行量统计,用于分析交通流量结构与高峰时段特征"
+    },
+    {
+      "指标类型": "运营总指标",
+      "详细描述": "订单总数与支付总额的时序变化,反映服务区整体运营态势"
+    },
+    {
+      "指标类型": "数据来源分布",
+      "详细描述": "通过source_type字段分析数据采集渠道的覆盖情况与可靠性"
+    }
+  ]
+}

+ 5 - 0
data_pipeline/training_data/task_20250701_184430/filename_mapping.txt

@@ -0,0 +1,5 @@
+# 文件名映射报告
+# 格式: 原始表名 -> 实际文件名
+
+public.bss_business_day_data -> bss_business_day_data_detail.md
+public.bss_car_day_count -> bss_car_day_count_detail.md

+ 62 - 0
data_pipeline/training_data/task_20250701_184430/metadata.txt

@@ -0,0 +1,62 @@
+-- Schema Tools生成的主题元数据
+-- 业务背景: 高速公路服务区管理系统
+-- 生成时间: 2025-07-01 18:58:22
+-- 数据库: highway_db
+
+-- 创建表(如果不存在)
+CREATE TABLE IF NOT EXISTS metadata (
+    id SERIAL PRIMARY KEY,    -- 主键
+    topic_name VARCHAR(100) NOT NULL,  -- 业务主题名称
+    description TEXT,                  -- 业务主体说明
+    related_tables TEXT[],			  -- 相关表名
+    biz_entities TEXT[],               -- 主要业务实体名称
+    biz_metrics TEXT[],                -- 主要业务指标名称
+    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP    -- 插入时间
+);
+
+-- 插入主题数据
+INSERT INTO metadata(topic_name, description, related_tables, biz_entities, biz_metrics) VALUES
+(
+  '日营收分析',
+  '基于bss_business_day_data表分析各服务区每日营收结构及支付方式占比',
+  'bss_business_day_data',
+  '服务区,档口,支付方式,统计日期',
+  '收入分布,订单构成,支付方式渗透率'
+);
+
+INSERT INTO metadata(topic_name, description, related_tables, biz_entities, biz_metrics) VALUES
+(
+  '车流统计分析',
+  '通过bss_car_day_count表分析不同车辆类型在各服务区的流量分布特征',
+  'bss_car_day_count',
+  '服务区,车辆类型,统计日期',
+  '车流趋势,车型占比,高峰时段流量'
+);
+
+INSERT INTO metadata(topic_name, description, related_tables, biz_entities, biz_metrics) VALUES
+(
+  '档口效能评估',
+  '结合两个表数据评估不同档口的单位车流营收产出及运营效率差异',
+  'bss_business_day_data,bss_car_day_count',
+  '档口,服务区,运营日期',
+  '坪效分析,客单价对比,时段效率曲线'
+);
+
+INSERT INTO metadata(topic_name, description, related_tables, biz_entities, biz_metrics) VALUES
+(
+  '节假日效应分析',
+  '对比法定节假日与平日的车流变化及消费行为差异,支撑资源调度决策',
+  'bss_business_day_data,bss_car_day_count',
+  '服务区,节假日类型,支付方式',
+  '节前节后对比,消费金额波动,车流峰值分析'
+);
+
+INSERT INTO metadata(topic_name, description, related_tables, biz_entities, biz_metrics) VALUES
+(
+  '区域对标分析',
+  '按地理区域划分统计各服务区营收能力和车流规模的topN排名对比',
+  'bss_business_day_data,bss_car_day_count',
+  '区域,服务区等级,运营指标',
+  '营收排名,车流密度,运营健康度评分'
+);
+

+ 20 - 0
data_pipeline/training_data/task_20250701_184430/metadata_detail.md

@@ -0,0 +1,20 @@
+## metadata(存储分析主题元数据)
+
+`metadata` 主要描述了当前数据库包含了哪些数据内容,哪些分析主题,哪些指标等等。
+
+字段列表:
+
+- `id` (serial) - 主键ID [主键, 非空]
+- `topic_name` (varchar(100)) - 业务主题名称 [非空]
+- `description` (text) - 业务主题说明
+- `related_tables` (text[]) - 涉及的数据表 [示例: bss_business_day_data, bss_car_day_count]
+- `biz_entities` (text[]) - 主要业务实体名称 [示例: 统计日期, 车辆类型, 服务区]
+- `biz_metrics` (text[]) - 主要业务指标名称 [示例: 车型占比, 节前节后对比, 车流密度]
+- `created_at` (timestamp) - 插入时间 [默认值: `CURRENT_TIMESTAMP`]
+
+字段补充说明:
+
+- `id` 为主键,自增;
+- `related_tables` 用于建立主题与具体明细表的依赖关系;
+- `biz_entities` 表示主题关注的核心对象,例如服务区、车辆、公司;
+- `biz_metrics` 表示该主题关注的业务分析指标,例如营收对比、趋势变化、占比结构等。

+ 198 - 0
data_pipeline/training_data/task_20250701_184430/qs_highway_db_20250701_185822_pair.json

@@ -0,0 +1,198 @@
+[
+  {
+    "question": "统计最近7天各服务区的总收入和总订单数,并按收入从高到低排序",
+    "sql": "SELECT service_name AS 服务区名称, SUM(pay_sum) AS 总收入, SUM(order_sum) AS 总订单数 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - 7 GROUP BY service_name ORDER BY 总收入 DESC;"
+  },
+  {
+    "question": "计算各服务区不同支付方式的订单占比(微信/支付宝/现金),展示前五名",
+    "sql": "SELECT service_name AS 服务区名称, ROUND(SUM(wx_order)*100.0/SUM(order_sum),2) AS 微信占比, ROUND(SUM(zf_order)*100.0/SUM(order_sum),2) AS 支付宝占比, ROUND(SUM(rmb_order)*100.0/SUM(order_sum),2) AS 现金占比 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY SUM(order_sum) DESC LIMIT 5;"
+  },
+  {
+    "question": "分析2023年4月每日总营收变化趋势",
+    "sql": "SELECT oper_date AS 统计日期, SUM(pay_sum) AS 日营收总额 FROM bss_business_day_data WHERE delete_ts IS NULL AND EXTRACT(YEAR FROM oper_date) = 2023 AND EXTRACT(MONTH FROM oper_date) = 4 GROUP BY oper_date ORDER BY oper_date;"
+  },
+  {
+    "question": "查询最近一天营收超过5万元的服务区及对应支付方式渗透率",
+    "sql": "SELECT service_name AS 服务区名称, wx_order AS 微信订单数, zf_order AS 支付宝订单数, rmb_order AS 现金订单数, pay_sum AS 日营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date = (SELECT MAX(oper_date) FROM bss_business_day_data) AND pay_sum > 50000 ORDER BY pay_sum DESC;"
+  },
+  {
+    "question": "统计各档口平均客单价(日均)并排名",
+    "sql": "SELECT branch_name AS 档口名称, ROUND(AVG(pay_sum/order_sum),2) AS 平均客单价 FROM bss_business_day_data WHERE delete_ts IS NULL AND order_sum > 0 GROUP BY branch_name ORDER BY 平均客单价 DESC;"
+  },
+  {
+    "question": "对比不同服务区现金支付占比的分布情况",
+    "sql": "SELECT service_name AS 服务区名称, ROUND(SUM(rmb) * 100.0 / SUM(pay_sum), 2) AS 现金占比 FROM bss_business_day_data WHERE delete_ts IS NULL AND pay_sum > 0 GROUP BY service_name ORDER BY 现金占比 DESC;"
+  },
+  {
+    "question": "查询指定日期(2023-04-01)微信支付金额TOP5的服务区明细",
+    "sql": "SELECT service_name AS 服务区名称, wx AS 微信支付金额, wx_order AS 微信订单数, pay_sum AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date = '2023-04-01' ORDER BY wx DESC LIMIT 5;"
+  },
+  {
+    "question": "分析各服务区支付宝订单占比与总营收的关系",
+    "sql": "SELECT service_name AS 服务区名称, ROUND(SUM(zf_order)*100.0/SUM(order_sum),2) AS 支付宝订单占比, SUM(pay_sum) AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 支付宝订单占比 DESC;"
+  },
+  {
+    "question": "统计各服务区不同支付方式的订单数量分布",
+    "sql": "SELECT service_name AS 服务区名称, SUM(wx_order) AS 微信订单数, SUM(zf_order) AS 支付宝订单数, SUM(rmb_order) AS 现金订单数 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY SUM(wx_order + zf_order + rmb_order) DESC;"
+  },
+  {
+    "question": "查询最近3天庐山服务区每日营收及支付方式构成",
+    "sql": "SELECT oper_date AS 统计日期, wx AS 微信金额, zfb AS 支付宝金额, rmb AS 现金金额, pay_sum AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND service_name = '庐山服务区' AND oper_date >= CURRENT_DATE - 3 ORDER BY oper_date DESC;"
+  },
+  {
+    "question": "不同车辆类型的总车流量统计情况如何?",
+    "sql": "SELECT car_type AS 车辆类型, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY car_type;"
+  },
+  {
+    "question": "哪些服务区的累计车流量位列前十?",
+    "sql": "SELECT service_area_id AS 服务区ID, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id ORDER BY 总车流量 DESC LIMIT 10;"
+  },
+  {
+    "question": "2022年3月2日各车型在服务区的流量分布是怎样的?",
+    "sql": "SELECT car_type AS 车辆类型, SUM(customer_count) AS 当日车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date = '2022-03-02' GROUP BY car_type;"
+  },
+  {
+    "question": "每个服务区每月平均车流量是多少?",
+    "sql": "SELECT service_area_id AS 服务区ID, DATE_TRUNC('month', count_date) AS 月份, AVG(daily_total) AS 月均车流量 FROM (SELECT service_area_id, count_date, SUM(customer_count) AS daily_total FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id, count_date) AS daily_counts GROUP BY service_area_id, 月份;"
+  },
+  {
+    "question": "最近一个月内,各服务区的日均车流量对比如何?",
+    "sql": "SELECT service_area_id AS 服务区ID, AVG(customer_count) AS 日均车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_area_id ORDER BY 日均车流量 DESC;"
+  },
+  {
+    "question": "车流量最高的五个服务区是哪些?",
+    "sql": "SELECT service_area_id AS 服务区ID, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id ORDER BY 总车流量 DESC LIMIT 5;"
+  },
+  {
+    "question": "各车型在不同服务区的车流量分布情况如何?",
+    "sql": "SELECT car_type AS 车辆类型, service_area_id AS 服务区ID, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY car_type, service_area_id;"
+  },
+  {
+    "question": "某服务区(如service_area_id='17461166e7fa3ecda03534a5795ce985')各车型的日均车流量是多少?",
+    "sql": "SELECT car_type AS 车辆类型, AVG(customer_count) AS 日均车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND service_area_id = '17461166e7fa3ecda03534a5795ce985' GROUP BY car_type;"
+  },
+  {
+    "question": "2022年1月至3月期间,总车流量的月度变化趋势是怎样的?",
+    "sql": "SELECT DATE_TRUNC('month', count_date) AS 月份, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date BETWEEN '2022-01-01' AND '2022-03-31' GROUP BY 月份 ORDER BY 月份;"
+  },
+  {
+    "question": "某服务区(如ID为'81f4eb731fb0728aef17ae61f1f1daef')中,哪种车型的累计车流量最多?",
+    "sql": "SELECT car_type AS 车辆类型, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND service_area_id = '81f4eb731fb0728aef17ae61f1f1daef' GROUP BY car_type ORDER BY 总车流量 DESC LIMIT 1;"
+  },
+  {
+    "question": "统计各档口单位车流营收产出(坪效)并按从高到低排序",
+    "sql": "SELECT b.branch_name AS 档口名称, SUM(b.pay_sum) / SUM(c.customer_count) AS 单位车流营收 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id AND b.oper_date = c.count_date WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY b.branch_name ORDER BY 单位车流营收 DESC;"
+  },
+  {
+    "question": "对比不同服务区客单价(支付金额/订单数)排名",
+    "sql": "SELECT service_name AS 服务区名称, SUM(pay_sum) / SUM(order_sum) AS 客单价 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 客单价 DESC LIMIT 10;"
+  },
+  {
+    "question": "查询最近7天车流最高的服务区对应坪效TOP5",
+    "sql": "SELECT s.service_name, SUM(s.pay_sum) / MAX(c.customer_count) AS 坪效 FROM (SELECT service_name, service_no, SUM(pay_sum) AS pay_sum FROM bss_business_day_data WHERE oper_date >= CURRENT_DATE - 7 AND delete_ts IS NULL GROUP BY service_name, service_no) s JOIN (SELECT service_area_id, SUM(customer_count) AS customer_count FROM bss_car_day_count WHERE count_date >= CURRENT_DATE - 7 AND delete_ts IS NULL GROUP BY service_area_id) c ON s.service_no = c.service_area_id GROUP BY s.service_name ORDER BY 坪效 DESC LIMIT 5;"
+  },
+  {
+    "question": "分析各档口月度坪效趋势(2023年4月数据)",
+    "sql": "SELECT TO_CHAR(b.oper_date, 'YYYY-MM') AS 月份, b.branch_name, SUM(b.pay_sum) / SUM(c.customer_count) AS 坪效 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE b.oper_date BETWEEN '2023-04-01' AND '2023-04-30' AND b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY 月份, b.branch_name ORDER BY 月份, 坪效 DESC;"
+  },
+  {
+    "question": "查询城际车辆占比超50%的服务区坪效对比",
+    "sql": "WITH car_ratio AS (SELECT service_area_id, SUM(CASE WHEN car_type = '城际' THEN customer_count ELSE 0 END) * 1.0 / SUM(customer_count) AS 城际占比 FROM bss_car_day_count GROUP BY service_area_id) SELECT b.service_name, SUM(b.pay_sum) / SUM(c.customer_count) AS 坪效 FROM bss_business_day_data b JOIN car_ratio r ON b.service_no = r.service_area_id JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE r.城际占比 > 0.5 AND b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY b.service_name ORDER BY 坪效 DESC;"
+  },
+  {
+    "question": "找出客单价最低的五个档口(客单价=金额/订单数)",
+    "sql": "SELECT branch_name, pay_sum / order_sum AS 客单价 FROM (SELECT branch_name, SUM(pay_sum) AS pay_sum, SUM(order_sum) AS order_sum FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY branch_name) t WHERE order_sum > 0 ORDER BY 客单价 ASC LIMIT 5;"
+  },
+  {
+    "question": "分析2023年Q2季度各服务区日均车流与营收关系",
+    "sql": "SELECT b.service_name, AVG(c.customer_count) AS 日均车流, AVG(b.pay_sum) AS 日均营收 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE b.oper_date BETWEEN '2023-04-01' AND '2023-06-30' GROUP BY b.service_name ORDER BY 日均车流 DESC;"
+  },
+  {
+    "question": "查询宜春服务区各档口微信支付占比TOP3",
+    "sql": "SELECT branch_name, SUM(wx) * 100.0 / SUM(pay_sum) AS 微信支付占比 FROM bss_business_day_data WHERE service_name = '宜春服务区' AND delete_ts IS NULL GROUP BY branch_name ORDER BY 微信支付占比 DESC LIMIT 3;"
+  },
+  {
+    "question": "统计各服务区坪效及车流排名差异(坪效排名与车流排名差值)",
+    "sql": "WITH rank_data AS (SELECT service_name, RANK() OVER (ORDER BY SUM(pay_sum)/SUM(customer_count) DESC) AS \"坪效排名\", RANK() OVER (ORDER BY SUM(customer_count) DESC) AS \"车流排名\" FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY service_name) SELECT service_name, \"坪效排名\", \"车流排名\", ABS(\"坪效排名\" -\"车流排名\") AS \"排名差异\" FROM rank_data ORDER BY \"排名差异\" DESC;"
+  },
+  {
+    "question": "分析周末与工作日营收差异(以2023-04为例)",
+    "sql": "SELECT CASE WHEN EXTRACT(ISODOW FROM oper_date) IN (6,7) THEN '周末' ELSE '工作日' END AS 日期类型, AVG(pay_sum) AS 平均营收, AVG(customer_count) AS 平均车流 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE oper_date BETWEEN '2023-04-01' AND '2023-04-30' GROUP BY 日期类型;"
+  },
+  {
+    "question": "节假日与平日平均消费金额对比分析",
+    "sql": "SELECT '节假日' AS \"分析类型\", AVG(pay_sum) AS \"平均消费金额\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL UNION ALL SELECT '平日', AVG(pay_sum) FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节假日与平日各类型车辆平均流量对比分析",
+    "sql": "SELECT car_type AS \"车辆类型\", AVG(CASE WHEN count_date BETWEEN '2023-10-01' AND '2023-10-07' THEN customer_count END) AS \"节假日均值\", AVG(CASE WHEN count_date NOT BETWEEN '2023-10-01' AND '2023-10-07' THEN customer_count END) AS \"平日均值\" FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY car_type;"
+  },
+  {
+    "question": "节假日与平日不同支付方式金额占比对比",
+    "sql": "SELECT '节假日' AS \"类型\", SUM(wx)/SUM(pay_sum) AS \"微信占比\", SUM(zfb)/SUM(pay_sum) AS \"支付宝占比\", SUM(rmb)/SUM(pay_sum) AS \"现金占比\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL UNION ALL SELECT '平日', SUM(wx)/SUM(pay_sum), SUM(zfb)/SUM(pay_sum), SUM(rmb)/SUM(pay_sum) FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节假日总订单量Top10服务区",
+    "sql": "SELECT service_name AS \"服务区名称\", SUM(order_sum) AS \"总订单量\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY service_name ORDER BY \"总订单量\" DESC LIMIT 10;"
+  },
+  {
+    "question": "节假日车流峰值日期识别",
+    "sql": "SELECT count_date AS \"日期\", SUM(customer_count) AS \"总车流量\" FROM bss_car_day_count WHERE count_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY count_date ORDER BY \"总车流量\" DESC LIMIT 1;"
+  },
+  {
+    "question": "平日周消费金额波动趋势分析",
+    "sql": "SELECT EXTRACT(DOW FROM oper_date) AS \"星期\", AVG(pay_sum) AS \"平均消费\" FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY EXTRACT(DOW FROM oper_date) ORDER BY \"星期\";"
+  },
+  {
+    "question": "节假日与非节假日现金支付占比差异",
+    "sql": "SELECT '节假日' AS \"类型\", SUM(rmb)/SUM(pay_sum) AS \"现金占比\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL UNION ALL SELECT '平日', SUM(rmb)/SUM(pay_sum) FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节前节后3日车流环比增长率计算",
+    "sql": "SELECT (AVG(CASE WHEN count_date BETWEEN '2023-10-08' AND '2023-10-10' THEN customer_count END) - AVG(CASE WHEN count_date BETWEEN '2023-09-28' AND '2023-09-30' THEN customer_count END))/AVG(CASE WHEN count_date BETWEEN '2023-09-28' AND '2023-09-30' THEN customer_count END) AS \"增长率\" FROM bss_car_day_count WHERE count_date BETWEEN '2023-09-28' AND '2023-10-10' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节假日各档口消费总额Top10排名",
+    "sql": "SELECT branch_name AS \"档口名称\", SUM(pay_sum) AS \"总消费额\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY branch_name ORDER BY \"总消费额\" DESC LIMIT 10;"
+  },
+  {
+    "question": "节假日车辆类型占比分布统计",
+    "sql": "SELECT car_type AS \"车辆类型\", SUM(customer_count) AS \"总量\", ROUND(100*SUM(customer_count)/(SELECT SUM(customer_count) FROM bss_car_day_count WHERE count_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL),2) AS \"占比百分比\" FROM bss_car_day_count WHERE count_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY car_type ORDER BY \"总量\" DESC;"
+  },
+  {
+    "question": "统计最近一个月各服务区总营收排名(按支付金额降序)Top10",
+    "sql": "SELECT service_name AS 服务区, SUM(pay_sum) AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_name ORDER BY 总营收 DESC LIMIT 10;"
+  },
+  {
+    "question": "分析最近7天各区域(按服务区划分)日均车流密度Top5",
+    "sql": "SELECT service_area_id AS 服务区ID, AVG(customer_count) AS 日均车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date >= CURRENT_DATE - INTERVAL '7 days' GROUP BY service_area_id ORDER BY 日均车流量 DESC LIMIT 5;"
+  },
+  {
+    "question": "对比营收Top10服务区与车流Top10服务区的重合率",
+    "sql": "WITH 营收排名 AS (SELECT service_name, SUM(pay_sum) AS 金额 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_name ORDER BY 金额 DESC LIMIT 10), 车流排名 AS (SELECT service_area_id, SUM(customer_count) AS 车流 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_area_id ORDER BY 车流 DESC LIMIT 10) SELECT COUNT(*) FILTER (WHERE r.service_name = c.service_area_id) * 100.0 / 10 AS 重合率 FROM 营收排名 r, 车流排名 c;"
+  },
+  {
+    "question": "计算各区域(按branch_name首字分组)客单价(支付金额/订单数)Top3",
+    "sql": "SELECT SUBSTRING(branch_name FROM 1 FOR 1) AS 区域, service_name AS 服务区, AVG(pay_sum / order_sum) AS 客单价 FROM bss_business_day_data WHERE delete_ts IS NULL AND order_sum > 0 AND oper_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY SUBSTRING(branch_name FROM 1 FOR 1), service_name ORDER BY 区域, 客单价 DESC LIMIT 3;"
+  },
+  {
+    "question": "查询2023年Q2季度各服务区运营健康度评分(支付金额环比增长率)",
+    "sql": "SELECT service_name AS 服务区, (SUM(CASE WHEN EXTRACT(QUARTER FROM oper_date)=2 THEN pay_sum ELSE 0 END) - SUM(CASE WHEN EXTRACT(QUARTER FROM oper_date)=1 THEN pay_sum ELSE 0 END)) / NULLIF(SUM(CASE WHEN EXTRACT(QUARTER FROM oper_date)=1 THEN pay_sum ELSE 0 END), 0) AS 增长率 FROM bss_business_day_data WHERE delete_ts IS NULL AND EXTRACT(YEAR FROM oper_date)=2023 GROUP BY service_name ORDER BY 增长率 DESC;"
+  },
+  {
+    "question": "统计周末与工作日车流量差异最大的Top5服务区",
+    "sql": "SELECT service_area_id AS 服务区ID, AVG(CASE WHEN EXTRACT(ISODOW FROM count_date) IN (6,7) THEN customer_count ELSE 0 END) - AVG(CASE WHEN EXTRACT(ISODOW FROM count_date) NOT IN (6,7) THEN customer_count ELSE 0 END) AS 差异值 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id ORDER BY 差异值 DESC LIMIT 5;"
+  },
+  {
+    "question": "查询2023年节假日(五一假期)期间营收异常波动(超3倍均值)的服务区",
+    "sql": "WITH 日均基准 AS (SELECT service_name, AVG(pay_sum) AS 基准值 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date NOT BETWEEN '2023-04-29' AND '2023-05-03' GROUP BY service_name) SELECT b.service_name AS 服务区, b.pay_sum AS 节假日营收, d.基准值 FROM bss_business_day_data b JOIN 日均基准 d ON b.service_name = d.service_name WHERE b.delete_ts IS NULL AND b.oper_date BETWEEN '2023-04-29' AND '2023-05-03' AND b.pay_sum > d.基准值 * 3;"
+  },
+  {
+    "question": "分析不同车辆类型(过境/城际)对应服务区营收相关性",
+    "sql": "SELECT '过境车流' AS 类型, AVG(pay_sum) AS 平均营收 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_name = c.service_area_id WHERE c.car_type = '过境' AND b.delete_ts IS NULL AND c.delete_ts IS NULL UNION ALL SELECT '城际车流', AVG(pay_sum) FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_name = c.service_area_id WHERE c.car_type = '城际' AND b.delete_ts IS NULL AND c.delete_ts IS NULL;"
+  },
+  {
+    "question": "统计最近30天支付方式偏好(各服务区微信/支付宝占比分布)",
+    "sql": "SELECT service_name AS 服务区, SUM(wx) / SUM(pay_sum) * 100 AS 微信占比, SUM(zfb) / SUM(pay_sum) * 100 AS 支付宝占比 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - 30 GROUP BY service_name ORDER BY 微信占比 DESC LIMIT 10;"
+  }
+]

+ 202 - 0
data_pipeline/training_data/task_20250701_184430/qs_highway_db_20250701_185822_pair.json.backup

@@ -0,0 +1,202 @@
+[
+  {
+    "question": "统计最近7天各服务区的总收入和总订单数,并按收入从高到低排序",
+    "sql": "SELECT service_name AS 服务区名称, SUM(pay_sum) AS 总收入, SUM(order_sum) AS 总订单数 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - 7 GROUP BY service_name ORDER BY 总收入 DESC;"
+  },
+  {
+    "question": "计算各服务区不同支付方式的订单占比(微信/支付宝/现金),展示前五名",
+    "sql": "SELECT service_name AS 服务区名称, ROUND(SUM(wx_order)*100.0/SUM(order_sum),2) AS 微信占比, ROUND(SUM(zf_order)*100.0/SUM(order_sum),2) AS 支付宝占比, ROUND(SUM(rmb_order)*100.0/SUM(order_sum),2) AS 现金占比 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 总收入 DESC LIMIT 5;"
+  },
+  {
+    "question": "分析2023年4月每日总营收变化趋势",
+    "sql": "SELECT oper_date AS 统计日期, SUM(pay_sum) AS 日营收总额 FROM bss_business_day_data WHERE delete_ts IS NULL AND EXTRACT(YEAR FROM oper_date) = 2023 AND EXTRACT(MONTH FROM oper_date) = 4 GROUP BY oper_date ORDER BY oper_date;"
+  },
+  {
+    "question": "查询最近一天营收超过5万元的服务区及对应支付方式渗透率",
+    "sql": "SELECT service_name AS 服务区名称, wx_order AS 微信订单数, zf_order AS 支付宝订单数, rmb_order AS 现金订单数, pay_sum AS 日营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date = (SELECT MAX(oper_date) FROM bss_business_day_data) AND pay_sum > 50000 ORDER BY pay_sum DESC;"
+  },
+  {
+    "question": "统计各档口平均客单价(日均)并排名",
+    "sql": "SELECT branch_name AS 档口名称, ROUND(AVG(pay_sum/order_sum),2) AS 平均客单价 FROM bss_business_day_data WHERE delete_ts IS NULL AND order_sum > 0 GROUP BY branch_name ORDER BY 平均客单价 DESC;"
+  },
+  {
+    "question": "对比不同服务区现金支付占比的分布情况",
+    "sql": "SELECT service_name AS 服务区名称, ROUND(SUM(rmb) * 100.0 / SUM(pay_sum), 2) AS 现金占比 FROM bss_business_day_data WHERE delete_ts IS NULL AND pay_sum > 0 GROUP BY service_name ORDER BY 现金占比 DESC;"
+  },
+  {
+    "question": "查询指定日期(2023-04-01)微信支付金额TOP5的服务区明细",
+    "sql": "SELECT service_name AS 服务区名称, wx AS 微信支付金额, wx_order AS 微信订单数, pay_sum AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date = '2023-04-01' ORDER BY wx DESC LIMIT 5;"
+  },
+  {
+    "question": "分析各服务区支付宝订单占比与总营收的关系",
+    "sql": "SELECT service_name AS 服务区名称, ROUND(SUM(zf_order)*100.0/SUM(order_sum),2) AS 支付宝订单占比, SUM(pay_sum) AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 支付宝订单占比 DESC;"
+  },
+  {
+    "question": "统计各服务区不同支付方式的订单数量分布",
+    "sql": "SELECT service_name AS 服务区名称, SUM(wx_order) AS 微信订单数, SUM(zf_order) AS 支付宝订单数, SUM(rmb_order) AS 现金订单数 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 总营收 DESC;"
+  },
+  {
+    "question": "查询最近3天庐山服务区每日营收及支付方式构成",
+    "sql": "SELECT oper_date AS 统计日期, wx AS 微信金额, zfb AS 支付宝金额, rmb AS 现金金额, pay_sum AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND service_name = '庐山服务区' AND oper_date >= CURRENT_DATE - 3 ORDER BY oper_date DESC;"
+  },
+  {
+    "question": "不同车辆类型的总车流量统计情况如何?",
+    "sql": "SELECT car_type AS 车辆类型, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY car_type;"
+  },
+  {
+    "question": "哪些服务区的累计车流量位列前十?",
+    "sql": "SELECT service_area_id AS 服务区ID, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id ORDER BY 总车流量 DESC LIMIT 10;"
+  },
+  {
+    "question": "2022年3月2日各车型在服务区的流量分布是怎样的?",
+    "sql": "SELECT car_type AS 车辆类型, SUM(customer_count) AS 当日车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date = '2022-03-02' GROUP BY car_type;"
+  },
+  {
+    "question": "每个服务区每月平均车流量是多少?",
+    "sql": "SELECT service_area_id AS 服务区ID, DATE_TRUNC('month', count_date) AS 月份, AVG(daily_total) AS 月均车流量 FROM (SELECT service_area_id, count_date, SUM(customer_count) AS daily_total FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id, count_date) AS daily_counts GROUP BY service_area_id, 月份;"
+  },
+  {
+    "question": "最近一个月内,各服务区的日均车流量对比如何?",
+    "sql": "SELECT service_area_id AS 服务区ID, AVG(customer_count) AS 日均车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_area_id ORDER BY 日均车流量 DESC;"
+  },
+  {
+    "question": "车流量最高的五个服务区是哪些?",
+    "sql": "SELECT service_area_id AS 服务区ID, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id ORDER BY 总车流量 DESC LIMIT 5;"
+  },
+  {
+    "question": "各车型在不同服务区的车流量分布情况如何?",
+    "sql": "SELECT car_type AS 车辆类型, service_area_id AS 服务区ID, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY car_type, service_area_id;"
+  },
+  {
+    "question": "某服务区(如service_area_id='17461166e7fa3ecda03534a5795ce985')各车型的日均车流量是多少?",
+    "sql": "SELECT car_type AS 车辆类型, AVG(customer_count) AS 日均车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND service_area_id = '17461166e7fa3ecda03534a5795ce985' GROUP BY car_type;"
+  },
+  {
+    "question": "2022年1月至3月期间,总车流量的月度变化趋势是怎样的?",
+    "sql": "SELECT DATE_TRUNC('month', count_date) AS 月份, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date BETWEEN '2022-01-01' AND '2022-03-31' GROUP BY 月份 ORDER BY 月份;"
+  },
+  {
+    "question": "某服务区(如ID为'81f4eb731fb0728aef17ae61f1f1daef')中,哪种车型的累计车流量最多?",
+    "sql": "SELECT car_type AS 车辆类型, SUM(customer_count) AS 总车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND service_area_id = '81f4eb731fb0728aef17ae61f1f1daef' GROUP BY car_type ORDER BY 总车流量 DESC LIMIT 1;"
+  },
+  {
+    "question": "统计各档口单位车流营收产出(坪效)并按从高到低排序",
+    "sql": "SELECT b.branch_name AS 档口名称, SUM(b.pay_sum) / SUM(c.customer_count) AS 单位车流营收 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id AND b.oper_date = c.count_date WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY b.branch_name ORDER BY 单位车流营收 DESC;"
+  },
+  {
+    "question": "对比不同服务区客单价(支付金额/订单数)排名",
+    "sql": "SELECT service_name AS 服务区名称, SUM(pay_sum) / SUM(order_sum) AS 客单价 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name ORDER BY 客单价 DESC LIMIT 10;"
+  },
+  {
+    "question": "查询最近7天车流最高的服务区对应坪效TOP5",
+    "sql": "SELECT s.service_name, SUM(s.pay_sum) / MAX(c.customer_count) AS 坪效 FROM (SELECT service_name, service_no, SUM(pay_sum) AS pay_sum FROM bss_business_day_data WHERE oper_date >= CURRENT_DATE - 7 AND delete_ts IS NULL GROUP BY service_name, service_no) s JOIN (SELECT service_area_id, SUM(customer_count) AS customer_count FROM bss_car_day_count WHERE count_date >= CURRENT_DATE - 7 AND delete_ts IS NULL GROUP BY service_area_id) c ON s.service_no = c.service_area_id ORDER BY 坪效 DESC LIMIT 5;"
+  },
+  {
+    "question": "分析各档口月度坪效趋势(2023年4月数据)",
+    "sql": "SELECT TO_CHAR(b.oper_date, 'YYYY-MM') AS 月份, b.branch_name, SUM(b.pay_sum) / SUM(c.customer_count) AS 坪效 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE b.oper_date BETWEEN '2023-04-01' AND '2023-04-30' AND b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY 月份, b.branch_name ORDER BY 月份, 坪效 DESC;"
+  },
+  {
+    "question": "查询城际车辆占比超50%的服务区坪效对比",
+    "sql": "WITH car_ratio AS (SELECT service_area_id, SUM(CASE WHEN car_type = '城际' THEN customer_count ELSE 0 END) * 1.0 / SUM(customer_count) AS城际占比 FROM bss_car_day_count GROUP BY service_area_id) SELECT b.service_name, SUM(b.pay_sum) / SUM(c.customer_count) AS 坪效 FROM bss_business_day_data b JOIN car_ratio r ON b.service_no = r.service_area_id JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE r.城际占比 > 0.5 AND b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY b.service_name ORDER BY 坪效 DESC;"
+  },
+  {
+    "question": "找出客单价最低的五个档口(客单价=金额/订单数)",
+    "sql": "SELECT branch_name, pay_sum / order_sum AS 客单价 FROM (SELECT branch_name, SUM(pay_sum) AS pay_sum, SUM(order_sum) AS order_sum FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY branch_name) t WHERE order_sum > 0 ORDER BY 客单价 ASC LIMIT 5;"
+  },
+  {
+    "question": "分析2023年Q2季度各服务区日均车流与营收关系",
+    "sql": "SELECT b.service_name, AVG(c.customer_count) AS 日均车流, AVG(b.pay_sum) AS 日均营收 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE b.oper_date BETWEEN '2023-04-01' AND '2023-06-30' GROUP BY b.service_name ORDER BY 日均车流 DESC;"
+  },
+  {
+    "question": "查询宜春服务区各档口微信支付占比TOP3",
+    "sql": "SELECT branch_name, SUM(wx) * 100.0 / SUM(pay_sum) AS 微信支付占比 FROM bss_business_day_data WHERE service_name = '宜春服务区' AND delete_ts IS NULL GROUP BY branch_name ORDER BY 微信支付占比 DESC LIMIT 3;"
+  },
+  {
+    "question": "统计各服务区坪效及车流排名差异(坪效排名与车流排名差值)",
+    "sql": "WITH rank_data AS (SELECT service_name, RANK() OVER (ORDER BY SUM(pay_sum)/SUM(customer_count) DESC) AS坪效排名, RANK() OVER (ORDER BY SUM(customer_count) DESC) AS车流排名 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE b.delete_ts IS NULL AND c.delete_ts IS NULL GROUP BY service_name) SELECT service_name, 坪效排名, 车流排名, ABS(坪效排名 -车流排名) AS排名差异 FROM rank_data ORDER BY 排名差异 DESC;"
+  },
+  {
+    "question": "分析周末与工作日营收差异(以2023-04为例)",
+    "sql": "SELECT CASE WHEN EXTRACT(ISODOW FROM oper_date) IN (6,7) THEN '周末' ELSE '工作日' END AS 日期类型, AVG(pay_sum) AS 平均营收, AVG(customer_count) AS 平均车流 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_no = c.service_area_id WHERE oper_date BETWEEN '2023-04-01' AND '2023-04-30' GROUP BY 日期类型;"
+  },
+  {
+    "question": "节假日与平日平均消费金额对比分析",
+    "sql": "SELECT '节假日' AS \"分析类型\", AVG(pay_sum) AS \"平均消费金额\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL UNION ALL SELECT '平日', AVG(pay_sum) FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节假日与平日各类型车辆平均流量对比分析",
+    "sql": "SELECT car_type AS \"车辆类型\", AVG(CASE WHEN count_date BETWEEN '2023-10-01' AND '2023-10-07' THEN customer_count END) AS \"节假日均值\", AVG(CASE WHEN count_date NOT BETWEEN '2023-10-01' AND '2023-10-07' THEN customer_count END) AS \"平日均值\" FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY car_type;"
+  },
+  {
+    "question": "节假日与平日不同支付方式金额占比对比",
+    "sql": "SELECT '节假日' AS \"类型\", SUM(wx)/SUM(pay_sum) AS \"微信占比\", SUM(zfb)/SUM(pay_sum) AS \"支付宝占比\", SUM(rmb)/SUM(pay_sum) AS \"现金占比\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL UNION ALL SELECT '平日', SUM(wx)/SUM(pay_sum), SUM(zfb)/SUM(pay_sum), SUM(rmb)/SUM(pay_sum) FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节假日总订单量Top10服务区",
+    "sql": "SELECT service_name AS \"服务区名称\", SUM(order_sum) AS \"总订单量\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY service_name ORDER BY \"总订单量\" DESC LIMIT 10;"
+  },
+  {
+    "question": "节假日车流峰值日期识别",
+    "sql": "SELECT count_date AS \"日期\", SUM(customer_count) AS \"总车流量\" FROM bss_car_day_count WHERE count_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY count_date ORDER BY \"总车流量\" DESC LIMIT 1;"
+  },
+  {
+    "question": "平日周消费金额波动趋势分析",
+    "sql": "SELECT EXTRACT(DOW FROM oper_date) AS \"星期\", AVG(pay_sum) AS \"平均消费\" FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY EXTRACT(DOW FROM oper_date) ORDER BY \"星期\";"
+  },
+  {
+    "question": "节假日与非节假日现金支付占比差异",
+    "sql": "SELECT '节假日' AS \"类型\", SUM(rmb)/SUM(pay_sum) AS \"现金占比\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL UNION ALL SELECT '平日', SUM(rmb)/SUM(pay_sum) FROM bss_business_day_data WHERE oper_date NOT BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节前节后3日车流环比增长率计算",
+    "sql": "SELECT (AVG(CASE WHEN count_date BETWEEN '2023-10-08' AND '2023-10-10' THEN customer_count END) - AVG(CASE WHEN count_date BETWEEN '2023-09-28' AND '2023-09-30' THEN customer_count END))/AVG(CASE WHEN count_date BETWEEN '2023-09-28' AND '2023-09-30' THEN customer_count END) AS \"增长率\" FROM bss_car_day_count WHERE count_date BETWEEN '2023-09-28' AND '2023-10-10' AND delete_ts IS NULL;"
+  },
+  {
+    "question": "节假日各档口消费总额Top10排名",
+    "sql": "SELECT branch_name AS \"档口名称\", SUM(pay_sum) AS \"总消费额\" FROM bss_business_day_data WHERE oper_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY branch_name ORDER BY \"总消费额\" DESC LIMIT 10;"
+  },
+  {
+    "question": "节假日车辆类型占比分布统计",
+    "sql": "SELECT car_type AS \"车辆类型\", SUM(customer_count) AS \"总量\", ROUND(100*SUM(customer_count)/(SELECT SUM(customer_count) FROM bss_car_day_count WHERE count_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL),2) AS \"占比百分比\" FROM bss_car_day_count WHERE count_date BETWEEN '2023-10-01' AND '2023-10-07' AND delete_ts IS NULL GROUP BY car_type ORDER BY \"总量\" DESC;"
+  },
+  {
+    "question": "统计最近一个月各服务区总营收排名(按支付金额降序)Top10",
+    "sql": "SELECT service_name AS 服务区, SUM(pay_sum) AS 总营收 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_name ORDER BY 总营收 DESC LIMIT 10;"
+  },
+  {
+    "question": "分析最近7天各区域(按服务区划分)日均车流密度Top5",
+    "sql": "SELECT service_area_id AS 服务区ID, AVG(customer_count) AS 日均车流量 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date >= CURRENT_DATE - INTERVAL '7 days' GROUP BY service_area_id ORDER BY 日均车流量 DESC LIMIT 5;"
+  },
+  {
+    "question": "对比营收Top10服务区与车流Top10服务区的重合率",
+    "sql": "WITH 营收排名 AS (SELECT service_name, SUM(pay_sum) AS 金额 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_name ORDER BY 金额 DESC LIMIT 10), 车流排名 AS (SELECT service_area_id, SUM(customer_count) AS 车流 FROM bss_car_day_count WHERE delete_ts IS NULL AND count_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY service_area_id ORDER BY 车流 DESC LIMIT 10) SELECT COUNT(*) FILTER (WHERE r.service_name = c.service_area_id) * 100.0 / 10 AS 重合率 FROM 营收排名 r, 车流排名 c;"
+  },
+  {
+    "question": "计算各区域(按branch_name首字分组)客单价(支付金额/订单数)Top3",
+    "sql": "SELECT SUBSTRING(branch_name FROM 1 FOR 1) AS 区域, service_name AS 服务区, AVG(pay_sum / order_sum) AS 客单价 FROM bss_business_day_data WHERE delete_ts IS NULL AND order_sum > 0 AND oper_date >= CURRENT_DATE - INTERVAL '1 month' GROUP BY SUBSTRING(branch_name FROM 1 FOR 1), service_name ORDER BY 区域, 客单价 DESC LIMIT 3;"
+  },
+  {
+    "question": "查询2023年Q2季度各服务区运营健康度评分(支付金额环比增长率)",
+    "sql": "SELECT service_name AS 服务区, (SUM(CASE WHEN EXTRACT(QUARTER FROM oper_date)=2 THEN pay_sum ELSE 0 END) - SUM(CASE WHEN EXTRACT(QUARTER FROM oper_date)=1 THEN pay_sum ELSE 0 END)) / NULLIF(SUM(CASE WHEN EXTRACT(QUARTER FROM oper_date)=1 THEN pay_sum ELSE 0 END), 0) AS 增长率 FROM bss_business_day_data WHERE delete_ts IS NULL AND EXTRACT(YEAR FROM oper_date)=2023 GROUP BY service_name ORDER BY 增长率 DESC;"
+  },
+  {
+    "question": "统计周末与工作日车流量差异最大的Top5服务区",
+    "sql": "SELECT service_area_id AS 服务区ID, AVG(CASE WHEN EXTRACT(ISODOW FROM count_date) IN (6,7) THEN customer_count ELSE 0 END) - AVG(CASE WHEN EXTRACT(ISODOW FROM count_date) NOT IN (6,7) THEN customer_count ELSE 0 END) AS 差异值 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id ORDER BY 差异值 DESC LIMIT 5;"
+  },
+  {
+    "question": "查询2023年节假日(五一假期)期间营收异常波动(超3倍均值)的服务区",
+    "sql": "WITH 日均基准 AS (SELECT service_name, AVG(pay_sum) AS 基准值 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date NOT BETWEEN '2023-04-29' AND '2023-05-03' GROUP BY service_name) SELECT b.service_name AS 服务区, b.pay_sum AS 节假日营收, d.基准值 FROM bss_business_day_data b JOIN 日均基准 d ON b.service_name = d.service_name WHERE b.delete_ts IS NULL AND b.oper_date BETWEEN '2023-04-29' AND '2023-05-03' AND b.pay_sum > d.基准值 * 3;"
+  },
+  {
+    "question": "分析不同车辆类型(过境/城际)对应服务区营收相关性",
+    "sql": "SELECT '过境车流' AS 类型, AVG(pay_sum) AS 平均营收 FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_name = c.service_area_id WHERE c.car_type = '过境' AND b.delete_ts IS NULL AND c.delete_ts IS NULL UNION ALL SELECT '城际车流', AVG(pay_sum) FROM bss_business_day_data b JOIN bss_car_day_count c ON b.service_name = c.service_area_id WHERE c.car_type = '城际' AND b.delete_ts IS NULL AND c.delete_ts IS NULL;"
+  },
+  {
+    "question": "统计最近30天支付方式偏好(各服务区微信/支付宝占比分布)",
+    "sql": "SELECT service_name AS 服务区, SUM(wx) / SUM(pay_sum) * 100 AS 微信占比, SUM(zfb) / SUM(pay_sum) * 100 AS 支付宝占比 FROM bss_business_day_data WHERE delete_ts IS NULL AND oper_date >= CURRENT_DATE - 30 GROUP BY service_name ORDER BY 微信占比 DESC LIMIT 10;"
+  },
+  {
+    "question": "查询连续3天车流量增长且营收排名上升的服务区",
+    "sql": "WITH 车流趋势 AS (SELECT service_area_id, COUNT(*) FILTER (WHERE customer_count > LAG(customer_count,1,0) OVER (PARTITION BY service_area_id ORDER BY count_date)) AS 连续增长天数 FROM bss_car_day_count WHERE delete_ts IS NULL GROUP BY service_area_id HAVING COUNT(*) FILTER (WHERE customer_count > LAG(customer_count,1,0) OVER (PARTITION BY service_area_id ORDER BY count_date)) >=3), 营收趋势 AS (SELECT service_name, COUNT(*) FILTER (WHERE pay_sum > LAG(pay_sum,1,0) OVER (PARTITION BY service_name ORDER BY oper_date)) AS 排名上升次数 FROM bss_business_day_data WHERE delete_ts IS NULL GROUP BY service_name) SELECT c.service_area_id AS 服务区ID FROM 车流趋势 c JOIN 营收趋势 r ON c.service_area_id = r.service_name;"
+  }
+]

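Both qs_*_pair.json files above (the repaired file and its .backup) share one structure: a flat JSON array of {"question", "sql"} objects. The sketch below shows one way such a file could be loaded and handed to the SQLValidationAgent whose constructor is extended later in this diff; the file path and connection string are illustrative, and the call that actually runs validation is omitted because it does not appear in this commit.

import json

from data_pipeline.validators.sql_validation_agent import SQLValidationAgent

# Illustrative values; substitute the real task directory and credentials.
PAIR_FILE = "data_pipeline/training_data/task_20250701_184430/qs_highway_db_20250701_185822_pair.json"
DB_CONN = "postgresql://user:password@host:5432/highway_db"

# Each entry is a {"question": ..., "sql": ...} pair, as in the files above.
with open(PAIR_FILE, encoding="utf-8") as f:
    pairs = json.load(f)
print(f"loaded {len(pairs)} question-SQL pairs")

# Constructor arguments follow the signature added in this commit.
agent = SQLValidationAgent(
    db_connection=DB_CONN,
    input_file=PAIR_FILE,
    task_id="task_20250701_184430",
    enable_sql_repair=True,
    modify_original_file=True,
)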
+ 14 - 0
data_pipeline/training_data/task_20250701_184430/task_config.json

@@ -0,0 +1,14 @@
+{
+  "task_id": "task_20250701_184430",
+  "created_at": "2025-07-01T10:44:30.782367",
+  "parameters": {
+    "db_connection": "postgresql://postgres:postgres@192.168.67.1:6432/highway_db",
+    "table_list_file": "data_pipeline/tables.txt",
+    "business_context": "高速公路服务区管理系统",
+    "enable_llm_repair": true,
+    "modify_original_file": true,
+    "enable_sql_validation": true,
+    "enable_training_data_load": true
+  },
+  "output_directory": "data_pipeline\\training_data\\task_20250701_184430"
+}

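task_config.json records the parameters a task was created with. A minimal sketch of reading it back, assuming only the fields visible above; how these parameters are handed to the workflow is not shown because that code sits outside this diff.

import json
from pathlib import Path

task_dir = Path("data_pipeline/training_data/task_20250701_184430")  # illustrative path
config = json.loads((task_dir / "task_config.json").read_text(encoding="utf-8"))

print(config["task_id"], config["created_at"])
params = config["parameters"]
print(params["db_connection"], params["business_context"])

# The four boolean switches control LLM repair, file rewriting, SQL validation and training load.
for key in ("enable_llm_repair", "modify_original_file",
            "enable_sql_validation", "enable_training_data_load"):
    print(key, "=", params[key])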
+ 88 - 0
data_pipeline/training_data/task_20250701_184430/task_result.json

@@ -0,0 +1,88 @@
+{
+  "success": true,
+  "workflow_summary": {
+    "total_duration": 1076.83,
+    "completed_steps": [
+      "ddl_md_generation",
+      "question_sql_generation",
+      "sql_validation",
+      "training_data_load"
+    ],
+    "failed_steps": [],
+    "total_steps": 4,
+    "workflow_started": "2025-07-01T18:45:08.144882",
+    "workflow_completed": "2025-07-01T19:03:04.975353"
+  },
+  "input_parameters": {
+    "db_connection": "postgresql://postgres:***@192.168.67.1:6432/highway_db",
+    "table_list_file": "data_pipeline/tables.txt",
+    "business_context": "高速公路服务区管理系统",
+    "db_name": "highway_db",
+    "output_directory": "data_pipeline\\training_data\\task_20250701_184430",
+    "enable_sql_validation": true,
+    "enable_llm_repair": true,
+    "modify_original_file": true,
+    "enable_training_data_load": true
+  },
+  "processing_results": {
+    "ddl_md_generation": {
+      "total_tables": 2,
+      "processed_successfully": 2,
+      "failed": 0,
+      "files_generated": 4,
+      "duration": 124.50190877914429
+    },
+    "question_sql_generation": {
+      "output_file": "data_pipeline\\training_data\\task_20250701_184430\\qs_highway_db_20250701_185822_pair.json",
+      "total_questions": 50,
+      "total_themes": 5,
+      "successful_themes": 5,
+      "failed_themes": [],
+      "duration": 696.1132636070251
+    },
+    "sql_validation": {
+      "original_sql_count": 50,
+      "valid_sql_count": 49,
+      "invalid_sql_count": 1,
+      "success_rate": 0.98,
+      "repair_stats": {
+        "attempted": 6,
+        "successful": 5,
+        "failed": 1
+      },
+      "file_modification_stats": {
+        "modified": 5,
+        "deleted": 1,
+        "failed_modifications": 0
+      },
+      "average_execution_time": 0.033519439697265625,
+      "total_retries": 0,
+      "duration": 191.81386828422546
+    },
+    "training_data_load": {
+      "training_data_dir": "data_pipeline\\training_data\\task_20250701_184430",
+      "load_successful": true,
+      "total_records": 340,
+      "data_type_counts": {
+        "sql": 301,
+        "documentation": 20,
+        "ddl": 18,
+        "error_sql": 1
+      },
+      "duration": 62.95965242385864
+    }
+  },
+  "final_outputs": {
+    "primary_output_file": "data_pipeline\\training_data\\task_20250701_184430\\qs_highway_db_20250701_185822_pair.json",
+    "output_directory": "data_pipeline\\training_data\\task_20250701_184430",
+    "final_question_count": 49,
+    "backup_files_created": true
+  },
+  "performance_metrics": {
+    "step1_duration": 124.5,
+    "step2_duration": 696.11,
+    "step3_duration": 191.81,
+    "step4_duration": 62.96,
+    "total_duration": 1076.83
+  }
+}

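task_result.json summarizes the finished run. A small sketch, based only on fields present in the file above, for checking whether a task succeeded and pulling out the SQL validation statistics.

import json
from pathlib import Path

result_path = Path("data_pipeline/training_data/task_20250701_184430/task_result.json")  # illustrative
result = json.loads(result_path.read_text(encoding="utf-8"))

summary = result["workflow_summary"]
if result["success"]:
    print(f"completed {len(summary['completed_steps'])}/{summary['total_steps']} steps "
          f"in {summary['total_duration']}s")
else:
    print("failed steps:", summary["failed_steps"])

validation = result["processing_results"]["sql_validation"]
print(f"SQL validation success rate: {validation['success_rate']:.0%}")
print("repair stats:", validation["repair_stats"])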
+ 2 - 2
data_pipeline/utils/file_manager.py

@@ -1,7 +1,7 @@
 import os
 from typing import Dict, Set, Optional
 from pathlib import Path
-from core.logging import get_data_pipeline_logger
+import logging
 
 class FileNameManager:
     """文件名管理器,处理文件命名和冲突"""
@@ -10,7 +10,7 @@ class FileNameManager:
         self.output_dir = output_dir
         self.used_names: Set[str] = set()
         self.name_mapping: Dict[str, str] = {}  # 原始名 -> 实际文件名
-        self.logger = get_data_pipeline_logger("FileNameManager")
+        self.logger = logging.getLogger("FileNameManager")
         
         # 扫描已存在的文件
         self._scan_existing_files()

+ 2 - 2
data_pipeline/utils/large_table_handler.py

@@ -1,13 +1,13 @@
 import random
 from typing import List, Dict, Any, Optional
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
-from core.logging import get_data_pipeline_logger
+import logging
 
 class LargeTableHandler:
     """大表处理策略"""
     
     def __init__(self):
-        self.logger = get_data_pipeline_logger("LargeTableHandler")
+        self.logger = logging.getLogger("LargeTableHandler")
         self.large_table_threshold = SCHEMA_TOOLS_CONFIG.get("large_table_threshold", 1000000)
         self.skip_large_tables = SCHEMA_TOOLS_CONFIG.get("skip_large_tables", False)
         self.max_table_size = SCHEMA_TOOLS_CONFIG.get("max_table_size", 10000000)

+ 4 - 4
data_pipeline/utils/logger.py

@@ -2,7 +2,7 @@
 原有日志系统已被新的统一日志系统替代
 保留此文件仅为避免导入错误
 """
-from core.logging import get_data_pipeline_logger
+# 移除旧的日志导入
 from typing import Optional
 import logging
 
@@ -14,8 +14,8 @@ def setup_logging(verbose: bool = False, log_file: Optional[str] = None, log_dir
     pass
 
 def get_logger(name: str = "DataPipeline"):
-    """直接返回新的logger"""
-    return get_data_pipeline_logger(name)
+    """直接返回logger"""
+    return logging.getLogger(name)
 
 def get_colored_console_handler(level=logging.INFO):
     """兼容性函数,返回None"""
@@ -25,7 +25,7 @@ class TableProcessingLogger:
     """兼容性类,实际使用新的日志系统"""
     
     def __init__(self, logger_name: str = "schema_tools.TableProcessor"):
-        self.logger = get_data_pipeline_logger("TableProcessor")
+        self.logger = logging.getLogger("TableProcessor")
         self.current_table = None
         self.start_time = None
     

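After this change data_pipeline/utils/logger.py is only a compatibility shim: setup_logging is a no-op and get_logger simply returns a standard-library logger. A minimal sketch of what legacy callers now get, assuming nothing beyond the code shown above; handler and level configuration therefore has to happen through the stdlib.

import logging

from data_pipeline.utils.logger import get_logger, setup_logging

setup_logging(verbose=True)       # kept for backward compatibility, does nothing now
log = get_logger("DataPipeline")  # plain logging.Logger from the standard library

logging.basicConfig(level=logging.INFO)  # configure output via the stdlib
log.info("compatibility shim in use")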
+ 2 - 2
data_pipeline/utils/permission_checker.py

@@ -1,13 +1,13 @@
 from typing import Dict, Optional
 import asyncio
-from core.logging import get_data_pipeline_logger
+import logging
 
 class DatabasePermissionChecker:
     """数据库权限检查器"""
     
     def __init__(self, db_inspector):
         self.db_inspector = db_inspector
-        self.logger = get_data_pipeline_logger("DatabasePermissionChecker")
+        self.logger = logging.getLogger("DatabasePermissionChecker")
         self._permission_cache: Optional[Dict[str, bool]] = None
     
     async def check_permissions(self) -> Dict[str, bool]:

+ 2 - 2
data_pipeline/utils/system_filter.py

@@ -1,6 +1,6 @@
 from typing import List, Set
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
-from core.logging import get_data_pipeline_logger
+import logging
 
 class SystemTableFilter:
     """系统表过滤器"""
@@ -18,7 +18,7 @@ class SystemTableFilter:
     ]
     
     def __init__(self):
-        self.logger = get_data_pipeline_logger("SystemTableFilter")
+        self.logger = logging.getLogger("SystemTableFilter")
         
         # 加载自定义配置
         self.custom_prefixes = SCHEMA_TOOLS_CONFIG.get("custom_system_prefixes", [])

+ 2 - 2
data_pipeline/utils/table_parser.py

@@ -1,12 +1,12 @@
 import os
 from typing import List, Tuple
-from core.logging import get_data_pipeline_logger
+import logging
 
 class TableListParser:
     """表清单解析器"""
     
     def __init__(self):
-        self.logger = get_data_pipeline_logger("TableListParser")
+        self.logger = logging.getLogger("TableListParser")
     
     def parse_file(self, file_path: str) -> List[str]:
         """

+ 2 - 2
data_pipeline/validators/file_count_validator.py

@@ -4,7 +4,7 @@ from dataclasses import dataclass, field
 
 from data_pipeline.utils.table_parser import TableListParser
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
-from core.logging import get_data_pipeline_logger
+import logging
 
 
 @dataclass
@@ -24,7 +24,7 @@ class FileCountValidator:
     """文件数量验证器"""
     
     def __init__(self):
-        self.logger = get_data_pipeline_logger("FileCountValidator")
+        self.logger = logging.getLogger("FileCountValidator")
         self.config = SCHEMA_TOOLS_CONFIG
         
     def validate(self, table_list_file: str, output_dir: str) -> ValidationResult:

+ 14 - 2
data_pipeline/validators/sql_validation_agent.py

@@ -8,7 +8,7 @@ from typing import List, Dict, Any, Optional
 
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
 from data_pipeline.validators import SQLValidator, SQLValidationResult, ValidationStats
-from core.logging import get_data_pipeline_logger
+from data_pipeline.dp_logging import get_logger
 
 
 class SQLValidationAgent:
@@ -18,6 +18,7 @@ class SQLValidationAgent:
                  db_connection: str,
                  input_file: str,
                  output_dir: str = None,
+                 task_id: str = None,
                  enable_sql_repair: bool = None,
                  modify_original_file: bool = None):
         """
@@ -27,6 +28,7 @@ class SQLValidationAgent:
             db_connection: 数据库连接字符串
             input_file: 输入的JSON文件路径(包含Question-SQL对)
             output_dir: 输出目录(默认为输入文件同目录)
+            task_id: 任务ID
             enable_sql_repair: 是否启用SQL修复(覆盖配置文件)
             modify_original_file: 是否修改原始文件(覆盖配置文件)
         """
@@ -40,7 +42,17 @@ class SQLValidationAgent:
             self.config['enable_sql_repair'] = enable_sql_repair
         if modify_original_file is not None:
             self.config['modify_original_file'] = modify_original_file
-        self.logger = get_data_pipeline_logger("SQLValidationAgent")
+            
+        self.task_id = task_id
+        
+        # 初始化独立日志系统
+        if task_id:
+            self.logger = get_logger("SQLValidationAgent", task_id)
+        else:
+            # 脚本模式下,如果没有传递task_id,生成一个
+            from datetime import datetime
+            self.task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+            self.logger = get_logger("SQLValidationAgent", self.task_id)
         
         # 初始化验证器
         self.validator = SQLValidator(db_connection)

+ 2 - 2
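The initialization pattern above (use the task_id that was passed in, otherwise fall back to a generated manual_<timestamp> id in script mode) is what the components in this commit follow when switching to the new dp_logging module. A standalone sketch of that logic, assuming only the get_logger(name, task_id) call visible in this diff; the helper name is illustrative.

from datetime import datetime
from typing import Optional, Tuple

from data_pipeline.dp_logging import get_logger


def init_component_logger(component: str, task_id: Optional[str] = None) -> Tuple[str, object]:
    """Return (task_id, logger), mirroring the fallback used in SQLValidationAgent."""
    if task_id is None:
        # Script mode: no task_id supplied, so generate a manual one.
        task_id = f"manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    return task_id, get_logger(component, task_id)


# API mode passes the real task_id; script mode omits it.
task_id, logger = init_component_logger("SQLValidationAgent", "task_20250701_184430")
logger.info("validation started")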
data_pipeline/validators/sql_validator.py

@@ -4,7 +4,7 @@ from typing import Dict, Any, List, Optional
 from dataclasses import dataclass, field
 
 from data_pipeline.config import SCHEMA_TOOLS_CONFIG
-from core.logging import get_data_pipeline_logger
+import logging
 
 
 @dataclass
@@ -52,7 +52,7 @@ class SQLValidator:
         self.db_connection = db_connection
         self.connection_pool = None
         self.config = SCHEMA_TOOLS_CONFIG['sql_validation']
-        self.logger = get_data_pipeline_logger("SQLValidator")
+        self.logger = logging.getLogger("SQLValidator")
         
     async def _get_connection_pool(self):
         """获取或复用现有连接池"""