manager.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. """
  2. Data Pipeline 独立日志管理器
  3. 专门为data_pipeline模块设计的日志管理器,完全独立于主项目的日志系统
  4. """
import logging as std_logging  # explicit alias: avoid clashing with any project-local "logging" module
import os
from pathlib import Path
from typing import Dict, Optional
  10. class DataPipelineLogManager:
  11. """Data Pipeline 专用日志管理器"""
  12. _loggers: Dict[str, std_logging.Logger] = {}
  13. _file_handlers: Dict[str, std_logging.FileHandler] = {}
  14. @classmethod
  15. def get_logger(cls, name: str, task_id: str) -> std_logging.Logger:
  16. """
  17. 获取或创建logger
  18. Args:
  19. name: logger名称
  20. task_id: 任务ID,用于确定日志文件位置
  21. Returns:
  22. 配置好的logger实例
  23. """
  24. logger_key = f"data_pipeline.{name}.{task_id}"
  25. if logger_key not in cls._loggers:
  26. logger = cls._create_logger(name, task_id)
  27. cls._loggers[logger_key] = logger
  28. return cls._loggers[logger_key]
  29. @classmethod
  30. def _create_logger(cls, name: str, task_id: str) -> std_logging.Logger:
  31. """创建新的logger实例"""
  32. # 创建logger
  33. logger_name = f"data_pipeline.{name}"
  34. logger = std_logging.getLogger(logger_name)
  35. # 设置日志级别
  36. logger.setLevel(std_logging.DEBUG)
  37. # 防止日志重复(清除已有处理器)
  38. logger.handlers.clear()
  39. logger.propagate = False
  40. # 添加控制台处理器
  41. console_handler = cls._create_console_handler()
  42. logger.addHandler(console_handler)
  43. # 添加文件处理器
  44. file_handler = cls._create_file_handler(task_id)
  45. if file_handler:
  46. logger.addHandler(file_handler)
  47. return logger
  48. @classmethod
  49. def _create_console_handler(cls) -> std_logging.StreamHandler:
  50. """创建控制台处理器"""
  51. handler = std_logging.StreamHandler()
  52. handler.setLevel(std_logging.INFO)
  53. formatter = std_logging.Formatter(
  54. '%(asctime)s [%(levelname)s] Pipeline: %(message)s',
  55. datefmt='%Y-%m-%d %H:%M:%S'
  56. )
  57. handler.setFormatter(formatter)
  58. return handler
  59. @classmethod
  60. def _create_file_handler(cls, task_id: str) -> std_logging.FileHandler:
  61. """创建文件处理器"""
  62. try:
  63. # 获取项目根目录的绝对路径
  64. project_root = Path(__file__).parent.parent.parent
  65. task_dir = project_root / "data_pipeline" / "training_data" / task_id
  66. task_dir.mkdir(parents=True, exist_ok=True)
  67. log_file = task_dir / "data_pipeline.log"
  68. # 为每个任务创建独立的文件处理器
  69. handler_key = f"file_handler_{task_id}"
  70. if handler_key not in cls._file_handlers:
  71. handler = std_logging.FileHandler(log_file, encoding='utf-8')
  72. handler.setLevel(std_logging.DEBUG)
  73. formatter = std_logging.Formatter(
  74. '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s',
  75. datefmt='%Y-%m-%d %H:%M:%S'
  76. )
  77. handler.setFormatter(formatter)
  78. cls._file_handlers[handler_key] = handler
  79. return cls._file_handlers[handler_key]
  80. except Exception as e:
  81. # 如果文件处理器创建失败,记录到stderr但不影响程序运行
  82. import sys
  83. sys.stderr.write(f"[WARNING] 无法创建data_pipeline日志文件处理器: {e}\n")
  84. return None
  85. @classmethod
  86. def cleanup_logger(cls, task_id: str):
  87. """清理指定任务的logger和文件处理器"""
  88. try:
  89. # 关闭文件处理器
  90. handler_key = f"file_handler_{task_id}"
  91. if handler_key in cls._file_handlers:
  92. cls._file_handlers[handler_key].close()
  93. del cls._file_handlers[handler_key]
  94. # 清理相关的logger
  95. keys_to_remove = [key for key in cls._loggers.keys() if task_id in key]
  96. for key in keys_to_remove:
  97. logger = cls._loggers[key]
  98. for handler in logger.handlers:
  99. handler.close()
  100. logger.handlers.clear()
  101. del cls._loggers[key]
  102. except Exception as e:
  103. import sys
  104. sys.stderr.write(f"[WARNING] 清理data_pipeline日志资源失败: {e}\n")
  105. @classmethod
  106. def cleanup_all(cls):
  107. """清理所有logger和文件处理器"""
  108. try:
  109. # 关闭所有文件处理器
  110. for handler in cls._file_handlers.values():
  111. handler.close()
  112. cls._file_handlers.clear()
  113. # 清理所有logger
  114. for logger in cls._loggers.values():
  115. for handler in logger.handlers:
  116. handler.close()
  117. logger.handlers.clear()
  118. cls._loggers.clear()
  119. except Exception as e:
  120. import sys
  121. sys.stderr.write(f"[WARNING] 清理所有data_pipeline日志资源失败: {e}\n")