
Project Logging System Refactor Design (Lean, Practical Edition)

1. Overall Design Philosophy

Based on your requirements, this design provides a unified, Log4j-style logging service focused on the core features:

  • Unified log-level management (info/error/debug/warning)
  • Configurable log output paths
  • Console and file output
  • A separate log file per module (data_pipeline, agent, vanna, etc.)
  • Automatic log rotation and cleanup
  • Compatibility with the existing vanna/langchain/langgraph stack
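
As a minimal stdlib sketch of the central idea (one logger per module, each writing to its own file), assuming nothing beyond Python's built-in logging; the LogManager designed in this document adds configuration, rotation, and context on top of this:

```python
import logging
import os
import tempfile

# Sketch only: per-module loggers, each with a dedicated file.
log_dir = tempfile.mkdtemp()

def get_module_logger(module: str) -> logging.Logger:
    logger = logging.getLogger(module)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.FileHandler(os.path.join(log_dir, f"{module}.log"), encoding="utf-8")
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False
    return logger

get_module_logger("data_pipeline").info("pipeline started")
get_module_logger("agent").info("agent started")
```

Each call routes messages to its own file (`data_pipeline.log`, `agent.log`), which is exactly the layout shown in section 2.1.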

2. Core Architecture

2.1 Lean Logging Service Layout

project root/
├── core/
│   └── logging/
│       ├── __init__.py           # logging service entry point
│       └── log_manager.py        # core log manager
├── logs/                         # log file directory
│   ├── data_pipeline.log        # data_pipeline module log
│   ├── agent.log                # agent module log
│   ├── vanna.log                # vanna module log
│   ├── langchain.log            # langchain module log
│   ├── langgraph.log            # langgraph module log
│   └── app.log                  # main application log
└── config/
    └── logging_config.yaml       # logging configuration file

2.2 Core Log Manager Design (Enhanced)

Based on user feedback, the enhanced version adds:

  • Asynchronous logging support
  • Flexible context management (user_id is optional)
  • A fallback strategy on errors
  • A focus on integrating with citu_app.py
# core/logging/log_manager.py
import logging
import logging.handlers
from typing import Dict
from pathlib import Path
import yaml
import asyncio
from concurrent.futures import ThreadPoolExecutor
import contextvars

# Context variable holding optional per-request context
log_context = contextvars.ContextVar('log_context', default={})

class ContextFilter(logging.Filter):
    """Attach context information to every log record"""
    def filter(self, record):
        ctx = log_context.get()
        # Provide defaults so format strings never raise
        record.session_id = ctx.get('session_id', 'N/A')
        record.user_id = ctx.get('user_id', 'anonymous')
        record.request_id = ctx.get('request_id', 'N/A')
        return True

class LogManager:
    """Unified log manager with Log4j-like features"""
    
    _instance = None
    _loggers: Dict[str, logging.Logger] = {}
    _initialized = False
    _executor = None
    _fallback_to_console = False  # whether output fell back to console only
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self):
        if not self._initialized:
            self.config = None
            self.base_log_dir = Path("logs")
            self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="log")
            self._setup_base_directory()
            LogManager._initialized = True
    
    def initialize(self, config_path: str = "config/logging_config.yaml"):
        """Initialize the logging system"""
        self.config = self._load_config(config_path)
        self._setup_base_directory()
    
    def get_logger(self, name: str, module: str = "default") -> logging.Logger:
        """Get a logger for the given module"""
        logger_key = f"{module}.{name}"
        
        if logger_key not in self._loggers:
            logger = logging.getLogger(logger_key)
            self._configure_logger(logger, module)
            self._loggers[logger_key] = logger
        
        return self._loggers[logger_key]
    
    async def alog(self, logger: logging.Logger, level: str, message: str, **kwargs):
        """Log asynchronously via the thread pool, keeping the event loop unblocked"""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor,
            lambda: getattr(logger, level)(message, **kwargs)
        )
    
    def set_context(self, **kwargs):
        """Set optional logging context"""
        # Copy before updating so the ContextVar's shared default dict is never mutated
        ctx = dict(log_context.get())
        ctx.update(kwargs)
        log_context.set(ctx)
    
    def clear_context(self):
        """Clear the logging context"""
        log_context.set({})
    
    def _load_config(self, config_path: str) -> dict:
        """Load the config file (with error handling)"""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            print(f"[WARNING] Config file {config_path} not found; using defaults")
            return self._get_default_config()
        except Exception as e:
            print(f"[ERROR] Failed to load config file: {e}; using defaults")
            return self._get_default_config()
    
    def _setup_base_directory(self):
        """Create the log directory (with console fallback)"""
        try:
            self.base_log_dir.mkdir(parents=True, exist_ok=True)
            self._fallback_to_console = False
        except Exception as e:
            print(f"[WARNING] Cannot create log directory {self.base_log_dir}; using console output only: {e}")
            self._fallback_to_console = True
    
    def _configure_logger(self, logger: logging.Logger, module: str):
        """Configure an individual logger (supports fallback)"""
        config = self.config or self._get_default_config()
        module_config = config.get('modules', {}).get(module, config['default'])
        
        # Set the log level
        level = getattr(logging, module_config['level'].upper())
        logger.setLevel(level)
        
        # Remove existing handlers and stop propagation to the root logger
        logger.handlers.clear()
        logger.propagate = False
        
        # Add a console handler
        if module_config.get('console', {}).get('enabled', True):
            console_handler = self._create_console_handler(module_config['console'])
            console_handler.addFilter(ContextFilter())
            logger.addHandler(console_handler)
        
        # Add a file handler (unless we fell back to console only)
        if not self._fallback_to_console and module_config.get('file', {}).get('enabled', True):
            try:
                file_handler = self._create_file_handler(module_config['file'], module)
                file_handler.addFilter(ContextFilter())
                logger.addHandler(file_handler)
            except Exception as e:
                print(f"[WARNING] Cannot create file handler: {e}")
    
    def _get_default_config(self) -> dict:
        """Return the built-in default configuration"""
        return {
            'global': {'base_level': 'INFO'},
            'default': {
                'level': 'INFO',
                'console': {
                    'enabled': True,
                    'level': 'INFO',
                    'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
                },
                'file': {
                    'enabled': True,
                    'level': 'DEBUG',
                    'filename': 'app.log',
                    'format': '%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s',
                    'rotation': {
                        'enabled': True,
                        'max_size': '50MB',
                        'backup_count': 10
                    }
                }
            },
            'modules': {}
        }
    
    def _create_console_handler(self, console_config: dict) -> logging.StreamHandler:
        """Create a console handler"""
        handler = logging.StreamHandler()
        handler.setLevel(getattr(logging, console_config.get('level', 'INFO').upper()))
        
        formatter = logging.Formatter(
            console_config.get('format', '%(asctime)s [%(levelname)s] %(name)s: %(message)s'),
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        handler.setFormatter(formatter)
        return handler
    
    def _create_file_handler(self, file_config: dict, module: str) -> logging.Handler:
        """Create a file handler (with optional automatic rotation)"""
        log_file = self.base_log_dir / file_config.get('filename', f'{module}.log')
        
        # RotatingFileHandler provides automatic rotation and cleanup
        rotation_config = file_config.get('rotation', {})
        if rotation_config.get('enabled', False):
            handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self._parse_size(rotation_config.get('max_size', '50MB')),
                backupCount=rotation_config.get('backup_count', 10),
                encoding='utf-8'
            )
        else:
            handler = logging.FileHandler(log_file, encoding='utf-8')
        
        handler.setLevel(getattr(logging, file_config.get('level', 'DEBUG').upper()))
        
        formatter = logging.Formatter(
            file_config.get('format', '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s'),
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        handler.setFormatter(formatter)
        return handler
    
    def _parse_size(self, size_str: str) -> int:
        """Parse a size string such as '50MB' into a byte count"""
        size_str = size_str.upper()
        if size_str.endswith('KB'):
            return int(size_str[:-2]) * 1024
        elif size_str.endswith('MB'):
            return int(size_str[:-2]) * 1024 * 1024
        elif size_str.endswith('GB'):
            return int(size_str[:-2]) * 1024 * 1024 * 1024
        else:
            return int(size_str)
    
    def __del__(self):
        """Release resources"""
        if self._executor:
            self._executor.shutdown(wait=False)
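
The size-string parsing can be sanity-checked in isolation; this standalone copy mirrors `_parse_size` above:

```python
def parse_size(size_str: str) -> int:
    """Convert a size string such as '50MB' into bytes (mirrors LogManager._parse_size)."""
    size_str = size_str.upper()
    if size_str.endswith('KB'):
        return int(size_str[:-2]) * 1024
    if size_str.endswith('MB'):
        return int(size_str[:-2]) * 1024 * 1024
    if size_str.endswith('GB'):
        return int(size_str[:-2]) * 1024 * 1024 * 1024
    return int(size_str)  # a bare number is taken as bytes

print(parse_size("50MB"))  # 52428800
```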

2.3 Unified Logging Interface (Enhanced)

# core/logging/__init__.py
from .log_manager import LogManager
import logging

# Global log manager instance
_log_manager = LogManager()

def initialize_logging(config_path: str = "config/logging_config.yaml"):
    """Initialize the project logging system"""
    _log_manager.initialize(config_path)

def get_logger(name: str, module: str = "default") -> logging.Logger:
    """Get a logger instance - the primary API"""
    return _log_manager.get_logger(name, module)

# Convenience helpers
def get_data_pipeline_logger(name: str) -> logging.Logger:
    """Get a data_pipeline module logger"""
    return get_logger(name, "data_pipeline")

def get_agent_logger(name: str) -> logging.Logger:
    """Get an agent module logger"""
    return get_logger(name, "agent")

def get_vanna_logger(name: str) -> logging.Logger:
    """Get a vanna module logger"""
    return get_logger(name, "vanna")

# Context management helpers
def set_log_context(**kwargs):
    """Set optional logging context
    Example: set_log_context(user_id='user123', session_id='sess456')
    """
    _log_manager.set_context(**kwargs)

def clear_log_context():
    """Clear the logging context"""
    _log_manager.clear_context()

# Async logging helpers
async def alog_info(logger: logging.Logger, message: str, **kwargs):
    """Log an INFO message asynchronously"""
    await _log_manager.alog(logger, 'info', message, **kwargs)

async def alog_error(logger: logging.Logger, message: str, **kwargs):
    """Log an ERROR message asynchronously"""
    await _log_manager.alog(logger, 'error', message, **kwargs)

async def alog_debug(logger: logging.Logger, message: str, **kwargs):
    """Log a DEBUG message asynchronously"""
    await _log_manager.alog(logger, 'debug', message, **kwargs)

async def alog_warning(logger: logging.Logger, message: str, **kwargs):
    """Log a WARNING message asynchronously"""
    await _log_manager.alog(logger, 'warning', message, **kwargs)

2.4 Logging Configuration File (with Context Fields)

# config/logging_config.yaml
version: 1

# Global configuration
global:
  base_level: INFO
  
# Default configuration
default:
  level: INFO
  console:
    enabled: true
    level: INFO
    format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
  file:
    enabled: true
    level: DEBUG
    filename: "app.log"
    # Context fields are supported; defaults prevent formatting errors
    format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
    rotation:
      enabled: true
      max_size: "50MB"
      backup_count: 10

# Module-specific configuration
modules:
  data_pipeline:
    level: DEBUG
    console:
      enabled: true
      level: INFO
      format: "🔄 %(asctime)s [%(levelname)s] Pipeline: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "data_pipeline.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "30MB"
        backup_count: 8
  
  agent:
    level: DEBUG
    console:
      enabled: true
      level: INFO
      format: "🤖 %(asctime)s [%(levelname)s] Agent: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "agent.log"
      # The agent module records user_id and session_id
      format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "30MB"
        backup_count: 8
  
  vanna:
    level: INFO
    console:
      enabled: true
      level: INFO
      format: "🧠 %(asctime)s [%(levelname)s] Vanna: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "vanna.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "20MB"
        backup_count: 5
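
The lookup rule implied by this config is simple: a module's own section wins, otherwise the `default` section applies. It mirrors the lookup used in `_configure_logger` and reduces to:

```python
config = {
    "default": {"level": "INFO"},
    "modules": {"data_pipeline": {"level": "DEBUG"}, "vanna": {"level": "INFO"}},
}

def resolve_module_config(config: dict, module: str) -> dict:
    # Module-specific section when present, otherwise the default section
    return config.get("modules", {}).get(module, config["default"])

print(resolve_module_config(config, "data_pipeline")["level"])  # DEBUG
print(resolve_module_config(config, "langgraph")["level"])      # INFO (falls back to default)
```

Modules without a dedicated section (langgraph above) therefore need no configuration at all to start logging.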

3. Implementation Steps

3.1 Phase 1: Base Infrastructure

  1. Create the logging service directory structure

    mkdir -p core/logging
    mkdir -p config
    mkdir -p logs

  2. Implement the core components
     • core/logging/log_manager.py
     • core/logging/__init__.py
     • config/logging_config.yaml

  3. Integrate into citu_app.py (the main application)

    # At the top of citu_app.py
    from core.logging import initialize_logging, get_logger, set_log_context, clear_log_context
    import uuid

    # Initialize the logging system
    initialize_logging("config/logging_config.yaml")
    app_logger = get_logger("CituApp", "default")

    # After the Flask app is configured, hook request-level logging context
    @app.flask_app.before_request
    def before_request():
        # Attach per-request context when available
        request_id = str(uuid.uuid4())[:8]
        user_id = request.headers.get('X-User-ID', 'anonymous')
        set_log_context(request_id=request_id, user_id=user_id)

    @app.flask_app.after_request
    def after_request(response):
        # Clear the context
        clear_log_context()
        return response

3.2 Phase 2: Module Refactoring

3.2.1 Refactoring the data_pipeline module

# Replace the usage in data_pipeline/utils/logger.py
from core.logging import get_data_pipeline_logger

def setup_logging(verbose: bool = False, log_file: str = None, log_dir: str = None):
    """
    Keep the original interface; the unified logging system is used internally
    """
    # No complex setup is needed any more - the unified logging system handles it
    pass

# Usage in individual files
# data_pipeline/qa_generation/qs_agent.py
class QuestionSQLGenerationAgent:
    def __init__(self, ...):
        # Replaces the old logging.getLogger("schema_tools.QSAgent")
        self.logger = get_data_pipeline_logger("QSAgent")
        
    async def generate(self):
        self.logger.info("🚀 Starting Question-SQL training data generation")
        # ... other code
        
        # Manually record timing at key milestones
        start_time = time.time()
        self.logger.info("Initializing LLM components")
        
        self._initialize_llm_components()
        
        init_time = time.time() - start_time
        self.logger.info(f"LLM components initialized in {init_time:.2f}s")

3.2.2 Refactoring the Agent Module (optional user context)

# Usage inside the ask_agent endpoint
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    logger = get_agent_logger("AskAgent")
    
    try:
        data = request.json
        question = data.get('question', '')
        user_id = data.get('user_id')  # optional
        session_id = data.get('session_id')  # optional
        
        # Set context when available
        if user_id or session_id:
            set_log_context(user_id=user_id or 'anonymous', session_id=session_id or 'N/A')
        
        logger.info(f"Received question: {question[:50]}...")
        
        # Async logging example (inside an async function)
        # await alog_info(logger, f"Processing question: {question}")
        
        # ... remaining handling logic
        
    except Exception as e:
        logger.error(f"Request handling failed: {str(e)}", exc_info=True)
        # ...

3.2.3 Refactoring vanna-related code

Since vanna logs via print, a small adapter is introduced:

# core/logging/vanna_adapter.py
from core.logging import get_vanna_logger

class VannaLogAdapter:
    """Vanna log adapter - routes print-style output through a logger"""
    
    def __init__(self, logger_name: str = "VannaBase"):
        self.logger = get_vanna_logger(logger_name)
    
    def log(self, message: str):
        """Replacement for vanna's log method"""
        # Infer the log level from the message content
        message_lower = message.lower()
        if any(keyword in message_lower for keyword in ['error', 'exception', 'fail']):
            self.logger.error(message)
        elif any(keyword in message_lower for keyword in ['warning', 'warn']):
            self.logger.warning(message)
        else:
            self.logger.info(message)

# Patch a vanna instance with the adapter
def enhance_vanna_logging(vanna_instance):
    """Add unified logging to a vanna instance"""
    adapter = VannaLogAdapter(vanna_instance.__class__.__name__)
    
    # Replace the log method
    vanna_instance.log = adapter.log
    return vanna_instance

# Apply when creating the vanna instance
# core/vanna_llm_factory.py
from core.logging.vanna_adapter import enhance_vanna_logging

def create_vanna_instance():
    # original creation logic
    vn = VannaDefault(...)
    
    # add unified logging
    vn = enhance_vanna_logging(vn)
    
    return vn

3.3 Phase 3: Workflow-Level Timing

For execution timing that spans multiple functions, record timings manually at key business milestones:

# data_pipeline/schema_workflow.py
import time
from core.logging import get_data_pipeline_logger

class SchemaWorkflowOrchestrator:
    def __init__(self, ...):
        self.logger = get_data_pipeline_logger("SchemaWorkflow")
    
    async def run_full_workflow(self):
        """Run the complete workflow"""
        workflow_start = time.time()
        self.logger.info("🚀 Starting the full schema workflow")
        
        try:
            # Step 1: generate DDL and MD files
            step1_start = time.time()
            self.logger.info("📝 Step 1: generating DDL and MD files")
            
            result1 = await self.generate_ddl_md()
            
            step1_time = time.time() - step1_start
            self.logger.info(f"✅ Step 1 done: {result1['ddl_count']} DDL files and {result1['md_count']} MD files in {step1_time:.2f}s")
            
            # Step 2: generate Question-SQL pairs
            step2_start = time.time()
            self.logger.info("❓ Step 2: generating Question-SQL pairs")
            
            result2 = await self.generate_qa_pairs()
            
            step2_time = time.time() - step2_start
            self.logger.info(f"✅ Step 2 done: {result2['qa_count']} QA pairs in {step2_time:.2f}s")
            
            # Step 3: validate SQL
            step3_start = time.time()
            self.logger.info("🔍 Step 3: validating SQL")
            
            result3 = await self.validate_sql()
            
            step3_time = time.time() - step3_start
            self.logger.info(f"✅ Step 3 done: validated {result3['validated_count']} SQL statements, fixed {result3['fixed_count']}, in {step3_time:.2f}s")
            
            # Step 4: load training data
            step4_start = time.time()
            self.logger.info("📚 Step 4: loading training data")
            
            result4 = await self.load_training_data()
            
            step4_time = time.time() - step4_start
            self.logger.info(f"✅ Step 4 done: loaded {result4['loaded_count']} training files in {step4_time:.2f}s")
            
            # Summary
            total_time = time.time() - workflow_start
            self.logger.info(f"🎉 Full workflow succeeded! Total time: {total_time:.2f}s")
            self.logger.info(f"   - DDL/MD generation: {step1_time:.2f}s")
            self.logger.info(f"   - QA generation: {step2_time:.2f}s")
            self.logger.info(f"   - SQL validation: {step3_time:.2f}s")
            self.logger.info(f"   - data loading: {step4_time:.2f}s")
            
            return {
                "success": True,
                "total_time": total_time,
                "steps": {
                    "ddl_md": {"time": step1_time, "result": result1},
                    "qa_generation": {"time": step2_time, "result": result2},
                    "sql_validation": {"time": step3_time, "result": result3},
                    "data_loading": {"time": step4_time, "result": result4}
                }
            }
            
        except Exception as e:
            total_time = time.time() - workflow_start
            self.logger.error(f"❌ Workflow failed after {total_time:.2f}s: {str(e)}")
            raise

4. Usage Examples

4.1 In citu_app.py (the main application)

# citu_app.py
from core.logging import initialize_logging, get_logger, set_log_context, clear_log_context
import uuid

# Initialize at application startup
initialize_logging("config/logging_config.yaml")
app_logger = get_logger("CituApp", "default")

# Example API endpoint
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    logger = get_agent_logger("AskAgent")
    request_id = str(uuid.uuid4())[:8]
    
    try:
        data = request.json
        user_id = data.get('user_id')
        
        # Set context (safe even without a user_id)
        set_log_context(
            request_id=request_id,
            user_id=user_id or 'anonymous'
        )
        
        logger.info("Handling request")
        # ... business logic
        
        logger.info("Request handled successfully")
        return success_response(...)
        
    except Exception as e:
        logger.error(f"Request handling failed: {str(e)}", exc_info=True)
        return error_response(...)
    finally:
        clear_log_context()

4.2 In data_pipeline

# data_pipeline/ddl_generation/training_data_agent.py
from core.logging import get_data_pipeline_logger
import time

class SchemaTrainingDataAgent:
    def __init__(self, db_config, output_dir):
        self.logger = get_data_pipeline_logger("TrainingDataAgent")
        self.db_config = db_config
        self.output_dir = output_dir
        
    async def process_tables(self, table_list):
        """Process a list of tables"""
        start_time = time.time()
        self.logger.info(f"Generating training data for {len(table_list)} tables")
        
        success_count = 0
        failed_tables = []
        
        for table in table_list:
            try:
                table_start = time.time()
                self.logger.debug(f"Processing table: {table}")
                
                await self._process_single_table(table)
                
                table_time = time.time() - table_start
                self.logger.info(f"Table {table} processed in {table_time:.2f}s")
                success_count += 1
                
            except Exception as e:
                self.logger.error(f"Table {table} failed: {str(e)}")
                failed_tables.append(table)
        
        total_time = time.time() - start_time
        self.logger.info(f"Batch done: {success_count} succeeded, {len(failed_tables)} failed, total {total_time:.2f}s")
        
        if failed_tables:
            self.logger.warning(f"Failed tables: {failed_tables}")
            
        return {
            "success_count": success_count,
            "failed_count": len(failed_tables),
            "failed_tables": failed_tables,
            "total_time": total_time
        }

4.3 In the Agent (async support)

# agent/citu_agent.py
from core.logging import get_agent_logger, set_log_context, alog_info, alog_error

class CituLangGraphAgent:
    def __init__(self):
        self.logger = get_agent_logger("CituAgent")
    
    async def process_question(self, question: str, session_id: str = None, user_id: str = None):
        """Process a question asynchronously"""
        # Set context when available
        if user_id or session_id:
            set_log_context(user_id=user_id or 'anonymous', session_id=session_id or 'N/A')
        
        # Synchronous logging
        self.logger.info(f"Processing question: {question[:50]}...")
        
        try:
            # Asynchronous logging
            await alog_info(self.logger, "Classifying question")
            
            # business logic
            result = await self._classify_question(question)
            
            await alog_info(self.logger, f"Classification done: {result.question_type}")
            
            return result
            
        except Exception as e:
            await alog_error(self.logger, f"Processing failed: {str(e)}")
            raise

4.4 Enhanced vanna logging

# core/vanna_llm_factory.py
import os

from core.logging.vanna_adapter import enhance_vanna_logging
from core.logging import get_vanna_logger

def create_vanna_instance():
    """Create a vanna instance with enhanced logging"""
    logger = get_vanna_logger("VannaFactory")
    logger.info("🧠 Creating vanna instance")
    
    try:
        # original creation logic
        vn = VannaDefault(
            config={
                'api_key': os.getenv('OPENAI_API_KEY'),
                'model': 'gpt-4'
            }
        )
        
        # add unified logging
        vn = enhance_vanna_logging(vn)
        
        logger.info("✅ Vanna instance created")
        return vn
        
    except Exception as e:
        logger.error(f"❌ Failed to create vanna instance: {str(e)}")
        raise

5. Configuration Tuning Recommendations

5.1 Development Configuration

# config/logging_config_dev.yaml
version: 1

global:
  base_level: DEBUG

default:
  level: DEBUG
  console:
    enabled: true
    level: DEBUG
  file:
    enabled: false  # console-only output is fine in development

modules:
  data_pipeline:
    level: DEBUG
    console:
      enabled: true
      level: DEBUG
      format: "🔄 %(asctime)s [%(levelname)s] Pipeline: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "data_pipeline.log"
      
  agent:
    level: DEBUG
    console:
      enabled: true
      level: DEBUG
      format: "🤖 %(asctime)s [%(levelname)s] Agent: %(message)s"

5.2 Production Configuration

# config/logging_config_prod.yaml
version: 1

global:
  base_level: INFO

default:
  level: INFO
  console:
    enabled: false  # no console output in production
  file:
    enabled: true
    level: INFO
    rotation:
      enabled: true
      max_size: "100MB"
      backup_count: 20

modules:
  data_pipeline:
    level: INFO
    console:
      enabled: false
    file:
      enabled: true
      level: INFO
      filename: "data_pipeline.log"
      rotation:
        enabled: true
        max_size: "50MB"
        backup_count: 15
        
  langchain:
    level: ERROR  # record only errors in production
    console:
      enabled: false
    file:
      enabled: true
      level: ERROR
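
One way to switch between these files (assuming an `APP_ENV` environment variable as the selector - a convention this document does not mandate) is:

```python
import os

# Hypothetical convention: APP_ENV selects the config file, defaulting to dev.
env = os.getenv("APP_ENV", "dev")
config_path = f"config/logging_config_{env}.yaml"
# initialize_logging(config_path) would then load the matching environment config
print(config_path)
```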

6. Caveats

Based on user feedback, pay particular attention to the following:

  1. Context safety: the logging system works even without user information (defaults are used)
  2. Fallback strategy: when the filesystem is unavailable, output automatically falls back to the console
  3. Async support: use the async logging helpers inside async functions to avoid blocking
  4. Main-application focus: concentrate on integrating citu_app.py; flask_app.py and chainlit_app.py are out of scope
  5. Performance: keep the existing manual cross-function timing approach rather than forcing decorators
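
Point 1 can be demonstrated with a self-contained copy of the ContextFilter: even with no context set, the `%(user_id)s` and `%(session_id)s` placeholders resolve to defaults instead of raising:

```python
import contextvars
import io
import logging

log_context = contextvars.ContextVar("log_context", default={})

class ContextFilter(logging.Filter):
    """Same defaulting behavior as the filter in log_manager.py"""
    def filter(self, record):
        ctx = log_context.get()
        record.user_id = ctx.get("user_id", "anonymous")
        record.session_id = ctx.get("session_id", "N/A")
        return True

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(ContextFilter())
handler.setFormatter(logging.Formatter("[user:%(user_id)s] [session:%(session_id)s] %(message)s"))
logger = logging.getLogger("ctx_demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

logger.info("no context set")
print(stream.getvalue().strip())  # [user:anonymous] [session:N/A] no context set
```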

7. Summary

This lean, practical logging refactor provides:

  1. Unified log management: a Log4j-style architecture with a single configuration file governing all logging
  2. Per-module log files: each module writes to its own file, making problems easier to localize
  3. Automatic rotation: RotatingFileHandler manages log file size and count automatically
  4. Flexible configuration: per-environment configs, with console and file output configured independently
  5. Ease of use: convenient APIs - one line of code fetches the right module logger
  6. Performance friendliness: timing is recorded manually at key milestones without affecting overall performance
  7. Stack compatibility: adapters designed specifically for vanna/langchain/langgraph
  8. Async support: fits the project's heavy use of async/await
  9. Safety and fault tolerance: context fields are optional, and file output can degrade to console

The design focuses on core functionality, strips unnecessary complexity, and can be implemented directly as-is.