# table_parser.py (7.0 KB)
  1. import os
  2. from typing import List, Tuple
  3. import logging
  4. class TableListParser:
  5. """表清单解析器"""
  6. def __init__(self):
  7. self.logger = logging.getLogger("TableListParser")
  8. def parse_file(self, file_path: str) -> List[str]:
  9. """
  10. 解析表清单文件,支持换行符和逗号分隔
  11. Args:
  12. file_path: 表清单文件路径
  13. Returns:
  14. 表名列表(已去重)
  15. Raises:
  16. FileNotFoundError: 文件不存在
  17. ValueError: 文件格式错误
  18. """
  19. if not os.path.exists(file_path):
  20. raise FileNotFoundError(f"表清单文件不存在: {file_path}")
  21. tables = []
  22. seen_tables = set() # 用于跟踪已见过的表名
  23. duplicate_count = 0 # 重复表计数
  24. try:
  25. with open(file_path, 'r', encoding='utf-8') as f:
  26. for line_num, line in enumerate(f, 1):
  27. # 移除空白字符
  28. line = line.strip()
  29. # 跳过空行和注释行
  30. if not line or line.startswith('#') or line.startswith('--'):
  31. continue
  32. # 如果行内包含逗号,按逗号分割;否则整行作为一个表名
  33. if ',' in line:
  34. tables_in_line = [t.strip() for t in line.split(',') if t.strip()]
  35. else:
  36. tables_in_line = [line]
  37. # 验证每个表名并添加到结果中
  38. for table_name in tables_in_line:
  39. if self._validate_table_name(table_name):
  40. # 检查是否重复
  41. if table_name not in seen_tables:
  42. tables.append(table_name)
  43. seen_tables.add(table_name)
  44. self.logger.debug(f"解析到表: {table_name}")
  45. else:
  46. duplicate_count += 1
  47. self.logger.debug(f"第 {line_num} 行: 发现重复表名: {table_name}")
  48. else:
  49. self.logger.warning(f"第 {line_num} 行: 无效的表名格式: {table_name}")
  50. if not tables:
  51. raise ValueError("表清单文件中没有有效的表名")
  52. # 记录去重统计信息
  53. original_count = len(tables) + duplicate_count
  54. if duplicate_count > 0:
  55. self.logger.info(f"表清单去重统计: 原始{original_count}个表,去重后{len(tables)}个表,移除了{duplicate_count}个重复项")
  56. else:
  57. self.logger.info(f"成功解析 {len(tables)} 个表(无重复)")
  58. return tables
  59. except Exception as e:
  60. self.logger.error(f"解析表清单文件失败: {e}")
  61. raise
  62. def _validate_table_name(self, table_name: str) -> bool:
  63. """
  64. 验证表名格式
  65. Args:
  66. table_name: 表名
  67. Returns:
  68. 是否合法
  69. """
  70. # 基本验证:不能为空,不能包含特殊字符
  71. if not table_name:
  72. return False
  73. # 禁止的字符
  74. forbidden_chars = [';', '(', ')', '[', ']', '{', '}', '*', '?', '!', '@', '#', '$', '%', '^', '&']
  75. for char in forbidden_chars:
  76. if char in table_name:
  77. return False
  78. # 表名格式:schema.table 或 table
  79. parts = table_name.split('.')
  80. if len(parts) > 2:
  81. return False
  82. # 每部分都不能为空
  83. for part in parts:
  84. if not part:
  85. return False
  86. return True
  87. def parse_string(self, tables_str: str) -> List[str]:
  88. """
  89. 解析表名字符串,支持换行符和逗号分隔(用于测试或命令行输入)
  90. Args:
  91. tables_str: 表名字符串,逗号或换行分隔
  92. Returns:
  93. 表名列表(已去重)
  94. """
  95. tables = []
  96. seen_tables = set()
  97. # 按换行符分割,然后处理每一行
  98. lines = tables_str.split('\n')
  99. for line in lines:
  100. line = line.strip()
  101. # 跳过空行和注释行
  102. if not line or line.startswith('#') or line.startswith('--'):
  103. continue
  104. # 如果行内包含逗号,按逗号分割;否则整行作为一个表名
  105. if ',' in line:
  106. tables_in_line = [t.strip() for t in line.split(',') if t.strip()]
  107. else:
  108. tables_in_line = [line]
  109. # 验证每个表名并添加到结果中
  110. for table_name in tables_in_line:
  111. if table_name and self._validate_table_name(table_name):
  112. if table_name not in seen_tables:
  113. tables.append(table_name)
  114. seen_tables.add(table_name)
  115. return tables
  116. def get_duplicate_info(self, file_path: str) -> Tuple[List[str], List[str]]:
  117. """
  118. 获取表清单文件的重复信息
  119. Args:
  120. file_path: 表清单文件路径
  121. Returns:
  122. (唯一表名列表, 重复表名列表)
  123. """
  124. if not os.path.exists(file_path):
  125. raise FileNotFoundError(f"表清单文件不存在: {file_path}")
  126. unique_tables = []
  127. duplicate_tables = []
  128. seen_tables = set()
  129. try:
  130. with open(file_path, 'r', encoding='utf-8') as f:
  131. for line in f:
  132. line = line.strip()
  133. if not line or line.startswith('#') or line.startswith('--'):
  134. continue
  135. # 如果行内包含逗号,按逗号分割;否则整行作为一个表名
  136. if ',' in line:
  137. tables_in_line = [t.strip() for t in line.split(',') if t.strip()]
  138. else:
  139. tables_in_line = [line]
  140. # 处理每个表名
  141. for table_name in tables_in_line:
  142. if self._validate_table_name(table_name):
  143. if table_name not in seen_tables:
  144. unique_tables.append(table_name)
  145. seen_tables.add(table_name)
  146. else:
  147. duplicate_tables.append(table_name)
  148. return unique_tables, duplicate_tables
  149. except Exception as e:
  150. self.logger.error(f"获取重复信息失败: {e}")
  151. raise