file_count_validator.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. from pathlib import Path
  2. from typing import Dict, List, Tuple, Set
  3. from dataclasses import dataclass, field
  4. from data_pipeline.utils.table_parser import TableListParser
  5. from data_pipeline.config import SCHEMA_TOOLS_CONFIG
  6. import logging
  7. @dataclass
  8. class ValidationResult:
  9. """验证结果"""
  10. is_valid: bool
  11. table_count: int
  12. ddl_count: int
  13. md_count: int
  14. error: str = ""
  15. missing_ddl: List[str] = field(default_factory=list)
  16. missing_md: List[str] = field(default_factory=list)
  17. duplicate_tables: List[str] = field(default_factory=list)
  18. class FileCountValidator:
  19. """文件数量验证器"""
  20. def __init__(self):
  21. self.logger = logging.getLogger("FileCountValidator")
  22. self.config = SCHEMA_TOOLS_CONFIG
  23. def validate(self, table_list_file: str, output_dir: str) -> ValidationResult:
  24. """
  25. 验证生成的文件数量是否与表数量一致
  26. Args:
  27. table_list_file: 表清单文件路径
  28. output_dir: 输出目录路径
  29. Returns:
  30. ValidationResult: 验证结果
  31. """
  32. try:
  33. # 1. 解析表清单获取表数量(自动去重)
  34. table_parser = TableListParser()
  35. tables = table_parser.parse_file(table_list_file)
  36. table_count = len(tables)
  37. # 获取重复信息
  38. unique_tables, duplicate_tables = table_parser.get_duplicate_info(table_list_file)
  39. # 2. 检查表数量限制
  40. max_tables = self.config['qs_generation']['max_tables']
  41. if table_count > max_tables:
  42. return ValidationResult(
  43. is_valid=False,
  44. table_count=table_count,
  45. ddl_count=0,
  46. md_count=0,
  47. error=f"表数量({table_count})超过限制({max_tables})。请分批处理或调整配置中的max_tables参数。",
  48. duplicate_tables=duplicate_tables
  49. )
  50. # 3. 扫描输出目录
  51. output_path = Path(output_dir)
  52. if not output_path.exists():
  53. return ValidationResult(
  54. is_valid=False,
  55. table_count=table_count,
  56. ddl_count=0,
  57. md_count=0,
  58. error=f"输出目录不存在: {output_dir}",
  59. duplicate_tables=duplicate_tables
  60. )
  61. # 4. 精确验证每个表对应的文件
  62. missing_ddl, missing_md = self._find_missing_files_precise(tables, output_path)
  63. # 计算实际存在的文件数量
  64. ddl_count = table_count - len(missing_ddl)
  65. md_count = table_count - len(missing_md)
  66. self.logger.info(f"文件统计 - 表: {table_count}, 存在DDL: {ddl_count}, 存在MD: {md_count}")
  67. if duplicate_tables:
  68. self.logger.info(f"表清单中存在 {len(duplicate_tables)} 个重复项")
  69. # 5. 验证文件完整性
  70. if missing_ddl or missing_md:
  71. error_parts = []
  72. if missing_ddl:
  73. error_parts.append(f"缺失{len(missing_ddl)}个DDL文件")
  74. self.logger.error(f"缺失的DDL文件对应的表: {', '.join(missing_ddl)}")
  75. if missing_md:
  76. error_parts.append(f"缺失{len(missing_md)}个MD文件")
  77. self.logger.error(f"缺失的MD文件对应的表: {', '.join(missing_md)}")
  78. return ValidationResult(
  79. is_valid=False,
  80. table_count=table_count,
  81. ddl_count=ddl_count,
  82. md_count=md_count,
  83. error="; ".join(error_parts),
  84. missing_ddl=missing_ddl,
  85. missing_md=missing_md,
  86. duplicate_tables=duplicate_tables
  87. )
  88. # 6. 验证通过
  89. self.logger.info(f"文件验证通过:{table_count}个表,{ddl_count}个DDL,{md_count}个MD")
  90. return ValidationResult(
  91. is_valid=True,
  92. table_count=table_count,
  93. ddl_count=ddl_count,
  94. md_count=md_count,
  95. duplicate_tables=duplicate_tables
  96. )
  97. except Exception as e:
  98. self.logger.exception("文件验证失败")
  99. return ValidationResult(
  100. is_valid=False,
  101. table_count=0,
  102. ddl_count=0,
  103. md_count=0,
  104. error=f"验证过程发生异常: {str(e)}"
  105. )
  106. def _find_missing_files_precise(self, tables: List[str], output_path: Path) -> Tuple[List[str], List[str]]:
  107. """精确查找缺失的文件,基于表名生成期望的文件名"""
  108. missing_ddl = []
  109. missing_md = []
  110. for table_spec in tables:
  111. # 根据FileNameManager的命名规则生成期望的文件名
  112. expected_filename = self._get_expected_filename(table_spec)
  113. # 检查DDL文件
  114. ddl_file = output_path / f"{expected_filename}.ddl"
  115. if not ddl_file.exists():
  116. missing_ddl.append(table_spec)
  117. self.logger.debug(f"缺失DDL文件: {ddl_file.name} (表: {table_spec})")
  118. # 检查MD文件
  119. md_file = output_path / f"{expected_filename}_detail.md"
  120. if not md_file.exists():
  121. missing_md.append(table_spec)
  122. self.logger.debug(f"缺失MD文件: {md_file.name} (表: {table_spec})")
  123. return missing_ddl, missing_md
  124. def _get_expected_filename(self, table_spec: str) -> str:
  125. """根据表名生成期望的文件名(复制FileNameManager的逻辑)"""
  126. # 解析表名
  127. if '.' in table_spec:
  128. schema, table = table_spec.split('.', 1)
  129. else:
  130. schema, table = 'public', table_spec
  131. # 生成基础文件名(遵循FileNameManager的规则)
  132. if schema.lower() == 'public':
  133. safe_name = table
  134. else:
  135. safe_name = f"{schema}__{table}"
  136. # 替换特殊字符(遵循FileNameManager的规则)
  137. replacements = {
  138. '.': '__',
  139. '-': '_',
  140. ' ': '_',
  141. '/': '_',
  142. '\\': '_',
  143. ':': '_',
  144. '*': '_',
  145. '?': '_',
  146. '"': '_',
  147. '<': '_',
  148. '>': '_',
  149. '|': '_'
  150. }
  151. for old_char, new_char in replacements.items():
  152. safe_name = safe_name.replace(old_char, new_char)
  153. # 移除连续的下划线
  154. while '__' in safe_name:
  155. safe_name = safe_name.replace('__', '_')
  156. return safe_name
  157. def _find_missing_files(self, tables: List[str], ddl_files: List[Path], md_files: List[Path]) -> Tuple[List[str], List[str]]:
  158. """查找缺失的文件"""
  159. # 获取已生成的文件名(不含扩展名)
  160. ddl_names = {f.stem for f in ddl_files}
  161. md_names = {f.stem.replace('_detail', '') for f in md_files} # 移除_detail后缀
  162. missing_ddl = []
  163. missing_md = []
  164. # 为每个表建立可能的文件名映射
  165. table_to_filenames = self._get_table_filename_mapping(tables)
  166. # 检查每个表的文件
  167. for table_spec in tables:
  168. # 获取该表可能的文件名
  169. possible_filenames = table_to_filenames[table_spec]
  170. # 检查DDL文件
  171. ddl_exists = any(fname in ddl_names for fname in possible_filenames)
  172. if not ddl_exists:
  173. missing_ddl.append(table_spec)
  174. # 检查MD文件
  175. md_exists = any(fname in md_names for fname in possible_filenames)
  176. if not md_exists:
  177. missing_md.append(table_spec)
  178. return missing_ddl, missing_md
  179. def _get_table_filename_mapping(self, tables: List[str]) -> Dict[str, Set[str]]:
  180. """获取表名到可能的文件名的映射"""
  181. mapping = {}
  182. for table_spec in tables:
  183. # 解析表名
  184. if '.' in table_spec:
  185. schema, table = table_spec.split('.', 1)
  186. else:
  187. schema, table = 'public', table_spec
  188. # 生成可能的文件名
  189. possible_names = set()
  190. # 基本格式
  191. if schema.lower() == 'public':
  192. possible_names.add(table)
  193. else:
  194. possible_names.add(f"{schema}__{table}")
  195. possible_names.add(f"{schema}_{table}") # 兼容不同格式
  196. # 考虑特殊字符替换
  197. safe_name = table.replace('-', '_').replace(' ', '_')
  198. if safe_name != table:
  199. if schema.lower() == 'public':
  200. possible_names.add(safe_name)
  201. else:
  202. possible_names.add(f"{schema}__{safe_name}")
  203. possible_names.add(f"{schema}_{safe_name}")
  204. mapping[table_spec] = possible_names
  205. return mapping