| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 | import osimport loggingfrom typing import List, Tupleclass TableListParser:    """表清单解析器"""        def __init__(self):        self.logger = logging.getLogger("schema_tools.TableListParser")        def parse_file(self, file_path: str) -> List[str]:        """        解析表清单文件                Args:            file_path: 表清单文件路径                    Returns:            表名列表(已去重)                    Raises:            FileNotFoundError: 文件不存在            ValueError: 文件格式错误        """        if not os.path.exists(file_path):            raise FileNotFoundError(f"表清单文件不存在: {file_path}")                tables = []        seen_tables = set()  # 用于跟踪已见过的表名        duplicate_count = 0  # 重复表计数                try:            with open(file_path, 'r', encoding='utf-8') as f:                for line_num, line in enumerate(f, 1):                    # 移除空白字符                    line = line.strip()                                        # 跳过空行和注释行                    if not line or line.startswith('#') or line.startswith('--'):                        continue                                        # 验证表名格式                    if self._validate_table_name(line):                        # 检查是否重复                        if line not in seen_tables:                            tables.append(line)                            seen_tables.add(line)                            self.logger.debug(f"解析到表: {line}")                        else:                            duplicate_count += 1                            self.logger.debug(f"第 {line_num} 行: 发现重复表名: {line}")                    else:                        self.logger.warning(f"第 {line_num} 行: 无效的表名格式: {line}")                        if not tables:                raise ValueError("表清单文件中没有有效的表名")                        # 记录去重统计信息            original_count = len(tables) + duplicate_count            if duplicate_count > 0:                self.logger.info(f"表清单去重统计: 原始{original_count}个表,去重后{len(tables)}个表,移除了{duplicate_count}个重复项")            else:                self.logger.info(f"成功解析 {len(tables)} 个表(无重复)")                        return tables                    except Exception as e:            self.logger.error(f"解析表清单文件失败: {e}")            raise        def _validate_table_name(self, table_name: str) -> bool:        """        验证表名格式                Args:            table_name: 表名                    Returns:            是否合法        """        # 基本验证:不能为空,不能包含特殊字符        if not table_name:            return False                # 禁止的字符        forbidden_chars = [';', '(', ')', '[', ']', '{', '}', '*', '?', '!', '@', '#', '$', '%', '^', '&']        for char in forbidden_chars:            if char in table_name:                return False                # 表名格式:schema.table 或 table        parts = table_name.split('.')        if len(parts) > 2:            return False                # 每部分都不能为空        for part in parts:            if not part:                return False                return True        def parse_string(self, tables_str: str) -> List[str]:        """        解析表名字符串(用于测试或命令行输入)                Args:            tables_str: 表名字符串,逗号或换行分隔                    Returns:            表名列表(已去重)        """        tables = []        seen_tables = set()                # 支持逗号和换行分隔        for separator in [',', '\n']:            if separator in tables_str:                parts = tables_str.split(separator)                break        else:            parts = [tables_str]                for part in parts:            table_name = part.strip()            if table_name and self._validate_table_name(table_name):                if table_name not in seen_tables:                    tables.append(table_name)                    seen_tables.add(table_name)                return tables        def get_duplicate_info(self, file_path: str) -> Tuple[List[str], List[str]]:        """        获取表清单文件的重复信息                Args:            file_path: 表清单文件路径                    Returns:            (唯一表名列表, 重复表名列表)        """        if not os.path.exists(file_path):            raise FileNotFoundError(f"表清单文件不存在: {file_path}")                unique_tables = []        duplicate_tables = []        seen_tables = set()                try:            with open(file_path, 'r', encoding='utf-8') as f:                for line in f:                    line = line.strip()                    if not line or line.startswith('#') or line.startswith('--'):                        continue                                        if self._validate_table_name(line):                        if line not in seen_tables:                            unique_tables.append(line)                            seen_tables.add(line)                        else:                            duplicate_tables.append(line)                        return unique_tables, duplicate_tables                    except Exception as e:            self.logger.error(f"获取重复信息失败: {e}")            raise
 |