file_count_validator.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import logging
  2. from pathlib import Path
  3. from typing import Dict, List, Tuple, Set
  4. from dataclasses import dataclass, field
  5. from schema_tools.utils.table_parser import TableListParser
  6. from schema_tools.config import SCHEMA_TOOLS_CONFIG
  7. @dataclass
  8. class ValidationResult:
  9. """验证结果"""
  10. is_valid: bool
  11. table_count: int
  12. ddl_count: int
  13. md_count: int
  14. error: str = ""
  15. missing_ddl: List[str] = field(default_factory=list)
  16. missing_md: List[str] = field(default_factory=list)
  17. duplicate_tables: List[str] = field(default_factory=list)
  18. class FileCountValidator:
  19. """文件数量验证器"""
  20. def __init__(self):
  21. self.logger = logging.getLogger("schema_tools.FileCountValidator")
  22. self.config = SCHEMA_TOOLS_CONFIG
  23. def validate(self, table_list_file: str, output_dir: str) -> ValidationResult:
  24. """
  25. 验证生成的文件数量是否与表数量一致
  26. Args:
  27. table_list_file: 表清单文件路径
  28. output_dir: 输出目录路径
  29. Returns:
  30. ValidationResult: 验证结果
  31. """
  32. try:
  33. # 1. 解析表清单获取表数量(自动去重)
  34. table_parser = TableListParser()
  35. tables = table_parser.parse_file(table_list_file)
  36. table_count = len(tables)
  37. # 获取重复信息
  38. unique_tables, duplicate_tables = table_parser.get_duplicate_info(table_list_file)
  39. # 2. 检查表数量限制
  40. max_tables = self.config['qs_generation']['max_tables']
  41. if table_count > max_tables:
  42. return ValidationResult(
  43. is_valid=False,
  44. table_count=table_count,
  45. ddl_count=0,
  46. md_count=0,
  47. error=f"表数量({table_count})超过限制({max_tables})。请分批处理或调整配置中的max_tables参数。",
  48. duplicate_tables=duplicate_tables
  49. )
  50. # 3. 扫描输出目录
  51. output_path = Path(output_dir)
  52. if not output_path.exists():
  53. return ValidationResult(
  54. is_valid=False,
  55. table_count=table_count,
  56. ddl_count=0,
  57. md_count=0,
  58. error=f"输出目录不存在: {output_dir}",
  59. duplicate_tables=duplicate_tables
  60. )
  61. # 4. 统计DDL和MD文件
  62. ddl_files = list(output_path.glob("*.ddl"))
  63. md_files = list(output_path.glob("*_detail.md")) # 注意文件后缀格式
  64. ddl_count = len(ddl_files)
  65. md_count = len(md_files)
  66. self.logger.info(f"文件统计 - 表: {table_count}, DDL: {ddl_count}, MD: {md_count}")
  67. if duplicate_tables:
  68. self.logger.info(f"表清单中存在 {len(duplicate_tables)} 个重复项")
  69. # 5. 验证数量一致性
  70. if ddl_count != table_count or md_count != table_count:
  71. # 查找缺失的文件
  72. missing_ddl, missing_md = self._find_missing_files(tables, ddl_files, md_files)
  73. error_parts = []
  74. if ddl_count != table_count:
  75. error_parts.append(f"DDL文件数量({ddl_count})与表数量({table_count})不一致")
  76. if missing_ddl:
  77. self.logger.error(f"缺失的DDL文件对应的表: {', '.join(missing_ddl)}")
  78. if md_count != table_count:
  79. error_parts.append(f"MD文件数量({md_count})与表数量({table_count})不一致")
  80. if missing_md:
  81. self.logger.error(f"缺失的MD文件对应的表: {', '.join(missing_md)}")
  82. return ValidationResult(
  83. is_valid=False,
  84. table_count=table_count,
  85. ddl_count=ddl_count,
  86. md_count=md_count,
  87. error="; ".join(error_parts),
  88. missing_ddl=missing_ddl,
  89. missing_md=missing_md,
  90. duplicate_tables=duplicate_tables
  91. )
  92. # 6. 验证通过
  93. self.logger.info(f"文件验证通过:{table_count}个表,{ddl_count}个DDL,{md_count}个MD")
  94. return ValidationResult(
  95. is_valid=True,
  96. table_count=table_count,
  97. ddl_count=ddl_count,
  98. md_count=md_count,
  99. duplicate_tables=duplicate_tables
  100. )
  101. except Exception as e:
  102. self.logger.exception("文件验证失败")
  103. return ValidationResult(
  104. is_valid=False,
  105. table_count=0,
  106. ddl_count=0,
  107. md_count=0,
  108. error=f"验证过程发生异常: {str(e)}"
  109. )
  110. def _find_missing_files(self, tables: List[str], ddl_files: List[Path], md_files: List[Path]) -> Tuple[List[str], List[str]]:
  111. """查找缺失的文件"""
  112. # 获取已生成的文件名(不含扩展名)
  113. ddl_names = {f.stem for f in ddl_files}
  114. md_names = {f.stem.replace('_detail', '') for f in md_files} # 移除_detail后缀
  115. missing_ddl = []
  116. missing_md = []
  117. # 为每个表建立可能的文件名映射
  118. table_to_filenames = self._get_table_filename_mapping(tables)
  119. # 检查每个表的文件
  120. for table_spec in tables:
  121. # 获取该表可能的文件名
  122. possible_filenames = table_to_filenames[table_spec]
  123. # 检查DDL文件
  124. ddl_exists = any(fname in ddl_names for fname in possible_filenames)
  125. if not ddl_exists:
  126. missing_ddl.append(table_spec)
  127. # 检查MD文件
  128. md_exists = any(fname in md_names for fname in possible_filenames)
  129. if not md_exists:
  130. missing_md.append(table_spec)
  131. return missing_ddl, missing_md
  132. def _get_table_filename_mapping(self, tables: List[str]) -> Dict[str, Set[str]]:
  133. """获取表名到可能的文件名的映射"""
  134. mapping = {}
  135. for table_spec in tables:
  136. # 解析表名
  137. if '.' in table_spec:
  138. schema, table = table_spec.split('.', 1)
  139. else:
  140. schema, table = 'public', table_spec
  141. # 生成可能的文件名
  142. possible_names = set()
  143. # 基本格式
  144. if schema.lower() == 'public':
  145. possible_names.add(table)
  146. else:
  147. possible_names.add(f"{schema}__{table}")
  148. possible_names.add(f"{schema}_{table}") # 兼容不同格式
  149. # 考虑特殊字符替换
  150. safe_name = table.replace('-', '_').replace(' ', '_')
  151. if safe_name != table:
  152. if schema.lower() == 'public':
  153. possible_names.add(safe_name)
  154. else:
  155. possible_names.add(f"{schema}__{safe_name}")
  156. possible_names.add(f"{schema}_{safe_name}")
  157. mapping[table_spec] = possible_names
  158. return mapping