table_parser.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import os
  2. import logging
  3. from typing import List
  4. class TableListParser:
  5. """表清单解析器"""
  6. def __init__(self):
  7. self.logger = logging.getLogger("schema_tools.TableListParser")
  8. def parse_file(self, file_path: str) -> List[str]:
  9. """
  10. 解析表清单文件
  11. Args:
  12. file_path: 表清单文件路径
  13. Returns:
  14. 表名列表
  15. Raises:
  16. FileNotFoundError: 文件不存在
  17. ValueError: 文件格式错误
  18. """
  19. if not os.path.exists(file_path):
  20. raise FileNotFoundError(f"表清单文件不存在: {file_path}")
  21. tables = []
  22. try:
  23. with open(file_path, 'r', encoding='utf-8') as f:
  24. for line_num, line in enumerate(f, 1):
  25. # 移除空白字符
  26. line = line.strip()
  27. # 跳过空行和注释行
  28. if not line or line.startswith('#') or line.startswith('--'):
  29. continue
  30. # 验证表名格式
  31. if self._validate_table_name(line):
  32. tables.append(line)
  33. self.logger.debug(f"解析到表: {line}")
  34. else:
  35. self.logger.warning(f"第 {line_num} 行: 无效的表名格式: {line}")
  36. if not tables:
  37. raise ValueError("表清单文件中没有有效的表名")
  38. self.logger.info(f"成功解析 {len(tables)} 个表")
  39. return tables
  40. except Exception as e:
  41. self.logger.error(f"解析表清单文件失败: {e}")
  42. raise
  43. def _validate_table_name(self, table_name: str) -> bool:
  44. """
  45. 验证表名格式
  46. Args:
  47. table_name: 表名
  48. Returns:
  49. 是否合法
  50. """
  51. # 基本验证:不能为空,不能包含特殊字符
  52. if not table_name:
  53. return False
  54. # 禁止的字符
  55. forbidden_chars = [';', '(', ')', '[', ']', '{', '}', '*', '?', '!', '@', '#', '$', '%', '^', '&']
  56. for char in forbidden_chars:
  57. if char in table_name:
  58. return False
  59. # 表名格式:schema.table 或 table
  60. parts = table_name.split('.')
  61. if len(parts) > 2:
  62. return False
  63. # 每部分都不能为空
  64. for part in parts:
  65. if not part:
  66. return False
  67. return True
  68. def parse_string(self, tables_str: str) -> List[str]:
  69. """
  70. 解析表名字符串(用于测试或命令行输入)
  71. Args:
  72. tables_str: 表名字符串,逗号或换行分隔
  73. Returns:
  74. 表名列表
  75. """
  76. tables = []
  77. # 支持逗号和换行分隔
  78. for separator in [',', '\n']:
  79. if separator in tables_str:
  80. parts = tables_str.split(separator)
  81. break
  82. else:
  83. parts = [tables_str]
  84. for part in parts:
  85. table_name = part.strip()
  86. if table_name and self._validate_table_name(table_name):
  87. tables.append(table_name)
  88. return tables