| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 | import osimport loggingfrom typing import Dict, Set, Optionalfrom pathlib import Pathclass FileNameManager:    """文件名管理器,处理文件命名和冲突"""        def __init__(self, output_dir: str):        self.output_dir = output_dir        self.used_names: Set[str] = set()        self.name_mapping: Dict[str, str] = {}  # 原始名 -> 实际文件名        self.logger = logging.getLogger("schema_tools.FileNameManager")                # 扫描已存在的文件        self._scan_existing_files()        def _scan_existing_files(self):        """扫描输出目录中已存在的文件"""        if not os.path.exists(self.output_dir):            return                for root, dirs, files in os.walk(self.output_dir):            for file in files:                if file.endswith(('.ddl', '.md')):                    self.used_names.add(file)        def get_safe_filename(self, schema_name: str, table_name: str, suffix: str) -> str:        """        生成安全的文件名,避免冲突                Args:            schema_name: Schema名称            table_name: 表名            suffix: 文件后缀(如 .ddl 或 _detail.md)                    Returns:            安全的文件名        """        # 生成基础文件名        base_name = self._generate_base_name(schema_name, table_name)                # 添加后缀        if suffix.startswith('.'):            filename = f"{base_name}{suffix}"        else:            filename = f"{base_name}{suffix}"                # 检查冲突并生成唯一名称        unique_filename = self._ensure_unique_filename(filename)                # 记录映射关系        original_key = f"{schema_name}.{table_name}"        self.name_mapping[original_key] = unique_filename        self.used_names.add(unique_filename)                return unique_filename        def _generate_base_name(self, schema_name: str, table_name: str) -> str:        """        生成基础文件名                规则:        - public.table_name → table_name        - schema.table_name → schema__table_name          - 特殊字符替换: . → __, - → _, 空格 → _        """        if schema_name.lower() == 'public':            safe_name = table_name        else:            safe_name = f"{schema_name}__{table_name}"                # 替换特殊字符        replacements = {            '.': '__',            '-': '_',            ' ': '_',            '/': '_',            '\\': '_',            ':': '_',            '*': '_',            '?': '_',            '"': '_',            '<': '_',            '>': '_',            '|': '_'        }                for old_char, new_char in replacements.items():            safe_name = safe_name.replace(old_char, new_char)                # 移除连续的下划线        while '__' in safe_name:            safe_name = safe_name.replace('__', '_')                return safe_name        def _ensure_unique_filename(self, filename: str) -> str:        """确保文件名唯一性"""        if filename not in self.used_names:            return filename                # 如果重名,添加数字后缀        base, ext = os.path.splitext(filename)        counter = 1                while True:            unique_name = f"{base}_{counter}{ext}"            if unique_name not in self.used_names:                self.logger.warning(f"文件名冲突,'{filename}' 重命名为 '{unique_name}'")                return unique_name            counter += 1        def get_full_path(self, filename: str, subdirectory: Optional[str] = None) -> str:        """        获取完整文件路径                Args:            filename: 文件名            subdirectory: 子目录(如 'ddl' 或 'docs')                    Returns:            完整路径        """        if subdirectory:            full_path = os.path.join(self.output_dir, subdirectory, filename)        else:            full_path = os.path.join(self.output_dir, filename)                # 确保目录存在        os.makedirs(os.path.dirname(full_path), exist_ok=True)                return full_path        def get_mapping_report(self) -> Dict[str, str]:        """获取文件名映射报告"""        return self.name_mapping.copy()        def write_mapping_report(self):        """写入文件名映射报告"""        report_path = os.path.join(self.output_dir, "filename_mapping.txt")                try:            with open(report_path, 'w', encoding='utf-8') as f:                f.write("# 文件名映射报告\n")                f.write("# 格式: 原始表名 -> 实际文件名\n\n")                                for original, actual in sorted(self.name_mapping.items()):                    f.write(f"{original} -> {actual}\n")                        self.logger.info(f"文件名映射报告已保存到: {report_path}")        except Exception as e:            self.logger.error(f"写入文件名映射报告失败: {e}")
 |