12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- """
- 原有日志系统已被新的统一日志系统替代
- 保留此文件仅为避免导入错误
- """
- from core.logging import get_data_pipeline_logger
- from typing import Optional
- import logging
- def setup_logging(verbose: bool = False, log_file: Optional[str] = None, log_dir: Optional[str] = None):
- """
- 函数保留以避免调用错误,但不做任何事
- 原有日志系统已被统一日志系统替代
- """
- pass
- def get_logger(name: str = "DataPipeline"):
- """直接返回新的logger"""
- return get_data_pipeline_logger(name)
- def get_colored_console_handler(level=logging.INFO):
- """兼容性函数,返回None"""
- return None
- class TableProcessingLogger:
- """兼容性类,实际使用新的日志系统"""
-
- def __init__(self, logger_name: str = "schema_tools.TableProcessor"):
- self.logger = get_data_pipeline_logger("TableProcessor")
- self.current_table = None
- self.start_time = None
-
- def start_table(self, table_name: str):
- """开始处理表"""
- import time
- self.current_table = table_name
- self.start_time = time.time()
- self.logger.info(f"{'='*60}")
- self.logger.info(f"开始处理表: {table_name}")
-
- def end_table(self, success: bool = True):
- """结束处理表"""
- if self.start_time:
- import time
- duration = time.time() - self.start_time
- status = "成功" if success else "失败"
- self.logger.info(f"处理{status},耗时: {duration:.2f}秒")
- self.logger.info(f"{'='*60}")
- self.current_table = None
- self.start_time = None
-
- def log_step(self, step_name: str, message: str = None):
- """记录处理步骤"""
- if message:
- self.logger.info(f" [{step_name}] {message}")
- else:
- self.logger.info(f" [{step_name}]")
-
- def log_warning(self, message: str):
- """记录警告"""
- self.logger.warning(f" ⚠ {message}")
-
- def log_error(self, message: str):
- """记录错误"""
- self.logger.error(f" ✗ {message}")
- # 兼容性类
- class ColoredFormatter:
- """兼容性类,不再使用"""
- def __init__(self, *args, **kwargs):
- pass
-
- def format(self, record):
- return str(record)
|