table_parser.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import os
  2. import logging
  3. from typing import List, Tuple
  4. class TableListParser:
  5. """表清单解析器"""
  6. def __init__(self):
  7. self.logger = logging.getLogger("schema_tools.TableListParser")
  8. def parse_file(self, file_path: str) -> List[str]:
  9. """
  10. 解析表清单文件
  11. Args:
  12. file_path: 表清单文件路径
  13. Returns:
  14. 表名列表(已去重)
  15. Raises:
  16. FileNotFoundError: 文件不存在
  17. ValueError: 文件格式错误
  18. """
  19. if not os.path.exists(file_path):
  20. raise FileNotFoundError(f"表清单文件不存在: {file_path}")
  21. tables = []
  22. seen_tables = set() # 用于跟踪已见过的表名
  23. duplicate_count = 0 # 重复表计数
  24. try:
  25. with open(file_path, 'r', encoding='utf-8') as f:
  26. for line_num, line in enumerate(f, 1):
  27. # 移除空白字符
  28. line = line.strip()
  29. # 跳过空行和注释行
  30. if not line or line.startswith('#') or line.startswith('--'):
  31. continue
  32. # 验证表名格式
  33. if self._validate_table_name(line):
  34. # 检查是否重复
  35. if line not in seen_tables:
  36. tables.append(line)
  37. seen_tables.add(line)
  38. self.logger.debug(f"解析到表: {line}")
  39. else:
  40. duplicate_count += 1
  41. self.logger.debug(f"第 {line_num} 行: 发现重复表名: {line}")
  42. else:
  43. self.logger.warning(f"第 {line_num} 行: 无效的表名格式: {line}")
  44. if not tables:
  45. raise ValueError("表清单文件中没有有效的表名")
  46. # 记录去重统计信息
  47. original_count = len(tables) + duplicate_count
  48. if duplicate_count > 0:
  49. self.logger.info(f"表清单去重统计: 原始{original_count}个表,去重后{len(tables)}个表,移除了{duplicate_count}个重复项")
  50. else:
  51. self.logger.info(f"成功解析 {len(tables)} 个表(无重复)")
  52. return tables
  53. except Exception as e:
  54. self.logger.error(f"解析表清单文件失败: {e}")
  55. raise
  56. def _validate_table_name(self, table_name: str) -> bool:
  57. """
  58. 验证表名格式
  59. Args:
  60. table_name: 表名
  61. Returns:
  62. 是否合法
  63. """
  64. # 基本验证:不能为空,不能包含特殊字符
  65. if not table_name:
  66. return False
  67. # 禁止的字符
  68. forbidden_chars = [';', '(', ')', '[', ']', '{', '}', '*', '?', '!', '@', '#', '$', '%', '^', '&']
  69. for char in forbidden_chars:
  70. if char in table_name:
  71. return False
  72. # 表名格式:schema.table 或 table
  73. parts = table_name.split('.')
  74. if len(parts) > 2:
  75. return False
  76. # 每部分都不能为空
  77. for part in parts:
  78. if not part:
  79. return False
  80. return True
  81. def parse_string(self, tables_str: str) -> List[str]:
  82. """
  83. 解析表名字符串(用于测试或命令行输入)
  84. Args:
  85. tables_str: 表名字符串,逗号或换行分隔
  86. Returns:
  87. 表名列表(已去重)
  88. """
  89. tables = []
  90. seen_tables = set()
  91. # 支持逗号和换行分隔
  92. for separator in [',', '\n']:
  93. if separator in tables_str:
  94. parts = tables_str.split(separator)
  95. break
  96. else:
  97. parts = [tables_str]
  98. for part in parts:
  99. table_name = part.strip()
  100. if table_name and self._validate_table_name(table_name):
  101. if table_name not in seen_tables:
  102. tables.append(table_name)
  103. seen_tables.add(table_name)
  104. return tables
  105. def get_duplicate_info(self, file_path: str) -> Tuple[List[str], List[str]]:
  106. """
  107. 获取表清单文件的重复信息
  108. Args:
  109. file_path: 表清单文件路径
  110. Returns:
  111. (唯一表名列表, 重复表名列表)
  112. """
  113. if not os.path.exists(file_path):
  114. raise FileNotFoundError(f"表清单文件不存在: {file_path}")
  115. unique_tables = []
  116. duplicate_tables = []
  117. seen_tables = set()
  118. try:
  119. with open(file_path, 'r', encoding='utf-8') as f:
  120. for line in f:
  121. line = line.strip()
  122. if not line or line.startswith('#') or line.startswith('--'):
  123. continue
  124. if self._validate_table_name(line):
  125. if line not in seen_tables:
  126. unique_tables.append(line)
  127. seen_tables.add(line)
  128. else:
  129. duplicate_tables.append(line)
  130. return unique_tables, duplicate_tables
  131. except Exception as e:
  132. self.logger.error(f"获取重复信息失败: {e}")
  133. raise