log_manager.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. import logging
  2. import logging.handlers
  3. import os
  4. from typing import Dict, Optional
  5. from pathlib import Path
  6. import yaml
  7. import contextvars
# Context variable that stores optional logging context (the keys read by
# ContextFilter are 'session_id', 'user_id' and 'request_id').
# NOTE: the default dict is created once and is therefore a single shared
# object; mutating the value returned by get() in place is visible to every
# context that has not installed its own dict via set().
log_context = contextvars.ContextVar('log_context', default={})
  10. class ContextFilter(logging.Filter):
  11. """添加上下文信息到日志记录"""
  12. def filter(self, record):
  13. ctx = log_context.get()
  14. # 设置默认值,避免格式化错误
  15. record.session_id = ctx.get('session_id', 'N/A')
  16. record.user_id = ctx.get('user_id', 'anonymous')
  17. record.request_id = ctx.get('request_id', 'N/A')
  18. return True
  19. class LogManager:
  20. """统一日志管理器 - 类似Log4j的功能"""
  21. _instance = None
  22. _loggers: Dict[str, logging.Logger] = {}
  23. _initialized = False
  24. _fallback_to_console = False # 标记是否降级到控制台
  25. def __new__(cls):
  26. if cls._instance is None:
  27. cls._instance = super().__new__(cls)
  28. return cls._instance
  29. def __init__(self):
  30. if not self._initialized:
  31. self.config = None
  32. self.base_log_dir = Path("logs")
  33. self._setup_base_directory()
  34. LogManager._initialized = True
  35. def initialize(self, config_path: str = "config/logging_config.yaml"):
  36. """初始化日志系统"""
  37. self.config = self._load_config(config_path)
  38. self._setup_base_directory()
  39. self._configure_root_logger()
  40. def get_logger(self, name: str, module: str = "default") -> logging.Logger:
  41. """获取指定模块的logger"""
  42. logger_key = f"{module}.{name}"
  43. if logger_key not in self._loggers:
  44. logger = logging.getLogger(logger_key)
  45. self._configure_logger(logger, module)
  46. self._loggers[logger_key] = logger
  47. return self._loggers[logger_key]
  48. def set_context(self, **kwargs):
  49. """设置日志上下文(可选)"""
  50. ctx = log_context.get()
  51. ctx.update(kwargs)
  52. log_context.set(ctx)
  53. def clear_context(self):
  54. """清除日志上下文"""
  55. log_context.set({})
  56. def _load_config(self, config_path: str) -> dict:
  57. """加载配置文件"""
  58. try:
  59. with open(config_path, 'r', encoding='utf-8') as f:
  60. return yaml.safe_load(f)
  61. except FileNotFoundError:
  62. import sys
  63. sys.stderr.write(f"[WARNING] 配置文件 {config_path} 未找到,使用默认配置\n")
  64. return self._get_default_config()
  65. except Exception as e:
  66. import sys
  67. sys.stderr.write(f"[ERROR] 加载配置文件失败: {e},使用默认配置\n")
  68. return self._get_default_config()
  69. def _get_default_config(self) -> dict:
  70. """获取默认配置"""
  71. return {
  72. 'global': {'base_level': 'INFO'},
  73. 'default': {
  74. 'level': 'INFO',
  75. 'console': {
  76. 'enabled': True,
  77. 'level': 'INFO',
  78. 'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
  79. },
  80. 'file': {
  81. 'enabled': True,
  82. 'level': 'DEBUG',
  83. 'filename': 'app.log',
  84. 'format': '%(asctime)s [%(levelname)s] [%(name)s] [user:%(user_id)s] [session:%(session_id)s] %(filename)s:%(lineno)d - %(message)s',
  85. 'rotation': {
  86. 'enabled': True,
  87. 'max_size': '50MB',
  88. 'backup_count': 10
  89. }
  90. }
  91. },
  92. 'modules': {}
  93. }
  94. def _setup_base_directory(self):
  95. """创建日志目录(带降级策略)"""
  96. try:
  97. os.makedirs(self.base_log_dir, exist_ok=True)
  98. self._fallback_to_console = False
  99. except Exception as e:
  100. import sys
  101. sys.stderr.write(f"[WARNING] 无法创建日志目录 {self.base_log_dir},将只使用控制台输出: {e}\n")
  102. self._fallback_to_console = True
  103. def _configure_root_logger(self):
  104. """配置根日志器"""
  105. root_logger = logging.getLogger()
  106. root_logger.setLevel(getattr(logging, self.config['global']['base_level'].upper()))
  107. def _configure_logger(self, logger: logging.Logger, module: str):
  108. """配置具体的logger"""
  109. module_config = self.config.get('modules', {}).get(module, self.config['default'])
  110. # 设置日志级别
  111. level = getattr(logging, module_config['level'].upper())
  112. logger.setLevel(level)
  113. # 清除已有处理器
  114. logger.handlers.clear()
  115. logger.propagate = False
  116. # 添加控制台处理器
  117. if module_config.get('console', {}).get('enabled', True):
  118. console_handler = self._create_console_handler(module_config['console'])
  119. console_handler.addFilter(ContextFilter())
  120. logger.addHandler(console_handler)
  121. # 添加文件处理器(如果没有降级到控制台)
  122. if not self._fallback_to_console and module_config.get('file', {}).get('enabled', True):
  123. try:
  124. file_handler = self._create_file_handler(module_config['file'], module)
  125. file_handler.addFilter(ContextFilter())
  126. logger.addHandler(file_handler)
  127. except Exception as e:
  128. import sys
  129. sys.stderr.write(f"[WARNING] 无法创建文件处理器: {e}\n")
  130. # 如果文件处理器创建失败,标记降级
  131. self._fallback_to_console = True
  132. def _create_console_handler(self, console_config: dict) -> logging.StreamHandler:
  133. """创建控制台处理器"""
  134. handler = logging.StreamHandler()
  135. handler.setLevel(getattr(logging, console_config.get('level', 'INFO').upper()))
  136. formatter = logging.Formatter(
  137. console_config.get('format', '%(asctime)s [%(levelname)s] %(name)s: %(message)s'),
  138. datefmt='%Y-%m-%d %H:%M:%S'
  139. )
  140. handler.setFormatter(formatter)
  141. return handler
  142. def _create_file_handler(self, file_config: dict, module: str) -> logging.Handler:
  143. """创建文件处理器(支持自动轮转)"""
  144. log_file = self.base_log_dir / file_config.get('filename', f'{module}.log')
  145. # 使用RotatingFileHandler实现自动轮转和清理
  146. rotation_config = file_config.get('rotation', {})
  147. if rotation_config.get('enabled', False):
  148. handler = logging.handlers.RotatingFileHandler(
  149. log_file,
  150. maxBytes=self._parse_size(rotation_config.get('max_size', '50MB')),
  151. backupCount=rotation_config.get('backup_count', 10),
  152. encoding='utf-8'
  153. )
  154. else:
  155. handler = logging.FileHandler(log_file, encoding='utf-8')
  156. handler.setLevel(getattr(logging, file_config.get('level', 'DEBUG').upper()))
  157. formatter = logging.Formatter(
  158. file_config.get('format', '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s'),
  159. datefmt='%Y-%m-%d %H:%M:%S'
  160. )
  161. handler.setFormatter(formatter)
  162. return handler
  163. def _parse_size(self, size_str: str) -> int:
  164. """解析大小字符串,如 '50MB' -> 字节数"""
  165. size_str = size_str.upper()
  166. if size_str.endswith('KB'):
  167. return int(size_str[:-2]) * 1024
  168. elif size_str.endswith('MB'):
  169. return int(size_str[:-2]) * 1024 * 1024
  170. elif size_str.endswith('GB'):
  171. return int(size_str[:-2]) * 1024 * 1024 * 1024
  172. else:
  173. return int(size_str)