manager.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. """
  2. Data Pipeline 独立日志管理器
  3. 专门为data_pipeline模块设计的日志管理器,完全独立于主项目的日志系统
  4. """
  5. import os
  6. from pathlib import Path
  7. from typing import Dict
  8. # 明确导入Python内置logging模块
  9. import logging as std_logging
  10. class DataPipelineLogManager:
  11. """Data Pipeline 专用日志管理器"""
  12. _loggers: Dict[str, std_logging.Logger] = {}
  13. _file_handlers: Dict[str, std_logging.FileHandler] = {}
  14. @classmethod
  15. def get_logger(cls, name: str, task_id: str) -> std_logging.Logger:
  16. """
  17. 获取或创建logger
  18. Args:
  19. name: logger名称
  20. task_id: 任务ID,用于确定日志文件位置
  21. Returns:
  22. 配置好的logger实例
  23. """
  24. logger_key = f"data_pipeline.{name}.{task_id}"
  25. if logger_key not in cls._loggers:
  26. logger = cls._create_logger(name, task_id)
  27. cls._loggers[logger_key] = logger
  28. return cls._loggers[logger_key]
  29. @classmethod
  30. def _create_logger(cls, name: str, task_id: str) -> std_logging.Logger:
  31. """创建新的logger实例"""
  32. # 创建logger
  33. logger_name = f"data_pipeline.{name}"
  34. logger = std_logging.getLogger(logger_name)
  35. # 设置日志级别
  36. logger.setLevel(std_logging.DEBUG)
  37. # 防止日志重复(清除已有处理器)
  38. logger.handlers.clear()
  39. logger.propagate = False
  40. # 添加控制台处理器
  41. console_handler = cls._create_console_handler()
  42. logger.addHandler(console_handler)
  43. # 添加文件处理器
  44. file_handler = cls._create_file_handler(task_id)
  45. if file_handler:
  46. logger.addHandler(file_handler)
  47. return logger
  48. @classmethod
  49. def _create_console_handler(cls) -> std_logging.StreamHandler:
  50. """创建控制台处理器"""
  51. handler = std_logging.StreamHandler()
  52. handler.setLevel(std_logging.INFO)
  53. formatter = std_logging.Formatter(
  54. '%(asctime)s [%(levelname)s] Pipeline: %(message)s',
  55. datefmt='%Y-%m-%d %H:%M:%S'
  56. )
  57. handler.setFormatter(formatter)
  58. return handler
  59. @classmethod
  60. def _create_file_handler(cls, task_id: str) -> std_logging.FileHandler:
  61. """创建文件处理器"""
  62. try:
  63. # 确定日志文件路径
  64. task_dir = Path("data_pipeline/training_data") / task_id
  65. task_dir.mkdir(parents=True, exist_ok=True)
  66. log_file = task_dir / "data_pipeline.log"
  67. # 为每个任务创建独立的文件处理器
  68. handler_key = f"file_handler_{task_id}"
  69. if handler_key not in cls._file_handlers:
  70. handler = std_logging.FileHandler(log_file, encoding='utf-8')
  71. handler.setLevel(std_logging.DEBUG)
  72. formatter = std_logging.Formatter(
  73. '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s',
  74. datefmt='%Y-%m-%d %H:%M:%S'
  75. )
  76. handler.setFormatter(formatter)
  77. cls._file_handlers[handler_key] = handler
  78. return cls._file_handlers[handler_key]
  79. except Exception as e:
  80. # 如果文件处理器创建失败,记录到stderr但不影响程序运行
  81. import sys
  82. sys.stderr.write(f"[WARNING] 无法创建data_pipeline日志文件处理器: {e}\n")
  83. return None
  84. @classmethod
  85. def cleanup_logger(cls, task_id: str):
  86. """清理指定任务的logger和文件处理器"""
  87. try:
  88. # 关闭文件处理器
  89. handler_key = f"file_handler_{task_id}"
  90. if handler_key in cls._file_handlers:
  91. cls._file_handlers[handler_key].close()
  92. del cls._file_handlers[handler_key]
  93. # 清理相关的logger
  94. keys_to_remove = [key for key in cls._loggers.keys() if task_id in key]
  95. for key in keys_to_remove:
  96. logger = cls._loggers[key]
  97. for handler in logger.handlers:
  98. handler.close()
  99. logger.handlers.clear()
  100. del cls._loggers[key]
  101. except Exception as e:
  102. import sys
  103. sys.stderr.write(f"[WARNING] 清理data_pipeline日志资源失败: {e}\n")
  104. @classmethod
  105. def cleanup_all(cls):
  106. """清理所有logger和文件处理器"""
  107. try:
  108. # 关闭所有文件处理器
  109. for handler in cls._file_handlers.values():
  110. handler.close()
  111. cls._file_handlers.clear()
  112. # 清理所有logger
  113. for logger in cls._loggers.values():
  114. for handler in logger.handlers:
  115. handler.close()
  116. logger.handlers.clear()
  117. cls._loggers.clear()
  118. except Exception as e:
  119. import sys
  120. sys.stderr.write(f"[WARNING] 清理所有data_pipeline日志资源失败: {e}\n")