Based on your requirements and feedback, here is a unified logging service design focused on the core functionality:
project root/
├── core/
│   └── logging/
│       ├── __init__.py           # Logging service entry point
│       └── log_manager.py        # Core log manager
├── logs/                         # Log file directory
│   ├── app.log                  # Main application log (citu_app.py and shared modules)
│   ├── agent.log                # agent module log
│   ├── vanna.log                # vanna-related module log
│   └── data_pipeline.log        # data_pipeline module log
└── config/
    └── logging_config.yaml       # Logging configuration file
# core/logging/log_manager.py
import logging
import logging.handlers
import os
from typing import Dict, Optional
from pathlib import Path
import yaml
import contextvars
# Context variable holding optional per-request context
log_context = contextvars.ContextVar('log_context', default={})
class ContextFilter(logging.Filter):
    """Attach context information to every log record"""
    def filter(self, record):
        ctx = log_context.get()
        # Provide defaults so formatting never fails
        record.session_id = ctx.get('session_id', 'N/A')
        record.user_id = ctx.get('user_id', 'anonymous')
        record.request_id = ctx.get('request_id', 'N/A')
        return True
class LogManager:
    """Unified log manager - Log4j-style functionality"""
    
    _instance = None
    _loggers: Dict[str, logging.Logger] = {}
    _initialized = False
    _fallback_to_console = False  # Marks whether we fell back to console-only output
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self):
        if not self._initialized:
            self.config = None
            self.base_log_dir = Path("logs")
            self._setup_base_directory()
            LogManager._initialized = True
    
    def initialize(self, config_path: str = "config/logging_config.yaml"):
        """初始化日志系统"""
        self.config = self._load_config(config_path)
        self._setup_base_directory()
        self._configure_root_logger()
    
    def get_logger(self, name: str, module: str = "default") -> logging.Logger:
        """获取指定模块的logger"""
        logger_key = f"{module}.{name}"
        
        if logger_key not in self._loggers:
            logger = logging.getLogger(logger_key)
            self._configure_logger(logger, module)
            self._loggers[logger_key] = logger
        
        return self._loggers[logger_key]
    
    def set_context(self, **kwargs):
        """Set optional logging context"""
        # Copy before updating so the ContextVar's shared default dict is never mutated
        ctx = dict(log_context.get())
        ctx.update(kwargs)
        log_context.set(ctx)
    
    def clear_context(self):
        """Clear the logging context"""
        log_context.set({})
    
    def _load_config(self, config_path: str) -> dict:
        """Load the configuration file"""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            print(f"[WARNING] Config file {config_path} not found, using the default configuration")
            return self._get_default_config()
        except Exception as e:
            print(f"[ERROR] Failed to load config file: {e}, using the default configuration")
            return self._get_default_config()
    
    def _get_default_config(self) -> dict:
        """Return the built-in default configuration"""
        return {
            'global': {'base_level': 'INFO'},
            'default': {
                'level': 'INFO',
                'console': {
                    'enabled': True,
                    'level': 'INFO',
                    'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
                },
                'file': {
                    'enabled': True,
                    'level': 'DEBUG',
                    'filename': 'app.log',
                    'format': '%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s',
                    'rotation': {
                        'enabled': True,
                        'max_size': '50MB',
                        'backup_count': 10
                    }
                }
            },
            'modules': {}
        }
    
    def _setup_base_directory(self):
        """Create the log directory (with console fallback)"""
        try:
            self.base_log_dir.mkdir(parents=True, exist_ok=True)
            self._fallback_to_console = False
        except Exception as e:
            print(f"[WARNING] Cannot create log directory {self.base_log_dir}, falling back to console-only output: {e}")
            self._fallback_to_console = True
    
    def _configure_root_logger(self):
        """Configure the root logger"""
        root_logger = logging.getLogger()
        root_logger.setLevel(getattr(logging, self.config['global']['base_level'].upper()))
    
    def _configure_logger(self, logger: logging.Logger, module: str):
        """Configure an individual logger"""
        # Fall back to the default config if get_logger() is called before initialize()
        if self.config is None:
            self.config = self._get_default_config()
        module_config = self.config.get('modules', {}).get(module, self.config['default'])
        
        # Set the log level
        level = getattr(logging, module_config['level'].upper())
        logger.setLevel(level)
        
        # Remove any existing handlers
        logger.handlers.clear()
        logger.propagate = False
        
        # Add the console handler
        if module_config.get('console', {}).get('enabled', True):
            console_handler = self._create_console_handler(module_config['console'])
            console_handler.addFilter(ContextFilter())
            logger.addHandler(console_handler)
        
        # Add the file handler (unless we fell back to console-only)
        if not self._fallback_to_console and module_config.get('file', {}).get('enabled', True):
            try:
                file_handler = self._create_file_handler(module_config['file'], module)
                file_handler.addFilter(ContextFilter())
                logger.addHandler(file_handler)
            except Exception as e:
                print(f"[WARNING] Cannot create file handler: {e}")
    
    def _create_console_handler(self, console_config: dict) -> logging.StreamHandler:
        """创建控制台处理器"""
        handler = logging.StreamHandler()
        handler.setLevel(getattr(logging, console_config.get('level', 'INFO').upper()))
        
        formatter = logging.Formatter(
            console_config.get('format', '%(asctime)s [%(levelname)s] %(name)s: %(message)s'),
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        handler.setFormatter(formatter)
        return handler
    
    def _create_file_handler(self, file_config: dict, module: str) -> logging.Handler:
        """创建文件处理器(支持自动轮转)"""
        log_file = self.base_log_dir / file_config.get('filename', f'{module}.log')
        
        # 使用RotatingFileHandler实现自动轮转和清理
        rotation_config = file_config.get('rotation', {})
        if rotation_config.get('enabled', False):
            handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self._parse_size(rotation_config.get('max_size', '50MB')),
                backupCount=rotation_config.get('backup_count', 10),
                encoding='utf-8'
            )
        else:
            handler = logging.FileHandler(log_file, encoding='utf-8')
        
        handler.setLevel(getattr(logging, file_config.get('level', 'DEBUG').upper()))
        
        formatter = logging.Formatter(
            file_config.get('format', '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s'),
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        handler.setFormatter(formatter)
        return handler
    
    def _parse_size(self, size_str: str) -> int:
        """解析大小字符串,如 '50MB' -> 字节数"""
        size_str = size_str.upper()
        if size_str.endswith('KB'):
            return int(size_str[:-2]) * 1024
        elif size_str.endswith('MB'):
            return int(size_str[:-2]) * 1024 * 1024
        elif size_str.endswith('GB'):
            return int(size_str[:-2]) * 1024 * 1024 * 1024
        else:
            return int(size_str)
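A quick sanity check of the singleton and the size parser above; a minimal sketch, assuming the module layout shown in the directory tree:
# example: LogManager sanity check (illustrative only)
from core.logging.log_manager import LogManager
# __new__ always returns the same instance
assert LogManager() is LogManager()
# '50MB' is parsed as 50 * 1024 * 1024 bytes
assert LogManager()._parse_size("50MB") == 50 * 1024 * 1024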
# core/logging/__init__.py
from .log_manager import LogManager
import logging
# Global log manager instance
_log_manager = LogManager()
def initialize_logging(config_path: str = "config/logging_config.yaml"):
    """Initialize the project-wide logging system"""
    _log_manager.initialize(config_path)
def get_logger(name: str, module: str = "default") -> logging.Logger:
    """Get a logger instance - primary API"""
    return _log_manager.get_logger(name, module)
# Convenience helpers
def get_data_pipeline_logger(name: str) -> logging.Logger:
    """Get a logger for the data_pipeline module"""
    return get_logger(name, "data_pipeline")
def get_agent_logger(name: str) -> logging.Logger:
    """Get a logger for the agent module"""
    return get_logger(name, "agent")
def get_vanna_logger(name: str) -> logging.Logger:
    """Get a logger for the vanna module"""
    return get_logger(name, "vanna")
def get_app_logger(name: str) -> logging.Logger:
    """Get a logger for the app module"""
    return get_logger(name, "app")
# Context management helpers
def set_log_context(**kwargs):
    """Set optional logging context
    Example: set_log_context(user_id='user123', session_id='sess456')
    """
    _log_manager.set_context(**kwargs)
def clear_log_context():
    """Clear the logging context"""
    _log_manager.clear_context()
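Typical usage of the public API above; a minimal sketch, where "TrainingDataLoader" is an illustrative component name rather than an existing class:
# example: using the unified logging API
from core.logging import initialize_logging, get_data_pipeline_logger, set_log_context, clear_log_context
initialize_logging("config/logging_config.yaml")
logger = get_data_pipeline_logger("TrainingDataLoader")
set_log_context(user_id='user123', session_id='sess456')
try:
    logger.info("Loading training data")  # goes to logs/data_pipeline.log and the console
finally:
    clear_log_context()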
# config/logging_config.yaml
version: 1
# Global settings
global:
  base_level: INFO
  
# Default configuration (used for app.log)
default:
  level: INFO
  console:
    enabled: true
    level: INFO
    format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
  file:
    enabled: true
    level: DEBUG
    filename: "app.log"
    format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
    rotation:
      enabled: true
      max_size: "50MB"
      backup_count: 10
# Module-specific configuration
modules:
  app:
    level: INFO
    console:
      enabled: true
      level: INFO
      format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "app.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "50MB"
        backup_count: 10
  
  data_pipeline:
    level: DEBUG
    console:
      enabled: true
      level: INFO
      format: "%(asctime)s [%(levelname)s] Pipeline: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "data_pipeline.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "30MB"
        backup_count: 8
  
  agent:
    level: DEBUG
    console:
      enabled: true
      level: INFO
      format: "%(asctime)s [%(levelname)s] Agent: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "agent.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "30MB"
        backup_count: 8
  
  vanna:
    level: INFO
    console:
      enabled: true
      level: INFO
      format: "%(asctime)s [%(levelname)s] Vanna: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "vanna.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "20MB"
        backup_count: 5
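Under this configuration, a logger obtained for the agent module writes DEBUG and above to logs/agent.log while the console shows only INFO and above; a small illustrative check (the logger name is arbitrary):
# example: verifying per-module levels from the YAML
import logging
from core.logging import initialize_logging, get_agent_logger
initialize_logging("config/logging_config.yaml")
logger = get_agent_logger("LevelCheck")
logger.debug("written to logs/agent.log only (console handler level is INFO)")
logger.info("written to both logs/agent.log and the console")
assert logger.level == logging.DEBUG  # from modules.agent.level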
Create the logging service directory structure:
mkdir -p core/logging
mkdir -p config
mkdir -p logs
Then create core/logging/log_manager.py, core/logging/__init__.py, and config/logging_config.yaml with the contents shown above.
Integrate into citu_app.py:
import uuid
from flask import request
from core.logging import initialize_logging, get_app_logger, set_log_context, clear_log_context
initialize_logging("config/logging_config.yaml")
app_logger = get_app_logger("CituApp")
@app.flask_app.before_request
def before_request():
    # Set per-request context (when available)
    request_id = str(uuid.uuid4())[:8]
    user_id = request.headers.get('X-User-ID', 'anonymous')
    set_log_context(request_id=request_id, user_id=user_id)
@app.flask_app.after_request
def after_request(response):
    # Clear the context
    clear_log_context()
    return response
# Replace all print statements
# agent/citu_agent.py
from core.logging import get_agent_logger
class CituLangGraphAgent:
    def __init__(self):
        self.logger = get_agent_logger("CituAgent")
        self.logger.info("LangGraph Agent initialized")
        # Directly replaces the original print statements
# Option 1: delete data_pipeline/utils/logger.py entirely
# and import the new logging system directly at every call site
# Option 2: keep the file but replace its implementation with a thin shim
# data_pipeline/utils/logger.py
"""
The original logging system has been replaced by the new unified logging system.
This file is kept only to avoid import errors.
"""
from core.logging import get_data_pipeline_logger
def setup_logging(verbose: bool = False, log_file: str = None, log_dir: str = None):
    """Kept so existing calls don't break; does nothing"""
    pass
def get_logger(name: str = "DataPipeline"):
    """Return a logger from the new system directly"""
    return get_data_pipeline_logger(name)
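With Option 2 in place, existing data_pipeline call sites keep working unchanged; a minimal sketch, where the call site and the name "TablesProcessor" are hypothetical:
# hypothetical existing call site inside data_pipeline
from data_pipeline.utils.logger import get_logger
logger = get_logger("TablesProcessor")  # transparently returns a logger from the new system
logger.info("Existing code keeps logging without any changes")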
# Use the new logger directly in each file
# data_pipeline/schema_workflow.py
from core.logging import get_data_pipeline_logger
class SchemaWorkflowOrchestrator:
    def __init__(self, ...):
        # Before: self.logger = logging.getLogger("schema_tools.SchemaWorkflowOrchestrator")
        # After: use the new system directly
        self.logger = get_data_pipeline_logger("SchemaWorkflow")
# data_pipeline/qa_generation/qs_agent.py
from core.logging import get_data_pipeline_logger
class QuestionSQLGenerationAgent:
    def __init__(self, ...):
        # Before: self.logger = logging.getLogger("schema_tools.QSAgent")
        # After: direct replacement
        self.logger = get_data_pipeline_logger("QSAgent")
# customllm/base_llm_chat.py
from core.logging import get_vanna_logger
class BaseLLMChat(VannaBase, ABC):
    def __init__(self, config=None):
        VannaBase.__init__(self, config=config)
        self.logger = get_vanna_logger("BaseLLMChat")
        
        # Replaces the original print calls
        self.logger.info("Config parameters passed in:")
        for key, value in self.config.items():
            self.logger.info(f"  {key}: {value}")
# common/qa_feedback_manager.py
from core.logging import get_app_logger
class QAFeedbackManager:
    def __init__(self, vanna_instance=None):
        self.logger = get_app_logger("QAFeedbackManager")
        # Replaces the original print calls
        self.logger.info("QAFeedbackManager initialized")
# citu_app.py
from core.logging import initialize_logging, get_app_logger, get_agent_logger, set_log_context, clear_log_context
import uuid
# Initialize at application startup
initialize_logging("config/logging_config.yaml")
app_logger = get_app_logger("CituApp")
# Example API endpoint
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    logger = get_agent_logger("AskAgent")
    request_id = str(uuid.uuid4())[:8]
    
    try:
        data = request.json
        user_id = data.get('user_id')
        
        # Set context (safe even when user_id is missing)
        set_log_context(
            request_id=request_id,
            user_id=user_id or 'anonymous'
        )
        
        logger.info("开始处理请求")
        # ... 业务逻辑
        
        logger.info("请求处理成功")
        return success_response(...)
        
    except Exception as e:
        logger.error(f"请求处理失败: {str(e)}", exc_info=True)
        return error_response(...)
    finally:
        clear_log_context()
# data_pipeline/schema_workflow.py
import time
from core.logging import get_data_pipeline_logger
class SchemaWorkflowOrchestrator:
    def __init__(self, ...):
        self.logger = get_data_pipeline_logger("SchemaWorkflow")
    
    async def execute_complete_workflow(self):
        """执行完整工作流"""
        workflow_start = time.time()
        self.logger.info("开始执行完整的Schema工作流")
        
        # 保持原有的跨函数时间统计逻辑
        try:
            # 步骤1
            step1_start = time.time()
            self.logger.info("步骤1: 开始生成DDL和MD文件")
            # ... 执行逻辑
            step1_time = time.time() - step1_start
            self.logger.info(f"步骤1完成,耗时: {step1_time:.2f}秒")
            
            # 继续其他步骤...
            
        except Exception as e:
            self.logger.error(f"工作流执行失败: {str(e)}", exc_info=True)
            raise
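If the same start/elapsed pattern repeats for every step, it can be factored into a small helper; the following is a sketch only, not part of the existing code:
# hypothetical timing helper for workflow steps
import time
from contextlib import contextmanager
@contextmanager
def log_step(logger, step_name: str):
    """Log the start of a step and its elapsed time when it finishes."""
    start = time.time()
    logger.info(f"{step_name}: started")
    try:
        yield
    finally:
        logger.info(f"{step_name}: finished in {time.time() - start:.2f}s")
# Usage inside execute_complete_workflow:
#     with log_step(self.logger, "Step 1: generate DDL and MD files"):
#         ...  # execution logic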
# config/logging_config_dev.yaml
version: 1
global:
  base_level: DEBUG
default:
  level: DEBUG
  console:
    enabled: true
    level: DEBUG
  file:
    enabled: false  # development can rely on console output only
# config/logging_config_prod.yaml
version: 1
global:
  base_level: INFO
default:
  level: INFO
  console:
    enabled: false  # no console output in production
  file:
    enabled: true
    level: INFO
    rotation:
      enabled: true
      max_size: "100MB"
      backup_count: 20
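One way to choose between these profiles at startup; a minimal sketch assuming a hypothetical APP_ENV environment variable (not part of the existing codebase):
# hypothetical environment-based config selection
import os
from core.logging import initialize_logging
env = os.getenv("APP_ENV", "dev").lower()
config_file = {
    "dev": "config/logging_config_dev.yaml",
    "prod": "config/logging_config_prod.yaml",
}.get(env, "config/logging_config.yaml")
initialize_logging(config_file)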
Key characteristics of this design: a single logging entry point in core.logging that replaces scattered print statements and the old data_pipeline logger; separate, size-rotated log files per module (app.log, agent.log, vanna.log, data_pipeline.log); optional request context (user_id, session_id, request_id) injected via contextvars with safe defaults; YAML-driven per-module configuration with separate dev and prod profiles; and graceful fallback to console-only output when the log directory cannot be created.
The design has been fully adjusted to your feedback and can be implemented as-is.