Based on your requirements and feedback, here is a unified logging service design focused on the core features:
project_root/
├── core/
│   └── logging/
│       ├── __init__.py           # logging service entry point
│       └── log_manager.py        # core log manager
├── logs/                         # log file directory
│   ├── app.log                   # main application log (citu_app.py and shared modules)
│   ├── agent.log                 # agent module log
│   ├── vanna.log                 # vanna-related module log
│   └── data_pipeline.log         # data_pipeline module log
└── config/
    └── logging_config.yaml       # logging configuration file
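At a glance, the intended usage looks like this. This is a minimal sketch built only on the public API defined in core/logging/__init__.py below; the logger name "Example" and the log message are illustrative, not part of the project.

```python
# Minimal usage sketch of the public API defined below.
from core.logging import initialize_logging, get_app_logger, set_log_context, clear_log_context

initialize_logging("config/logging_config.yaml")            # once, at application startup

logger = get_app_logger("Example")                          # "Example" is an illustrative name
set_log_context(user_id="user123", request_id="req-001")    # optional context fields
logger.info("service started")                              # written to the console and logs/app.log
clear_log_context()
```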
# core/logging/log_manager.py
import logging
import logging.handlers
import os
from typing import Dict, Optional
from pathlib import Path
import yaml
import contextvars
# Context variable holding optional context information (session, user, request id)
log_context = contextvars.ContextVar('log_context', default={})
class ContextFilter(logging.Filter):
"""添加上下文信息到日志记录"""
def filter(self, record):
ctx = log_context.get()
        # Provide defaults so format strings never fail on missing attributes
record.session_id = ctx.get('session_id', 'N/A')
record.user_id = ctx.get('user_id', 'anonymous')
record.request_id = ctx.get('request_id', 'N/A')
return True
class LogManager:
"""统一日志管理器 - 类似Log4j的功能"""
_instance = None
_loggers: Dict[str, logging.Logger] = {}
_initialized = False
    _fallback_to_console = False  # whether file logging has degraded to console-only output
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
self.config = None
self.base_log_dir = Path("logs")
self._setup_base_directory()
LogManager._initialized = True
def initialize(self, config_path: str = "config/logging_config.yaml"):
"""初始化日志系统"""
self.config = self._load_config(config_path)
self._setup_base_directory()
self._configure_root_logger()
def get_logger(self, name: str, module: str = "default") -> logging.Logger:
"""获取指定模块的logger"""
logger_key = f"{module}.{name}"
if logger_key not in self._loggers:
logger = logging.getLogger(logger_key)
self._configure_logger(logger, module)
self._loggers[logger_key] = logger
return self._loggers[logger_key]
def set_context(self, **kwargs):
"""设置日志上下文(可选)"""
ctx = log_context.get()
ctx.update(kwargs)
log_context.set(ctx)
def clear_context(self):
"""清除日志上下文"""
log_context.set({})
def _load_config(self, config_path: str) -> dict:
"""加载配置文件"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except FileNotFoundError:
print(f"[WARNING] 配置文件 {config_path} 未找到,使用默认配置")
return self._get_default_config()
except Exception as e:
print(f"[ERROR] 加载配置文件失败: {e},使用默认配置")
return self._get_default_config()
def _get_default_config(self) -> dict:
"""获取默认配置"""
return {
'global': {'base_level': 'INFO'},
'default': {
'level': 'INFO',
'console': {
'enabled': True,
'level': 'INFO',
'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
},
'file': {
'enabled': True,
'level': 'DEBUG',
'filename': 'app.log',
'format': '%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s',
'rotation': {
'enabled': True,
'max_size': '50MB',
'backup_count': 10
}
}
},
'modules': {}
}
def _setup_base_directory(self):
"""创建日志目录(带降级策略)"""
try:
self.base_log_dir.mkdir(parents=True, exist_ok=True)
self._fallback_to_console = False
except Exception as e:
print(f"[WARNING] 无法创建日志目录 {self.base_log_dir},将只使用控制台输出: {e}")
self._fallback_to_console = True
def _configure_root_logger(self):
"""配置根日志器"""
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, self.config['global']['base_level'].upper()))
def _configure_logger(self, logger: logging.Logger, module: str):
"""配置具体的logger"""
module_config = self.config.get('modules', {}).get(module, self.config['default'])
        # Set the log level
level = getattr(logging, module_config['level'].upper())
logger.setLevel(level)
        # Remove existing handlers to avoid duplicate output on reconfiguration
logger.handlers.clear()
logger.propagate = False
        # Add a console handler if enabled
        console_config = module_config.get('console', {})
        if console_config.get('enabled', True):
            console_handler = self._create_console_handler(console_config)
            console_handler.addFilter(ContextFilter())
            logger.addHandler(console_handler)
        # Add a file handler (unless file logging has degraded to console-only)
        file_config = module_config.get('file', {})
        if not self._fallback_to_console and file_config.get('enabled', True):
            try:
                file_handler = self._create_file_handler(file_config, module)
                file_handler.addFilter(ContextFilter())
                logger.addHandler(file_handler)
            except Exception as e:
                print(f"[WARNING] Failed to create file handler: {e}")
def _create_console_handler(self, console_config: dict) -> logging.StreamHandler:
"""创建控制台处理器"""
handler = logging.StreamHandler()
handler.setLevel(getattr(logging, console_config.get('level', 'INFO').upper()))
formatter = logging.Formatter(
console_config.get('format', '%(asctime)s [%(levelname)s] %(name)s: %(message)s'),
datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
return handler
def _create_file_handler(self, file_config: dict, module: str) -> logging.Handler:
"""创建文件处理器(支持自动轮转)"""
log_file = self.base_log_dir / file_config.get('filename', f'{module}.log')
        # Use RotatingFileHandler for automatic rotation and cleanup of old files
rotation_config = file_config.get('rotation', {})
if rotation_config.get('enabled', False):
handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=self._parse_size(rotation_config.get('max_size', '50MB')),
backupCount=rotation_config.get('backup_count', 10),
encoding='utf-8'
)
else:
handler = logging.FileHandler(log_file, encoding='utf-8')
handler.setLevel(getattr(logging, file_config.get('level', 'DEBUG').upper()))
formatter = logging.Formatter(
file_config.get('format', '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s'),
datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
return handler
def _parse_size(self, size_str: str) -> int:
"""解析大小字符串,如 '50MB' -> 字节数"""
size_str = size_str.upper()
if size_str.endswith('KB'):
return int(size_str[:-2]) * 1024
elif size_str.endswith('MB'):
return int(size_str[:-2]) * 1024 * 1024
elif size_str.endswith('GB'):
return int(size_str[:-2]) * 1024 * 1024 * 1024
else:
return int(size_str)
# core/logging/__init__.py
from .log_manager import LogManager
import logging
# Global log manager instance (LogManager is a singleton)
_log_manager = LogManager()
def initialize_logging(config_path: str = "config/logging_config.yaml"):
"""初始化项目日志系统"""
_log_manager.initialize(config_path)
def get_logger(name: str, module: str = "default") -> logging.Logger:
"""获取logger实例 - 主要API"""
return _log_manager.get_logger(name, module)
# Convenience accessors for each module
def get_data_pipeline_logger(name: str) -> logging.Logger:
"""获取data_pipeline模块logger"""
return get_logger(name, "data_pipeline")
def get_agent_logger(name: str) -> logging.Logger:
"""获取agent模块logger"""
return get_logger(name, "agent")
def get_vanna_logger(name: str) -> logging.Logger:
"""获取vanna模块logger"""
return get_logger(name, "vanna")
def get_app_logger(name: str) -> logging.Logger:
"""获取app模块logger"""
return get_logger(name, "app")
# Convenience helpers for the logging context
def set_log_context(**kwargs):
"""设置日志上下文(可选)
示例: set_log_context(user_id='user123', session_id='sess456')
"""
_log_manager.set_context(**kwargs)
def clear_log_context():
"""清除日志上下文"""
_log_manager.clear_context()
# config/logging_config.yaml
version: 1
# Global settings
global:
base_level: INFO
# Default configuration (used for app.log)
default:
level: INFO
console:
enabled: true
level: INFO
format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
file:
enabled: true
level: DEBUG
filename: "app.log"
format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
rotation:
enabled: true
max_size: "50MB"
backup_count: 10
# Module-specific configuration
modules:
app:
level: INFO
console:
enabled: true
level: INFO
format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
file:
enabled: true
level: DEBUG
filename: "app.log"
format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
rotation:
enabled: true
max_size: "50MB"
backup_count: 10
data_pipeline:
level: DEBUG
console:
enabled: true
level: INFO
format: "%(asctime)s [%(levelname)s] Pipeline: %(message)s"
file:
enabled: true
level: DEBUG
filename: "data_pipeline.log"
format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
rotation:
enabled: true
max_size: "30MB"
backup_count: 8
agent:
level: DEBUG
console:
enabled: true
level: INFO
format: "%(asctime)s [%(levelname)s] Agent: %(message)s"
file:
enabled: true
level: DEBUG
filename: "agent.log"
format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
rotation:
enabled: true
max_size: "30MB"
backup_count: 8
vanna:
level: INFO
console:
enabled: true
level: INFO
format: "%(asctime)s [%(levelname)s] Vanna: %(message)s"
file:
enabled: true
level: DEBUG
filename: "vanna.log"
format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
rotation:
enabled: true
max_size: "20MB"
backup_count: 5
Implementation steps:

1. Create the logging service directory structure:
   mkdir -p core/logging
   mkdir -p config
   mkdir -p logs
2. Create core/logging/log_manager.py and core/logging/__init__.py (code above).
3. Create config/logging_config.yaml (configuration above).
4. Integrate into citu_app.py:

```python
from core.logging import initialize_logging, get_app_logger, set_log_context, clear_log_context
import uuid

# Initialize once at application startup
initialize_logging("config/logging_config.yaml")
app_logger = get_app_logger("CituApp")

@app.flask_app.before_request
def before_request():
    # Set per-request context (when available)
    request_id = str(uuid.uuid4())[:8]
    user_id = request.headers.get('X-User-ID', 'anonymous')
    set_log_context(request_id=request_id, user_id=user_id)

@app.flask_app.after_request
def after_request(response):
    # Clear the context after the request
    clear_log_context()
    return response
```
# Replace all print statements
# agent/citu_agent.py
from core.logging import get_agent_logger
class CituLangGraphAgent:
def __init__(self):
self.logger = get_agent_logger("CituAgent")
self.logger.info("LangGraph Agent初始化")
# Simply replace the original print statements with logger calls like the one above
# Option 1: delete data_pipeline/utils/logger.py entirely and
# import the new logging system directly at every call site
# Option 2: keep the file but replace its implementation with a thin shim
# data_pipeline/utils/logger.py
"""
The original logging system has been replaced by the new unified logging system.
This file is kept only to avoid import errors.
"""
from core.logging import get_data_pipeline_logger
def setup_logging(verbose: bool = False, log_file: str = None, log_dir: str = None):
"""函数保留以避免调用错误,但不做任何事"""
pass
def get_logger(name: str = "DataPipeline"):
"""直接返回新的logger"""
return get_data_pipeline_logger(name)
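With the Option 2 shim in place, existing call sites that import from data_pipeline/utils/logger.py keep working without changes. The snippet below is a hypothetical caller, not code taken from the project:

```python
# Hypothetical existing call site, unchanged after the migration.
from data_pipeline.utils.logger import setup_logging, get_logger

setup_logging(verbose=True)           # now a harmless no-op
logger = get_logger("DataPipeline")   # actually backed by logs/data_pipeline.log
logger.info("pipeline step started")
```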
# Use the new logger directly in each file
# data_pipeline/schema_workflow.py
from core.logging import get_data_pipeline_logger
class SchemaWorkflowOrchestrator:
def __init__(self, ...):
        # Before: self.logger = logging.getLogger("schema_tools.SchemaWorkflowOrchestrator")
        # Now: use the new unified system directly
self.logger = get_data_pipeline_logger("SchemaWorkflow")
# data_pipeline/qa_generation/qs_agent.py
from core.logging import get_data_pipeline_logger
class QuestionSQLGenerationAgent:
def __init__(self, ...):
        # Before: self.logger = logging.getLogger("schema_tools.QSAgent")
        # Now: replace it directly
self.logger = get_data_pipeline_logger("QSAgent")
# customllm/base_llm_chat.py
from core.logging import get_vanna_logger
class BaseLLMChat(VannaBase, ABC):
def __init__(self, config=None):
VannaBase.__init__(self, config=config)
self.logger = get_vanna_logger("BaseLLMChat")
        # Replace the original print statements
        self.logger.info("Received config parameters:")
for key, value in self.config.items():
self.logger.info(f" {key}: {value}")
# common/qa_feedback_manager.py
from core.logging import get_app_logger
class QAFeedbackManager:
def __init__(self, vanna_instance=None):
self.logger = get_app_logger("QAFeedbackManager")
        # Replace the original print statements
        self.logger.info("QAFeedbackManager initialized")
# citu_app.py
from core.logging import initialize_logging, get_app_logger, get_agent_logger, set_log_context, clear_log_context
import uuid
# Initialize at application startup
initialize_logging("config/logging_config.yaml")
app_logger = get_app_logger("CituApp")
# Example API endpoint
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
logger = get_agent_logger("AskAgent")
request_id = str(uuid.uuid4())[:8]
try:
data = request.json
user_id = data.get('user_id')
        # Set context (safe even when user_id is missing)
set_log_context(
request_id=request_id,
user_id=user_id or 'anonymous'
)
logger.info("开始处理请求")
# ... 业务逻辑
logger.info("请求处理成功")
return success_response(...)
except Exception as e:
logger.error(f"请求处理失败: {str(e)}", exc_info=True)
return error_response(...)
finally:
clear_log_context()
# data_pipeline/schema_workflow.py
import time
from core.logging import get_data_pipeline_logger
class SchemaWorkflowOrchestrator:
def __init__(self, ...):
self.logger = get_data_pipeline_logger("SchemaWorkflow")
async def execute_complete_workflow(self):
"""执行完整工作流"""
workflow_start = time.time()
self.logger.info("开始执行完整的Schema工作流")
# 保持原有的跨函数时间统计逻辑
try:
            # Step 1
            step1_start = time.time()
            self.logger.info("Step 1: generating DDL and MD files")
            # ... step logic
            step1_time = time.time() - step1_start
            self.logger.info(f"Step 1 finished in {step1_time:.2f}s")
            # Continue with the remaining steps...
except Exception as e:
self.logger.error(f"工作流执行失败: {str(e)}", exc_info=True)
raise
# config/logging_config_dev.yaml
version: 1
global:
base_level: DEBUG
default:
level: DEBUG
console:
enabled: true
level: DEBUG
file:
    enabled: false  # console-only output is fine for development
# config/logging_config_prod.yaml
version: 1
global:
base_level: INFO
default:
level: INFO
console:
    enabled: false  # no console output in production
file:
enabled: true
level: INFO
rotation:
enabled: true
max_size: "100MB"
backup_count: 20
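How an environment picks its config file is not prescribed here; one option is to select it at startup from an environment variable. A minimal sketch, assuming a hypothetical APP_ENV variable set to "dev" or "prod":

```python
# Sketch of environment-based config selection.
# APP_ENV is an assumption; any mechanism that yields "dev" or "prod" works.
import os
from core.logging import initialize_logging

env = os.getenv("APP_ENV", "dev")                        # "dev" or "prod"
initialize_logging(f"config/logging_config_{env}.yaml")
```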
Key features of this design:

- A single entry point (core.logging) with Log4j-style per-module configuration
- Separate log files per module: app.log, agent.log, vanna.log and data_pipeline.log
- Optional request context (user_id, session_id, request_id) injected via contextvars
- Automatic log rotation and cleanup through RotatingFileHandler
- Graceful degradation to console-only output when the log directory cannot be created
- YAML-driven configuration with separate development and production profiles
- A drop-in replacement for the existing print statements and ad-hoc loggers

The design incorporates your feedback and can be implemented as-is.