""" 原有日志系统已被新的统一日志系统替代 保留此文件仅为避免导入错误 """ from core.logging import get_data_pipeline_logger from typing import Optional import logging def setup_logging(verbose: bool = False, log_file: Optional[str] = None, log_dir: Optional[str] = None): """ 函数保留以避免调用错误,但不做任何事 原有日志系统已被统一日志系统替代 """ pass def get_logger(name: str = "DataPipeline"): """直接返回新的logger""" return get_data_pipeline_logger(name) def get_colored_console_handler(level=logging.INFO): """兼容性函数,返回None""" return None class TableProcessingLogger: """兼容性类,实际使用新的日志系统""" def __init__(self, logger_name: str = "schema_tools.TableProcessor"): self.logger = get_data_pipeline_logger("TableProcessor") self.current_table = None self.start_time = None def start_table(self, table_name: str): """开始处理表""" import time self.current_table = table_name self.start_time = time.time() self.logger.info(f"{'='*60}") self.logger.info(f"开始处理表: {table_name}") def end_table(self, success: bool = True): """结束处理表""" if self.start_time: import time duration = time.time() - self.start_time status = "成功" if success else "失败" self.logger.info(f"处理{status},耗时: {duration:.2f}秒") self.logger.info(f"{'='*60}") self.current_table = None self.start_time = None def log_step(self, step_name: str, message: str = None): """记录处理步骤""" if message: self.logger.info(f" [{step_name}] {message}") else: self.logger.info(f" [{step_name}]") def log_warning(self, message: str): """记录警告""" self.logger.warning(f" ⚠ {message}") def log_error(self, message: str): """记录错误""" self.logger.error(f" ✗ {message}") # 兼容性类 class ColoredFormatter: """兼容性类,不再使用""" def __init__(self, *args, **kwargs): pass def format(self, record): return str(record)