Based on your requirements, here is a unified logging service in the style of Log4j, focused on the core functionality:
```
project-root/
├── core/
│   └── logging/
│       ├── __init__.py           # logging service entry point
│       └── log_manager.py        # core log manager
├── logs/                         # log file directory
│   ├── data_pipeline.log         # data_pipeline module log
│   ├── agent.log                 # agent module log
│   ├── vanna.log                 # vanna module log
│   ├── langchain.log             # langchain module log
│   ├── langgraph.log             # langgraph module log
│   └── app.log                   # main application log
└── config/
    └── logging_config.yaml       # logging configuration file
```
Based on user feedback, the enhanced implementation below adds context injection, async logging, file rotation, and graceful fallback to console output:
```python
# core/logging/log_manager.py
import logging
import logging.handlers
from typing import Dict
from pathlib import Path
import yaml
import asyncio
from concurrent.futures import ThreadPoolExecutor
import contextvars

# Context variable holding optional per-request context
log_context = contextvars.ContextVar('log_context', default={})


class ContextFilter(logging.Filter):
    """Inject context information into log records."""

    def filter(self, record):
        ctx = log_context.get()
        # Provide defaults so format strings never raise
        record.session_id = ctx.get('session_id', 'N/A')
        record.user_id = ctx.get('user_id', 'anonymous')
        record.request_id = ctx.get('request_id', 'N/A')
        return True


class LogManager:
    """Unified log manager - Log4j-style functionality."""

    _instance = None
    _loggers: Dict[str, logging.Logger] = {}
    _initialized = False
    _executor = None
    _fallback_to_console = False  # whether we degraded to console-only output

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not self._initialized:
            self.config = None
            self.base_log_dir = Path("logs")
            self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="log")
            self._setup_base_directory()
            LogManager._initialized = True

    def initialize(self, config_path: str = "config/logging_config.yaml"):
        """Initialize the logging system."""
        self.config = self._load_config(config_path)
        self._setup_base_directory()
        self._configure_root_logger()

    def _configure_root_logger(self):
        """Apply the global base level to the root logger."""
        base_level = self.config.get('global', {}).get('base_level', 'INFO')
        logging.getLogger().setLevel(getattr(logging, base_level.upper(), logging.INFO))

    def get_logger(self, name: str, module: str = "default") -> logging.Logger:
        """Return the logger for a given module."""
        logger_key = f"{module}.{name}"
        if logger_key not in self._loggers:
            logger = logging.getLogger(logger_key)
            self._configure_logger(logger, module)
            self._loggers[logger_key] = logger
        return self._loggers[logger_key]

    async def alog(self, logger: logging.Logger, level: str, message: str, **kwargs):
        """Asynchronous logging helper."""
        loop = asyncio.get_running_loop()
        # run_in_executor does not propagate contextvars into the worker
        # thread, so copy the current context explicitly to keep
        # user_id/session_id intact in the formatted output
        ctx = contextvars.copy_context()
        await loop.run_in_executor(
            self._executor,
            lambda: ctx.run(getattr(logger, level), message, **kwargs)
        )

    def set_context(self, **kwargs):
        """Set logging context (optional)."""
        # Copy before updating: mutating the ContextVar's default dict
        # in place would leak state across unrelated contexts
        ctx = dict(log_context.get())
        ctx.update(kwargs)
        log_context.set(ctx)

    def clear_context(self):
        """Clear the logging context."""
        log_context.set({})

    def _load_config(self, config_path: str) -> dict:
        """Load the config file (with error handling)."""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            print(f"[WARNING] config file {config_path} not found, using defaults")
            return self._get_default_config()
        except Exception as e:
            print(f"[ERROR] failed to load config file: {e}, using defaults")
            return self._get_default_config()

    def _setup_base_directory(self):
        """Create the log directory (with a fallback strategy)."""
        try:
            self.base_log_dir.mkdir(parents=True, exist_ok=True)
            self._fallback_to_console = False
        except Exception as e:
            print(f"[WARNING] cannot create log directory {self.base_log_dir}, "
                  f"falling back to console-only output: {e}")
            self._fallback_to_console = True

    def _configure_logger(self, logger: logging.Logger, module: str):
        """Configure a concrete logger (fallback-aware)."""
        if self.config is None:
            # get_logger() may be called before initialize()
            self.config = self._get_default_config()
        module_config = (self.config.get('modules') or {}).get(module) \
            or self.config.get('default') \
            or self._get_default_config()['default']
        # Set the log level
        level = getattr(logging, module_config['level'].upper())
        logger.setLevel(level)
        # Remove existing handlers
        logger.handlers.clear()
        logger.propagate = False
        # Console handler
        if module_config.get('console', {}).get('enabled', True):
            console_handler = self._create_console_handler(module_config['console'])
            console_handler.addFilter(ContextFilter())
            logger.addHandler(console_handler)
        # File handler (unless we degraded to console-only)
        if not self._fallback_to_console and module_config.get('file', {}).get('enabled', True):
            try:
                file_handler = self._create_file_handler(module_config['file'], module)
                file_handler.addFilter(ContextFilter())
                logger.addHandler(file_handler)
            except Exception as e:
                print(f"[WARNING] cannot create file handler: {e}")

    def _get_default_config(self) -> dict:
        """Return the built-in default configuration."""
        return {
            'global': {'base_level': 'INFO'},
            'default': {
                'level': 'INFO',
                'console': {
                    'enabled': True,
                    'level': 'INFO',
                    'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
                },
                'file': {
                    'enabled': True,
                    'level': 'DEBUG',
                    'filename': 'app.log',
                    'format': ('%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] '
                               '[session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s'),
                    'rotation': {
                        'enabled': True,
                        'max_size': '50MB',
                        'backup_count': 10
                    }
                }
            },
            'modules': {}
        }

    def _create_console_handler(self, console_config: dict) -> logging.StreamHandler:
        """Create a console handler."""
        handler = logging.StreamHandler()
        handler.setLevel(getattr(logging, console_config.get('level', 'INFO').upper()))
        formatter = logging.Formatter(
            console_config.get('format', '%(asctime)s [%(levelname)s] %(name)s: %(message)s'),
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        handler.setFormatter(formatter)
        return handler

    def _create_file_handler(self, file_config: dict, module: str) -> logging.Handler:
        """Create a file handler (with optional rotation)."""
        log_file = self.base_log_dir / file_config.get('filename', f'{module}.log')
        # RotatingFileHandler gives us rotation plus automatic cleanup
        rotation_config = file_config.get('rotation', {})
        if rotation_config.get('enabled', False):
            handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self._parse_size(rotation_config.get('max_size', '50MB')),
                backupCount=rotation_config.get('backup_count', 10),
                encoding='utf-8'
            )
        else:
            handler = logging.FileHandler(log_file, encoding='utf-8')
        handler.setLevel(getattr(logging, file_config.get('level', 'DEBUG').upper()))
        formatter = logging.Formatter(
            file_config.get('format', '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s'),
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        handler.setFormatter(formatter)
        return handler

    def _parse_size(self, size_str: str) -> int:
        """Parse a size string such as '50MB' into bytes."""
        size_str = size_str.upper()
        if size_str.endswith('KB'):
            return int(size_str[:-2]) * 1024
        elif size_str.endswith('MB'):
            return int(size_str[:-2]) * 1024 * 1024
        elif size_str.endswith('GB'):
            return int(size_str[:-2]) * 1024 * 1024 * 1024
        else:
            return int(size_str)

    def __del__(self):
        """Release resources."""
        if self._executor:
            self._executor.shutdown(wait=False)
```
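As a quick sanity check, the singleton behavior and size parsing can be exercised directly. This is a minimal sketch, assuming `core/logging` is importable from the working directory:

```python
# Minimal smoke test for LogManager (assumes core/logging is on the import path)
from core.logging.log_manager import LogManager

m1 = LogManager()
m2 = LogManager()
assert m1 is m2                                   # __new__ returns the same singleton
assert m1._parse_size('50MB') == 50 * 1024 * 1024
assert m1._parse_size('10KB') == 10 * 1024

logger = m1.get_logger("Smoke", "default")        # falls back to the default config
logger.info("log manager smoke test")
```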
```python
# core/logging/__init__.py
import logging

from .log_manager import LogManager

# Global log manager instance
_log_manager = LogManager()


def initialize_logging(config_path: str = "config/logging_config.yaml"):
    """Initialize the project's logging system."""
    _log_manager.initialize(config_path)


def get_logger(name: str, module: str = "default") -> logging.Logger:
    """Return a logger instance - the primary API."""
    return _log_manager.get_logger(name, module)


# Convenience accessors
def get_data_pipeline_logger(name: str) -> logging.Logger:
    """Return a data_pipeline module logger."""
    return get_logger(name, "data_pipeline")


def get_agent_logger(name: str) -> logging.Logger:
    """Return an agent module logger."""
    return get_logger(name, "agent")


def get_vanna_logger(name: str) -> logging.Logger:
    """Return a vanna module logger."""
    return get_logger(name, "vanna")


# Context management helpers
def set_log_context(**kwargs):
    """Set the logging context (optional).

    Example: set_log_context(user_id='user123', session_id='sess456')
    """
    _log_manager.set_context(**kwargs)


def clear_log_context():
    """Clear the logging context."""
    _log_manager.clear_context()


# Async logging helpers
async def alog_info(logger: logging.Logger, message: str, **kwargs):
    """Log an INFO message asynchronously."""
    await _log_manager.alog(logger, 'info', message, **kwargs)


async def alog_error(logger: logging.Logger, message: str, **kwargs):
    """Log an ERROR message asynchronously."""
    await _log_manager.alog(logger, 'error', message, **kwargs)


async def alog_debug(logger: logging.Logger, message: str, **kwargs):
    """Log a DEBUG message asynchronously."""
    await _log_manager.alog(logger, 'debug', message, **kwargs)


async def alog_warning(logger: logging.Logger, message: str, **kwargs):
    """Log a WARNING message asynchronously."""
    await _log_manager.alog(logger, 'warning', message, **kwargs)
```
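A minimal usage sketch for the async helpers, assuming the package layout above (if the config file is missing, the manager falls back to its defaults):

```python
# Demo of the async logging helpers (assumes the package layout above)
import asyncio
from core.logging import initialize_logging, get_logger, alog_info, alog_error

async def main():
    initialize_logging("config/logging_config.yaml")
    logger = get_logger("AsyncDemo", "default")
    # The actual logger.info call runs on the manager's thread pool
    await alog_info(logger, "written from the logging thread pool")
    try:
        raise ValueError("boom")
    except ValueError as e:
        await alog_error(logger, f"caught: {e}")

asyncio.run(main())
```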
```yaml
# config/logging_config.yaml
version: 1

# Global settings
global:
  base_level: INFO

# Default configuration
default:
  level: INFO
  console:
    enabled: true
    level: INFO
    format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
  file:
    enabled: true
    level: DEBUG
    filename: "app.log"
    # Context fields are supported; defaults prevent formatting errors
    format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
    rotation:
      enabled: true
      max_size: "50MB"
      backup_count: 10

# Module-specific configuration
modules:
  data_pipeline:
    level: DEBUG
    console:
      enabled: true
      level: INFO
      format: "🔄 %(asctime)s [%(levelname)s] Pipeline: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "data_pipeline.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "30MB"
        backup_count: 8
  agent:
    level: DEBUG
    console:
      enabled: true
      level: INFO
      format: "🤖 %(asctime)s [%(levelname)s] Agent: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "agent.log"
      # The agent module logs user_id and session_id
      format: "%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "30MB"
        backup_count: 8
  vanna:
    level: INFO
    console:
      enabled: true
      level: INFO
      format: "🧠 %(asctime)s [%(levelname)s] Vanna: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "vanna.log"
      format: "%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s"
      rotation:
        enabled: true
        max_size: "20MB"
        backup_count: 5
```
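With this file in place, module loggers pick up their sections automatically. A sketch of the expected wiring, given the levels configured above:

```python
# How the YAML sections map onto loggers (sketch)
from core.logging import initialize_logging, get_data_pipeline_logger, get_agent_logger

initialize_logging("config/logging_config.yaml")

pipeline_logger = get_data_pipeline_logger("Loader")  # -> logs/data_pipeline.log
agent_logger = get_agent_logger("Router")             # -> logs/agent.log

pipeline_logger.debug("goes to the file only (console level is INFO)")
agent_logger.info("goes to both the console and logs/agent.log")
```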
Create the logging service directory structure:

```bash
mkdir -p core/logging
mkdir -p config
mkdir -p logs
```

Then create the files shown above: core/logging/log_manager.py, core/logging/__init__.py, and config/logging_config.yaml.
Integrate into citu_app.py (the main application):

```python
from flask import request
from core.logging import initialize_logging, get_logger, set_log_context, clear_log_context
import uuid

initialize_logging("config/logging_config.yaml")
app_logger = get_logger("CituApp", "default")

@app.flask_app.before_request
def before_request():
    # Attach per-request context (when available)
    request_id = str(uuid.uuid4())[:8]
    user_id = request.headers.get('X-User-ID', 'anonymous')
    set_log_context(request_id=request_id, user_id=user_id)

@app.flask_app.after_request
def after_request(response):
    # Clear the context after the request
    clear_log_context()
    return response
```
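A note on this design: `contextvars` store state per thread (or per async task), so the context set in `before_request` is visible to the view handling that same request. Because WSGI servers reuse worker threads across requests, clearing the context in `after_request` prevents one request's `user_id` from leaking into the next request served by the same thread.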
```python
# Replaces the old usage in data_pipeline/utils/logger.py
from core.logging import get_data_pipeline_logger


def setup_logging(verbose: bool = False, log_file: str = None, log_dir: str = None):
    """
    Keep the original interface; the unified logging system does the work internally.
    """
    # No elaborate setup needed any more - the unified system handles it
    pass
```

Usage inside individual files:

```python
# data_pipeline/qa_generation/qs_agent.py
import time

from core.logging import get_data_pipeline_logger


class QuestionSQLGenerationAgent:
    def __init__(self, ...):
        # Replaces the old logging.getLogger("schema_tools.QSAgent")
        self.logger = get_data_pipeline_logger("QSAgent")

    async def generate(self):
        self.logger.info("🚀 Starting Question-SQL training data generation")
        # ... other code
        # Manually record timings at key checkpoints
        start_time = time.time()
        self.logger.info("Initializing LLM components")
        self._initialize_llm_components()
        init_time = time.time() - start_time
        self.logger.info(f"LLM components initialized in {init_time:.2f}s")
```
In the ask_agent endpoint:

```python
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    logger = get_agent_logger("AskAgent")
    try:
        data = request.json
        question = data.get('question', '')
        user_id = data.get('user_id')        # optional
        session_id = data.get('session_id')  # optional
        # Attach context when available
        if user_id or session_id:
            set_log_context(user_id=user_id or 'anonymous',
                            session_id=session_id or 'N/A')
        logger.info(f"Received question: {question[:50]}...")
        # Async logging example (inside an async function):
        # await alog_info(logger, f"Processing question: {question}")
        # ... other processing logic
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        # ...
```
Because vanna logs via print-style output, add a thin adapter:
```python
# core/logging/vanna_adapter.py
from core.logging import get_vanna_logger


class VannaLogAdapter:
    """Vanna log adapter - routes print-style messages through a logger."""

    def __init__(self, logger_name: str = "VannaBase"):
        self.logger = get_vanna_logger(logger_name)

    def log(self, message: str):
        """Drop-in replacement for vanna's log method."""
        # Infer the log level from the message content
        message_lower = message.lower()
        if any(keyword in message_lower for keyword in ['error', 'exception', 'fail']):
            self.logger.error(message)
        elif any(keyword in message_lower for keyword in ['warning', 'warn']):
            self.logger.warning(message)
        else:
            self.logger.info(message)


def enhance_vanna_logging(vanna_instance):
    """Wrap a vanna instance so its log output goes through the unified logger."""
    adapter = VannaLogAdapter(vanna_instance.__class__.__name__)
    # Replace the instance's log method
    vanna_instance.log = adapter.log
    return vanna_instance
```

Apply it where the vanna instance is created:

```python
# core/vanna_llm_factory.py
from core.logging.vanna_adapter import enhance_vanna_logging


def create_vanna_instance():
    # original creation logic
    vn = VannaDefault(...)
    # enhance logging
    vn = enhance_vanna_logging(vn)
    return vn
```
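The adapter assumes vanna routes output through its `log` method. If some code paths call `print` directly, stdout can be redirected around those calls as a fallback. A sketch using only the standard library:

```python
# Fallback for code that prints directly instead of calling vn.log
import io
from contextlib import redirect_stdout

def call_with_captured_stdout(adapter, func, *args, **kwargs):
    """Run func, then forward anything it printed to the adapter."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        result = func(*args, **kwargs)
    for line in buffer.getvalue().splitlines():
        if line.strip():
            adapter.log(line)
    return result
```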
For execution-time statistics spanning multiple functions, record timings manually at key business checkpoints:
```python
# data_pipeline/schema_workflow.py
import time
from core.logging import get_data_pipeline_logger


class SchemaWorkflowOrchestrator:
    def __init__(self, ...):
        self.logger = get_data_pipeline_logger("SchemaWorkflow")

    async def run_full_workflow(self):
        """Run the complete workflow."""
        workflow_start = time.time()
        self.logger.info("🚀 Starting the full schema workflow")
        try:
            # Step 1: generate DDL and MD files
            step1_start = time.time()
            self.logger.info("📝 Step 1: generating DDL and MD files")
            result1 = await self.generate_ddl_md()
            step1_time = time.time() - step1_start
            self.logger.info(f"✅ Step 1 done: {result1['ddl_count']} DDL files and "
                             f"{result1['md_count']} MD files in {step1_time:.2f}s")
            # Step 2: generate Question-SQL pairs
            step2_start = time.time()
            self.logger.info("❓ Step 2: generating Question-SQL pairs")
            result2 = await self.generate_qa_pairs()
            step2_time = time.time() - step2_start
            self.logger.info(f"✅ Step 2 done: {result2['qa_count']} QA pairs in {step2_time:.2f}s")
            # Step 3: validate SQL
            step3_start = time.time()
            self.logger.info("🔍 Step 3: validating SQL")
            result3 = await self.validate_sql()
            step3_time = time.time() - step3_start
            self.logger.info(f"✅ Step 3 done: validated {result3['validated_count']} SQL statements, "
                             f"fixed {result3['fixed_count']}, in {step3_time:.2f}s")
            # Step 4: load training data
            step4_start = time.time()
            self.logger.info("📚 Step 4: loading training data")
            result4 = await self.load_training_data()
            step4_time = time.time() - step4_start
            self.logger.info(f"✅ Step 4 done: loaded {result4['loaded_count']} training files in {step4_time:.2f}s")
            # Summary
            total_time = time.time() - workflow_start
            self.logger.info(f"🎉 Full workflow succeeded! Total time: {total_time:.2f}s")
            self.logger.info(f"  - DDL/MD generation: {step1_time:.2f}s")
            self.logger.info(f"  - QA generation: {step2_time:.2f}s")
            self.logger.info(f"  - SQL validation: {step3_time:.2f}s")
            self.logger.info(f"  - data loading: {step4_time:.2f}s")
            return {
                "success": True,
                "total_time": total_time,
                "steps": {
                    "ddl_md": {"time": step1_time, "result": result1},
                    "qa_generation": {"time": step2_time, "result": result2},
                    "sql_validation": {"time": step3_time, "result": result3},
                    "data_loading": {"time": step4_time, "result": result4}
                }
            }
        except Exception as e:
            total_time = time.time() - workflow_start
            self.logger.error(f"❌ Workflow failed after {total_time:.2f}s: {str(e)}")
            raise
```
```python
# citu_app.py
from flask import request
from core.logging import (
    initialize_logging, get_logger, get_agent_logger,
    set_log_context, clear_log_context
)
import uuid

# Initialize at application startup
initialize_logging("config/logging_config.yaml")
app_logger = get_logger("CituApp", "default")

# Example API endpoint
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    logger = get_agent_logger("AskAgent")
    request_id = str(uuid.uuid4())[:8]
    try:
        data = request.json
        user_id = data.get('user_id')
        # Attach context (safe even without a user_id)
        set_log_context(
            request_id=request_id,
            user_id=user_id or 'anonymous'
        )
        logger.info("Handling request")
        # ... business logic
        logger.info("Request handled successfully")
        return success_response(...)
    except Exception as e:
        logger.error(f"Request handling failed: {str(e)}", exc_info=True)
        return error_response(...)
    finally:
        clear_log_context()
```
```python
# data_pipeline/ddl_generation/training_data_agent.py
import time

from core.logging import get_data_pipeline_logger


class SchemaTrainingDataAgent:
    def __init__(self, db_config, output_dir):
        self.logger = get_data_pipeline_logger("TrainingDataAgent")
        self.db_config = db_config
        self.output_dir = output_dir

    async def process_tables(self, table_list):
        """Process a list of tables."""
        start_time = time.time()
        self.logger.info(f"Generating training data for {len(table_list)} tables")
        success_count = 0
        failed_tables = []
        for table in table_list:
            try:
                table_start = time.time()
                self.logger.debug(f"Processing table: {table}")
                await self._process_single_table(table)
                table_time = time.time() - table_start
                self.logger.info(f"Table {table} processed in {table_time:.2f}s")
                success_count += 1
            except Exception as e:
                self.logger.error(f"Table {table} failed: {str(e)}")
                failed_tables.append(table)
        total_time = time.time() - start_time
        self.logger.info(f"Batch done: {success_count} succeeded, "
                         f"{len(failed_tables)} failed, total {total_time:.2f}s")
        if failed_tables:
            self.logger.warning(f"Failed tables: {failed_tables}")
        return {
            "success_count": success_count,
            "failed_count": len(failed_tables),
            "failed_tables": failed_tables,
            "total_time": total_time
        }
```
```python
# agent/citu_agent.py
from core.logging import get_agent_logger, set_log_context, alog_info, alog_error


class CituLangGraphAgent:
    def __init__(self):
        self.logger = get_agent_logger("CituAgent")

    async def process_question(self, question: str, session_id: str = None, user_id: str = None):
        """Process a question asynchronously."""
        # Attach context when available
        if user_id or session_id:
            set_log_context(user_id=user_id or 'anonymous',
                            session_id=session_id or 'N/A')
        # Synchronous logging
        self.logger.info(f"Processing question: {question[:50]}...")
        try:
            # Asynchronous logging
            await alog_info(self.logger, "Classifying question")
            # business logic
            result = await self._classify_question(question)
            await alog_info(self.logger, f"Classification done: {result.question_type}")
            return result
        except Exception as e:
            await alog_error(self.logger, f"Processing failed: {str(e)}")
            raise
```
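A note on context propagation here: `loop.run_in_executor` does not copy `contextvars` into its worker thread, which is why `LogManager.alog` above captures the current context with `contextvars.copy_context()` before dispatching. Without that, the `user_id` and `session_id` set via `set_log_context` would silently fall back to their defaults in async log lines.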
```python
# core/vanna_llm_factory.py
import os

from core.logging import get_vanna_logger
from core.logging.vanna_adapter import enhance_vanna_logging
# VannaDefault is assumed to be importable from the vanna package used by the project


def create_vanna_instance():
    """Create a vanna instance with enhanced logging."""
    logger = get_vanna_logger("VannaFactory")
    logger.info("🧠 Creating Vanna instance")
    try:
        # original creation logic
        vn = VannaDefault(
            config={
                'api_key': os.getenv('OPENAI_API_KEY'),
                'model': 'gpt-4'
            }
        )
        # enhance logging
        vn = enhance_vanna_logging(vn)
        logger.info("✅ Vanna instance created")
        return vn
    except Exception as e:
        logger.error(f"❌ Failed to create Vanna instance: {str(e)}")
        raise
```
Development configuration:

```yaml
# config/logging_config_dev.yaml
version: 1

global:
  base_level: DEBUG

default:
  level: DEBUG
  console:
    enabled: true
    level: DEBUG
  file:
    enabled: false  # console-only is fine for development

modules:
  data_pipeline:
    level: DEBUG
    console:
      enabled: true
      level: DEBUG
      format: "🔄 %(asctime)s [%(levelname)s] Pipeline: %(message)s"
    file:
      enabled: true
      level: DEBUG
      filename: "data_pipeline.log"
  agent:
    level: DEBUG
    console:
      enabled: true
      level: DEBUG
      format: "🤖 %(asctime)s [%(levelname)s] Agent: %(message)s"
```
Production configuration:

```yaml
# config/logging_config_prod.yaml
version: 1

global:
  base_level: INFO

default:
  level: INFO
  console:
    enabled: false  # no console output in production
  file:
    enabled: true
    level: INFO
    rotation:
      enabled: true
      max_size: "100MB"
      backup_count: 20

modules:
  data_pipeline:
    level: INFO
    console:
      enabled: false
    file:
      enabled: true
      level: INFO
      filename: "data_pipeline.log"
      rotation:
        enabled: true
        max_size: "50MB"
        backup_count: 15
  langchain:
    level: ERROR  # errors only in production
    console:
      enabled: false
    file:
      enabled: true
      level: ERROR
```
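One way to pick between these files at startup is an environment switch. A sketch; the `APP_ENV` variable name is an assumption, not an existing project convention:

```python
# Choose a logging config per environment (APP_ENV is an assumed convention)
import os
from core.logging import initialize_logging

env = os.getenv("APP_ENV", "dev")
config_file = {
    "dev": "config/logging_config_dev.yaml",
    "prod": "config/logging_config_prod.yaml",
}.get(env, "config/logging_config.yaml")

initialize_logging(config_file)
```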
The plan incorporates the points raised in user feedback: context fields are optional and have safe defaults so format strings never fail, the system degrades to console-only output when the log directory is unavailable, and rotation keeps disk usage bounded. This lean, practical logging refactor focuses on the core functionality, drops unnecessary complexity, and is a design that can be implemented as-is.