file_count_validator.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. from pathlib import Path
  2. from typing import Dict, List, Tuple, Set
  3. from dataclasses import dataclass, field
  4. from data_pipeline.utils.table_parser import TableListParser
  5. from data_pipeline.config import SCHEMA_TOOLS_CONFIG
  6. import logging
  7. @dataclass
  8. class ValidationResult:
  9. """验证结果"""
  10. is_valid: bool
  11. table_count: int
  12. ddl_count: int
  13. md_count: int
  14. error: str = ""
  15. missing_ddl: List[str] = field(default_factory=list)
  16. missing_md: List[str] = field(default_factory=list)
  17. duplicate_tables: List[str] = field(default_factory=list)
  18. class FileCountValidator:
  19. """文件数量验证器"""
  20. def __init__(self):
  21. self.logger = logging.getLogger("FileCountValidator")
  22. self.config = SCHEMA_TOOLS_CONFIG
  23. def validate(self, table_list_file: str, output_dir: str) -> ValidationResult:
  24. """
  25. 验证生成的文件数量是否与表数量一致
  26. Args:
  27. table_list_file: 表清单文件路径
  28. output_dir: 输出目录路径
  29. Returns:
  30. ValidationResult: 验证结果
  31. """
  32. try:
  33. # 1. 解析表清单获取表数量(自动去重)
  34. table_parser = TableListParser()
  35. tables = table_parser.parse_file(table_list_file)
  36. table_count = len(tables)
  37. # 获取重复信息
  38. unique_tables, duplicate_tables = table_parser.get_duplicate_info(table_list_file)
  39. # 2. 检查表数量限制
  40. max_tables = self.config['qs_generation']['max_tables']
  41. if table_count > max_tables:
  42. return ValidationResult(
  43. is_valid=False,
  44. table_count=table_count,
  45. ddl_count=0,
  46. md_count=0,
  47. error=f"表数量({table_count})超过限制({max_tables})。请分批处理或调整配置中的max_tables参数。",
  48. duplicate_tables=duplicate_tables
  49. )
  50. # 3. 扫描输出目录
  51. output_path = Path(output_dir)
  52. if not output_path.exists():
  53. return ValidationResult(
  54. is_valid=False,
  55. table_count=table_count,
  56. ddl_count=0,
  57. md_count=0,
  58. error=f"输出目录不存在: {output_dir}",
  59. duplicate_tables=duplicate_tables
  60. )
  61. # 4. 统计DDL和MD文件(兼容数字后缀)
  62. ddl_files = list(output_path.glob("*.ddl"))
  63. # 匹配基础格式和带数字后缀的格式
  64. md_files_basic = list(output_path.glob("*_detail.md"))
  65. md_files_numbered = list(output_path.glob("*_detail_*.md"))
  66. md_files = md_files_basic + md_files_numbered
  67. ddl_count = len(ddl_files)
  68. md_count = len(md_files)
  69. self.logger.info(f"文件统计 - 表: {table_count}, DDL: {ddl_count}, MD: {md_count}")
  70. if duplicate_tables:
  71. self.logger.info(f"表清单中存在 {len(duplicate_tables)} 个重复项")
  72. # 5. 验证数量一致性
  73. if ddl_count != table_count or md_count != table_count:
  74. # 查找缺失的文件
  75. missing_ddl, missing_md = self._find_missing_files(tables, ddl_files, md_files)
  76. error_parts = []
  77. if ddl_count != table_count:
  78. error_parts.append(f"DDL文件数量({ddl_count})与表数量({table_count})不一致")
  79. if missing_ddl:
  80. self.logger.error(f"缺失的DDL文件对应的表: {', '.join(missing_ddl)}")
  81. if md_count != table_count:
  82. error_parts.append(f"MD文件数量({md_count})与表数量({table_count})不一致")
  83. if missing_md:
  84. self.logger.error(f"缺失的MD文件对应的表: {', '.join(missing_md)}")
  85. return ValidationResult(
  86. is_valid=False,
  87. table_count=table_count,
  88. ddl_count=ddl_count,
  89. md_count=md_count,
  90. error="; ".join(error_parts),
  91. missing_ddl=missing_ddl,
  92. missing_md=missing_md,
  93. duplicate_tables=duplicate_tables
  94. )
  95. # 6. 验证通过
  96. self.logger.info(f"文件验证通过:{table_count}个表,{ddl_count}个DDL,{md_count}个MD")
  97. return ValidationResult(
  98. is_valid=True,
  99. table_count=table_count,
  100. ddl_count=ddl_count,
  101. md_count=md_count,
  102. duplicate_tables=duplicate_tables
  103. )
  104. except Exception as e:
  105. self.logger.exception("文件验证失败")
  106. return ValidationResult(
  107. is_valid=False,
  108. table_count=0,
  109. ddl_count=0,
  110. md_count=0,
  111. error=f"验证过程发生异常: {str(e)}"
  112. )
  113. def _find_missing_files(self, tables: List[str], ddl_files: List[Path], md_files: List[Path]) -> Tuple[List[str], List[str]]:
  114. """查找缺失的文件"""
  115. # 获取已生成的文件名(不含扩展名)
  116. ddl_names = {f.stem for f in ddl_files}
  117. md_names = {f.stem.replace('_detail', '') for f in md_files} # 移除_detail后缀
  118. missing_ddl = []
  119. missing_md = []
  120. # 为每个表建立可能的文件名映射
  121. table_to_filenames = self._get_table_filename_mapping(tables)
  122. # 检查每个表的文件
  123. for table_spec in tables:
  124. # 获取该表可能的文件名
  125. possible_filenames = table_to_filenames[table_spec]
  126. # 检查DDL文件
  127. ddl_exists = any(fname in ddl_names for fname in possible_filenames)
  128. if not ddl_exists:
  129. missing_ddl.append(table_spec)
  130. # 检查MD文件
  131. md_exists = any(fname in md_names for fname in possible_filenames)
  132. if not md_exists:
  133. missing_md.append(table_spec)
  134. return missing_ddl, missing_md
  135. def _get_table_filename_mapping(self, tables: List[str]) -> Dict[str, Set[str]]:
  136. """获取表名到可能的文件名的映射"""
  137. mapping = {}
  138. for table_spec in tables:
  139. # 解析表名
  140. if '.' in table_spec:
  141. schema, table = table_spec.split('.', 1)
  142. else:
  143. schema, table = 'public', table_spec
  144. # 生成可能的文件名
  145. possible_names = set()
  146. # 基本格式
  147. if schema.lower() == 'public':
  148. possible_names.add(table)
  149. else:
  150. possible_names.add(f"{schema}__{table}")
  151. possible_names.add(f"{schema}_{table}") # 兼容不同格式
  152. # 考虑特殊字符替换
  153. safe_name = table.replace('-', '_').replace(' ', '_')
  154. if safe_name != table:
  155. if schema.lower() == 'public':
  156. possible_names.add(safe_name)
  157. else:
  158. possible_names.add(f"{schema}__{safe_name}")
  159. possible_names.add(f"{schema}_{safe_name}")
  160. mapping[table_spec] = possible_names
  161. return mapping