"""
Standalone log manager for the Data Pipeline.

Designed specifically for the data_pipeline module; fully independent of
the main project's logging system.
"""
import os
from pathlib import Path
from typing import Dict, Optional

# Explicitly import the built-in logging module under an alias so it cannot
# be shadowed by a project-local "logging" package.
import logging as std_logging
class DataPipelineLogManager:
    """Dedicated log manager for the data_pipeline module.

    Maintains its own registry of loggers and one file handler per task so
    that data_pipeline logging stays fully independent of the main
    project's logging configuration.
    """

    # Cache of configured loggers, keyed by "data_pipeline.<name>.<task_id>".
    _loggers: Dict[str, std_logging.Logger] = {}
    # One shared FileHandler per task, keyed by "file_handler_<task_id>".
    _file_handlers: Dict[str, std_logging.FileHandler] = {}

    @classmethod
    def get_logger(cls, name: str, task_id: str) -> std_logging.Logger:
        """Return a cached logger for (name, task_id), creating it on first use.

        Args:
            name: Logger name (component within data_pipeline).
            task_id: Task ID; determines the log file location.

        Returns:
            A fully configured logger instance.
        """
        logger_key = f"data_pipeline.{name}.{task_id}"
        if logger_key not in cls._loggers:
            cls._loggers[logger_key] = cls._create_logger(name, task_id)
        return cls._loggers[logger_key]

    @classmethod
    def _create_logger(cls, name: str, task_id: str) -> std_logging.Logger:
        """Create and configure a new logger for the given task."""
        # BUG FIX: include task_id in the underlying logging-module name.
        # Previously the name was "data_pipeline.<name>", so two tasks using
        # the same component name shared one Logger object: the second
        # task's setup cleared the first task's handlers and silently
        # redirected the first task's cached logger to the wrong log file.
        logger = std_logging.getLogger(f"data_pipeline.{name}.{task_id}")
        logger.setLevel(std_logging.DEBUG)

        # Avoid duplicate output: drop any pre-existing handlers and stop
        # propagation to the root logger.
        logger.handlers.clear()
        logger.propagate = False

        logger.addHandler(cls._create_console_handler())

        # File output is best-effort; the logger works console-only if the
        # file handler cannot be created.
        file_handler = cls._create_file_handler(task_id)
        if file_handler:
            logger.addHandler(file_handler)

        return logger

    @classmethod
    def _create_console_handler(cls) -> std_logging.StreamHandler:
        """Create a console handler (INFO and above)."""
        handler = std_logging.StreamHandler()
        handler.setLevel(std_logging.INFO)
        handler.setFormatter(std_logging.Formatter(
            '%(asctime)s [%(levelname)s] Pipeline: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        ))
        return handler

    @classmethod
    def _create_file_handler(cls, task_id: str) -> Optional[std_logging.FileHandler]:
        """Create (or reuse) the DEBUG-level file handler for a task.

        Returns:
            The task's shared FileHandler, or None when it cannot be
            created (e.g. the log directory is not writable); logging then
            degrades gracefully to console-only output.
        """
        try:
            # Resolve <project_root>/data_pipeline/training_data/<task_id>/
            # relative to this file's location.
            project_root = Path(__file__).parent.parent.parent
            task_dir = project_root / "data_pipeline" / "training_data" / task_id
            task_dir.mkdir(parents=True, exist_ok=True)

            log_file = task_dir / "data_pipeline.log"
            handler_key = f"file_handler_{task_id}"

            # One FileHandler per task so every logger belonging to that
            # task appends to the same file.
            if handler_key not in cls._file_handlers:
                handler = std_logging.FileHandler(log_file, encoding='utf-8')
                handler.setLevel(std_logging.DEBUG)
                handler.setFormatter(std_logging.Formatter(
                    '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S'
                ))
                cls._file_handlers[handler_key] = handler

            return cls._file_handlers[handler_key]

        except Exception as e:
            # A failure here must not break the pipeline; warn on stderr
            # and fall back to console-only logging.
            import sys
            sys.stderr.write(f"[WARNING] 无法创建data_pipeline日志文件处理器: {e}\n")
            return None

    @classmethod
    def cleanup_logger(cls, task_id: str):
        """Close and remove all logging resources belonging to one task."""
        try:
            # Close and drop the task's file handler first.
            handler_key = f"file_handler_{task_id}"
            if handler_key in cls._file_handlers:
                cls._file_handlers[handler_key].close()
                del cls._file_handlers[handler_key]

            # BUG FIX: match on the exact ".<task_id>" suffix instead of a
            # substring test, which would also hit task "12" when cleaning
            # up task "1".
            suffix = f".{task_id}"
            keys_to_remove = [key for key in cls._loggers if key.endswith(suffix)]
            for key in keys_to_remove:
                logger = cls._loggers[key]
                for handler in logger.handlers:
                    handler.close()
                logger.handlers.clear()
                del cls._loggers[key]

        except Exception as e:
            import sys
            sys.stderr.write(f"[WARNING] 清理data_pipeline日志资源失败: {e}\n")

    @classmethod
    def cleanup_all(cls):
        """Close and remove every cached logger and file handler."""
        try:
            # Close all per-task file handlers.
            for handler in cls._file_handlers.values():
                handler.close()
            cls._file_handlers.clear()

            # Detach and close every handler on every cached logger.
            for logger in cls._loggers.values():
                for handler in logger.handlers:
                    handler.close()
                logger.handlers.clear()
            cls._loggers.clear()

        except Exception as e:
            import sys
            sys.stderr.write(f"[WARNING] 清理所有data_pipeline日志资源失败: {e}\n")
|