
Mostly finished developing the training-data generation (*.ddl and *.md) for the PostgreSQL database; the scripts pass testing. Next up is phase two of development: generating SQL key-value pairs.

wangxq 1 week ago
parent
commit
8dbaa0c2f7

+ 17 - 0
output/ddl/bss_car_day_count_1.ddl

@@ -0,0 +1,17 @@
+-- Chinese name: 服务区车辆日统计表 (service area daily vehicle count table)
+-- Description: Records daily vehicle counts and types per service area, used for traffic analysis and resource scheduling
+create table public.bss_car_day_count (
+  id varchar(32) not null,     -- primary key ID
+  version integer not null,    -- version number
+  create_ts timestamp,         -- creation time
+  created_by varchar(50),      -- creator ID
+  update_ts timestamp,         -- update time
+  updated_by varchar(50),      -- updater ID
+  delete_ts timestamp,         -- deletion time
+  deleted_by varchar(50),      -- deleter ID
+  customer_count bigint,       -- vehicle count
+  car_type varchar(100),       -- vehicle category
+  count_date date,             -- date of the count
+  service_area_id varchar(32), -- service area ID
+  primary key (id)
+);

+ 18 - 0
output/docs/bss_car_day_count_detail_1.md

@@ -0,0 +1,18 @@
+## bss_car_day_count (服务区车辆日统计表)
+The bss_car_day_count table records daily vehicle counts and types per service area, used for traffic analysis and resource scheduling.
+Field list:
+- id (varchar(32)) - primary key ID [primary key, not null] [examples: 00022c1c99ff11ec86d4fa163ec0f8fc, 00022caa99ff11ec86d4fa163ec0f8fc]
+- version (integer) - version number [not null] [examples: 1]
+- create_ts (timestamp) - creation time [examples: 2022-03-02 16:01:43, 2022-02-02 14:18:55]
+- created_by (varchar(50)) - creator ID
+- update_ts (timestamp) - update time [examples: 2022-03-02 16:01:43, 2022-02-02 14:18:55]
+- updated_by (varchar(50)) - updater ID
+- delete_ts (timestamp) - deletion time
+- deleted_by (varchar(50)) - deleter ID
+- customer_count (bigint) - vehicle count [examples: 1114, 295]
+- car_type (varchar(100)) - vehicle category [examples: 其他]
+- count_date (date) - date of the count [examples: 2022-03-02, 2022-02-02]
+- service_area_id (varchar(32)) - service area ID [examples: 17461166e7fa3ecda03534a5795ce985, 81f4eb731fb0728aef17ae61f1f1daef]
+Additional field notes:
+- id is the primary key
+- car_type is an enum field with the values: 其他 (other), 危化品 (hazardous goods), 城际 (intercity), 过境 (transit)

+ 4 - 0
output/filename_mapping.txt

@@ -0,0 +1,4 @@
+# Filename mapping report
+# Format: original table name -> actual file name
+
+public.bss_car_day_count -> bss_car_day_count_detail_1.md

+ 235 - 0
schema_tools/README.md

@@ -0,0 +1,235 @@
+# Schema Tools
+
+An automated database reverse-engineering tool that generates vanna.ai-format training data from a PostgreSQL database.
+
+## Features
+
+- 🚀 Connects to PostgreSQL automatically
+- 📋 Batch-processes a table list
+- 🤖 Generates Chinese comments with an LLM
+- 🔍 Detects enum fields automatically
+- ⚡ Concurrent processing for higher throughput
+- 📁 Produces standardized DDL and MD documents
+- 🛡️ Comprehensive error handling and logging
+
+## Installing Dependencies
+
+```bash
+pip install asyncpg
+```
+
+## Usage
+
+### 1. Command line
+
+#### Basic usage
+```bash
+python -m schema_tools \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "高速公路服务区管理系统"
+```
+
+#### Specifying the output directory and pipeline
+```bash
+python -m schema_tools \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --table-list tables.txt \
+  --business-context "电商系统" \
+  --output-dir ./output \
+  --pipeline full
+```
+
+#### Checking database permissions only
+```bash
+python -m schema_tools \
+  --db-connection "postgresql://user:pass@localhost:5432/dbname" \
+  --check-permissions-only
+```
+
+### 2. Programmatic use
+
+```python
+import asyncio
+from schema_tools import SchemaTrainingDataAgent
+
+async def generate_training_data():
+    agent = SchemaTrainingDataAgent(
+        db_connection="postgresql://user:pass@localhost:5432/dbname",
+        table_list_file="tables.txt",
+        business_context="高速公路服务区管理系统",
+        output_dir="./output",
+        pipeline="full"
+    )
+    
+    report = await agent.generate_training_data()
+    print(f"处理完成: {report['summary']}")
+
+asyncio.run(generate_training_data())
+```
+
+### 3. Table list file format
+
+Create a text file (e.g. `tables.txt`) with one table name per line:
+
+```text
+# This is a comment line
+public.users
+public.orders
+hr.employees
+sales.products
+```
+
+## Output Layout
+
+```
+output/
+├── ddl/                          # DDL files
+│   ├── users.ddl
+│   ├── orders.ddl
+│   └── hr__employees.ddl
+├── docs/                         # MD documents
+│   ├── users_detail.md
+│   ├── orders_detail.md
+│   └── hr__employees_detail.md
+├── logs/                         # log files
+│   └── schema_tools_20240101_120000.log
+└── filename_mapping.txt          # filename mapping report
+```
+
+## Configuration
+
+The main settings live in `schema_tools/config.py`:
+
+```python
+SCHEMA_TOOLS_CONFIG = {
+    # Core settings
+    "output_directory": "training/generated_data",
+    "default_pipeline": "full",
+    
+    # Data processing
+    "sample_data_limit": 20,              # rows sampled per table
+    "max_concurrent_tables": 1,           # max concurrent tables (keep at 1 for LLM calls)
+    
+    # LLM settings
+    "max_llm_retries": 3,                # max LLM retries
+    "comment_generation_timeout": 30,     # timeout in seconds
+    
+    # System table filtering
+    "filter_system_tables": True,         # filter out system tables
+    
+    # Error handling
+    "continue_on_error": True,            # keep going after errors
+}
+```
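+
+The module also exports `get_config` and `update_config` (re-exported in `schema_tools/__init__.py`); a minimal sketch of adjusting settings at runtime:
+
+```python
+from schema_tools import get_config, update_config
+
+# Raise the sampling limit and disable system-table filtering for this run
+update_config(sample_data_limit=50, filter_system_tables=False)
+assert get_config()["sample_data_limit"] == 50
+```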
+
+## Pipeline Types
+
+- **full**: the complete pipeline (default)
+  - database inspection → data sampling → comment generation → DDL generation → MD generation
+
+- **ddl_only**: DDL generation only
+  - database inspection → data sampling → comment generation → DDL generation
+
+- **analysis_only**: analysis only, no file output
+  - database inspection → data sampling → comment generation
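+
+The pipeline is selected with `--pipeline` on the command line or the `pipeline` argument of `SchemaTrainingDataAgent`; for example, to produce DDL without MD documents:
+
+```python
+from schema_tools import SchemaTrainingDataAgent
+
+agent = SchemaTrainingDataAgent(
+    db_connection="postgresql://user:pass@localhost:5432/dbname",
+    table_list_file="tables.txt",
+    business_context="高速公路服务区管理系统",
+    pipeline="ddl_only",  # runs every step except doc_generator
+)
+```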
+
+## Business Context
+
+The business context helps the LLM understand what the tables and fields mean:
+
+### Option 1: command-line argument
+```bash
+--business-context "高速公路服务区管理系统"
+```
+
+### Option 2: context file
+```bash
+--business-context-file business_context.txt
+```
+
+### Option 3: business dictionary
+Edit `schema_tools/prompts/business_dictionary.txt`:
+```text
+BSS - Business Support System,业务支撑系统
+SA - Service Area,服务区
+POS - Point of Sale,销售点
+```
+
+## Advanced Features
+
+### 1. Custom system table filtering
+
+```python
+from schema_tools.utils.system_filter import SystemTableFilter
+
+table_filter = SystemTableFilter()
+table_filter.add_custom_prefix("tmp_")      # add a custom prefix
+table_filter.add_custom_schema("temp")      # add a custom schema
+```
+
+### 2. Smart sampling for large tables
+
+Tables with more than one million rows automatically use a layered sampling strategy, sketched below:
+- the first N rows
+- random rows from the middle
+- the last N rows
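+
+A minimal sketch of the idea, assuming an asyncpg connection and a known row count (the actual implementation, with fallbacks, lives in `schema_tools/tools/data_sampler.py`):
+
+```python
+async def sample_large_table(conn, table: str, total_rows: int, limit: int = 20):
+    """Layered sampling: head rows, a random middle slice, and tail rows."""
+    per_section = max(1, limit // 3)
+    rows = list(await conn.fetch(f"SELECT * FROM {table} LIMIT {per_section}"))
+    # TABLESAMPLE SYSTEM(pct) reads roughly pct percent of the table's pages
+    pct = min(1.0, per_section * 100.0 / total_rows)
+    rows += list(await conn.fetch(
+        f"SELECT * FROM {table} TABLESAMPLE SYSTEM({pct}) LIMIT {per_section}"
+    ))
+    # Tail: number all rows and keep only the last per_section of them
+    rows += list(await conn.fetch(
+        f"SELECT * FROM (SELECT *, ROW_NUMBER() OVER () AS rn FROM {table}) t "
+        f"WHERE t.rn > {total_rows - per_section}"
+    ))
+    return [dict(r) for r in rows[:limit]]
+```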
+
+### 3. Enum field detection
+
+Enum fields are detected and verified automatically based on three signals, combined in the sketch below:
+- VARCHAR type
+- highly repetitive sample values
+- a field name containing a category keyword (status, type, level, and similar)
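+
+A rough sketch of this heuristic as a pre-filter (hypothetical helper; the real pipeline also asks the LLM and then verifies candidates with a DISTINCT-value query, see `comment_generator.py`):
+
+```python
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+def looks_like_enum(field_type: str, field_name: str, samples: list) -> bool:
+    """Heuristic enum pre-filter: type, sample repetition, and name keywords."""
+    cfg = SCHEMA_TOOLS_CONFIG
+    if not field_type.lower().startswith(("varchar", "character varying")):
+        return False
+    few_distinct = len(set(samples)) <= cfg["enum_max_distinct_values"]
+    name_hint = any(kw in field_name.lower() for kw in cfg["enum_varchar_keywords"])
+    return few_distinct and name_hint
+```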
+
+## FAQ
+
+### Q: How are read-only databases handled?
+A: The tool detects read-only databases automatically and never attempts write operations.
+
+### Q: How are duplicate table names handled?
+A: Unique file names are generated automatically, e.g. `hr__users.ddl` and `sales__users.ddl`, as sketched below.
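+
+A sketch of the naming rule these examples imply (hypothetical helper; the `_1` in `bss_car_day_count_1.ddl` above suggests a numeric counter is appended on collisions):
+
+```python
+import os
+
+def safe_filename(schema: str, table: str, suffix: str, output_dir: str) -> str:
+    """Prefix non-public schemas with 'schema__'; bump a counter on collisions."""
+    base = table if schema == "public" else f"{schema}__{table}"
+    candidate, n = f"{base}{suffix}", 0
+    while os.path.exists(os.path.join(output_dir, candidate)):
+        n += 1
+        candidate = f"{base}_{n}{suffix}"
+    return candidate
+```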
+
+### Q: How do I skip certain tables?
+A: Comment them out in the table list file (prefix the line with #) or delete the lines.
+
+### Q: What happens when an LLM call fails?
+A: The call is retried up to 3 times; after that the original comment or a default value is used.
+
+## Notes
+
+1. **Database permissions**: SELECT permission is the minimum requirement
+2. **LLM configuration**: reuses the project's vanna instance configuration
+3. **Concurrency**: defaults to 1 table at a time (adjustable; see `config.py` on concurrent LLM call issues)
+4. **Memory usage**: sampling caps the amount of data read from large tables
+
+## Development and Extension
+
+### Adding a new tool
+
+1. Create the tool class:
+```python
+from schema_tools.tools.base import BaseTool, ToolRegistry
+from schema_tools.utils.data_structures import ProcessingResult
+
+@ToolRegistry.register("my_tool")
+class MyTool(BaseTool):
+    needs_llm = False
+    tool_name = "My Tool"
+    
+    async def execute(self, context):
+        # implement the tool logic here
+        return ProcessingResult(success=True)
+```
+
+2. Add it to a pipeline:
+```python
+"my_pipeline": [
+    "database_inspector",
+    "my_tool",
+    "ddl_generator"
+]
+```
+
+## License
+
+This tool is part of the VANNA-CHAINLIT-CHROMADB project and follows the project's license.

+ 15 - 0
schema_tools/__init__.py

@@ -0,0 +1,15 @@
+"""
+Schema Tools - 自动化数据库逆向工程工具
+用于从PostgreSQL数据库生成vanna.ai格式的训练数据(DDL和MD文档)
+"""
+
+from .training_data_agent import SchemaTrainingDataAgent
+from .config import SCHEMA_TOOLS_CONFIG, get_config, update_config
+
+__version__ = "1.0.0"
+__all__ = [
+    "SchemaTrainingDataAgent",
+    "SCHEMA_TOOLS_CONFIG", 
+    "get_config",
+    "update_config"
+]

+ 243 - 0
schema_tools/__main__.py

@@ -0,0 +1,243 @@
+import argparse
+import asyncio
+import sys
+import os
+import logging
+from pathlib import Path
+
+def setup_argument_parser():
+    """设置命令行参数解析器"""
+    parser = argparse.ArgumentParser(
+        description='Schema Tools - 自动生成数据库训练数据',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例用法:
+  # 基本使用
+  python -m schema_tools --db-connection "postgresql://user:pass@host:5432/db" --table-list tables.txt
+  
+  # 指定业务上下文和输出目录
+  python -m schema_tools --db-connection "..." --table-list tables.txt --business-context "电商系统" --output-dir output
+  
+  # 仅生成DDL文件
+  python -m schema_tools --db-connection "..." --table-list tables.txt --pipeline ddl_only
+  
+  # 权限检查模式
+  python -m schema_tools --db-connection "..." --check-permissions-only
+        """
+    )
+    
+    # Required arguments
+    parser.add_argument(
+        '--db-connection',
+        required=True,
+        help='database connection string (e.g. postgresql://user:pass@localhost:5432/dbname)'
+    )
+    
+    # Optional arguments
+    parser.add_argument(
+        '--table-list',
+        help='path to the table list file'
+    )
+    
+    parser.add_argument(
+        '--business-context',
+        help='business context description'
+    )
+    
+    parser.add_argument(
+        '--business-context-file',
+        help='path to a business context file'
+    )
+    
+    parser.add_argument(
+        '--output-dir',
+        help='output directory path'
+    )
+    
+    parser.add_argument(
+        '--pipeline',
+        choices=['full', 'ddl_only', 'analysis_only'],
+        help='pipeline type'
+    )
+    
+    parser.add_argument(
+        '--max-concurrent',
+        type=int,
+        help='maximum number of tables processed concurrently'
+    )
+    
+    # Feature switches
+    parser.add_argument(
+        '--no-filter-system-tables',
+        action='store_true',
+        help='disable system table filtering'
+    )
+    
+    parser.add_argument(
+        '--check-permissions-only',
+        action='store_true',
+        help='only check database permissions, do not process tables'
+    )
+    
+    parser.add_argument(
+        '--verbose', '-v',
+        action='store_true',
+        help='enable verbose logging'
+    )
+    
+    parser.add_argument(
+        '--log-file',
+        help='log file path'
+    )
+    
+    return parser
+
+def load_config_with_overrides(args):
+    """Loads the config and applies command-line overrides."""
+    from schema_tools.config import SCHEMA_TOOLS_CONFIG
+    
+    config = SCHEMA_TOOLS_CONFIG.copy()
+    
+    # Command-line arguments override config values
+    if args.output_dir:
+        config["output_directory"] = args.output_dir
+    
+    if args.pipeline:
+        config["default_pipeline"] = args.pipeline
+    
+    if args.max_concurrent:
+        config["max_concurrent_tables"] = args.max_concurrent
+    
+    if args.no_filter_system_tables:
+        config["filter_system_tables"] = False
+    
+    if args.log_file:
+        config["log_file"] = args.log_file
+    
+    return config
+
+def load_business_context(args):
+    """Resolves the business context from file, argument, or default."""
+    if args.business_context_file:
+        try:
+            with open(args.business_context_file, 'r', encoding='utf-8') as f:
+                return f.read().strip()
+        except Exception as e:
+            print(f"Warning: could not read business context file {args.business_context_file}: {e}")
+    
+    if args.business_context:
+        return args.business_context
+    
+    from schema_tools.config import SCHEMA_TOOLS_CONFIG
+    return SCHEMA_TOOLS_CONFIG.get("default_business_context", "数据库管理系统")
+
+async def check_permissions_only(db_connection: str):
+    """仅检查数据库权限"""
+    from schema_tools.training_data_agent import SchemaTrainingDataAgent
+    
+    print("🔍 检查数据库权限...")
+    
+    try:
+        agent = SchemaTrainingDataAgent(
+            db_connection=db_connection,
+            table_list_file="",  # 不需要表清单
+            business_context=""   # 不需要业务上下文
+        )
+        
+        # 初始化Agent以建立数据库连接
+        await agent._initialize()
+        
+        # 检查权限
+        permissions = await agent.check_database_permissions()
+        
+        print("\n📋 权限检查结果:")
+        print(f"  ✅ 数据库连接: {'可用' if permissions['connect'] else '不可用'}")
+        print(f"  ✅ 元数据查询: {'可用' if permissions['select_metadata'] else '不可用'}")
+        print(f"  ✅ 数据查询: {'可用' if permissions['select_data'] else '不可用'}")
+        print(f"  ℹ️  数据库类型: {'只读' if permissions['is_readonly'] else '读写'}")
+        
+        # 修复判断逻辑:is_readonly=False表示可读写,是好事
+        required_permissions = ['connect', 'select_metadata', 'select_data']
+        has_required_permissions = all(permissions.get(perm, False) for perm in required_permissions)
+        
+        if has_required_permissions:
+            print("\n✅ 数据库权限检查通过,可以开始处理")
+            return True
+        else:
+            print("\n❌ 数据库权限不足,请检查配置")
+            return False
+            
+    except Exception as e:
+        print(f"\n❌ 权限检查失败: {e}")
+        return False
+
+async def main():
+    """主入口函数"""
+    parser = setup_argument_parser()
+    args = parser.parse_args()
+    
+    # 设置日志
+    from schema_tools.utils.logger import setup_logging
+    setup_logging(
+        verbose=args.verbose,
+        log_file=args.log_file
+    )
+    
+    # 仅权限检查模式
+    if args.check_permissions_only:
+        success = await check_permissions_only(args.db_connection)
+        sys.exit(0 if success else 1)
+    
+    # 验证必需参数
+    if not args.table_list:
+        print("错误: 需要指定 --table-list 参数")
+        parser.print_help()
+        sys.exit(1)
+    
+    if not os.path.exists(args.table_list):
+        print(f"错误: 表清单文件不存在: {args.table_list}")
+        sys.exit(1)
+    
+    try:
+        # 加载配置和业务上下文
+        config = load_config_with_overrides(args)
+        business_context = load_business_context(args)
+        
+        # 创建Agent
+        from schema_tools.training_data_agent import SchemaTrainingDataAgent
+        
+        agent = SchemaTrainingDataAgent(
+            db_connection=args.db_connection,
+            table_list_file=args.table_list,
+            business_context=business_context,
+            output_dir=config["output_directory"],
+            pipeline=config["default_pipeline"]
+        )
+        
+        # 执行生成
+        print("🚀 开始生成Schema训练数据...")
+        report = await agent.generate_training_data()
+        
+        # 输出结果
+        if report['summary']['failed'] == 0:
+            print("\n🎉 所有表处理成功!")
+        else:
+            print(f"\n⚠️  处理完成,但有 {report['summary']['failed']} 个表失败")
+        
+        print(f"📁 输出目录: {config['output_directory']}")
+        
+        # 如果有失败的表,返回非零退出码
+        sys.exit(1 if report['summary']['failed'] > 0 else 0)
+        
+    except KeyboardInterrupt:
+        print("\n\n⏹️  用户中断,程序退出")
+        sys.exit(130)
+    except Exception as e:
+        print(f"\n❌ 程序执行失败: {e}")
+        if args.verbose:
+            import traceback
+            traceback.print_exc()
+        sys.exit(1)
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 139 - 0
schema_tools/config.py

@@ -0,0 +1,139 @@
+import os
+import sys
+
+# Import app_config for database and related settings
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+try:
+    import app_config
+except ImportError:
+    app_config = None
+
+# Schema Tools configuration
+SCHEMA_TOOLS_CONFIG = {
+    # Core settings
+    "default_db_connection": None,  # supplied on the command line
+    "default_business_context": "数据库管理系统", 
+    "output_directory": "training/generated_data",
+    
+    # Pipeline settings
+    "default_pipeline": "full",
+    "available_pipelines": {
+        "full": [
+            "database_inspector", 
+            "data_sampler", 
+            "comment_generator", 
+            "ddl_generator", 
+            "doc_generator"
+        ],
+        "ddl_only": [
+            "database_inspector", 
+            "data_sampler", 
+            "comment_generator", 
+            "ddl_generator"
+        ],
+        "analysis_only": [
+            "database_inspector", 
+            "data_sampler", 
+            "comment_generator"
+        ]
+    },
+    
+    # Data processing
+    "sample_data_limit": 20,                    # rows sampled for LLM analysis
+    "enum_detection_sample_limit": 5000,        # sampling cap during enum detection
+    "enum_max_distinct_values": 20,             # max distinct values for an enum field
+    "enum_varchar_keywords": [                  # keywords (Chinese and English) hinting at VARCHAR enums
+        "性别", "gender", "状态", "status", "类型", "type", 
+        "级别", "level", "方向", "direction", "品类", "classify",
+        "模式", "mode", "格式", "format"
+    ],
+    "large_table_threshold": 1000000,           # large-table threshold (row count)
+    
+    # Concurrency
+    "max_concurrent_tables": 1,                 # max tables processed concurrently (keep at 1 to avoid concurrent LLM call issues)
+    
+    # LLM settings
+    "use_app_config_llm": True,                # use the LLM settings from app_config
+    "comment_generation_timeout": 30,          # LLM call timeout (seconds)
+    "max_llm_retries": 3,                      # max LLM retries
+    
+    # System table filtering
+    "filter_system_tables": True,              # filter out system tables
+    "custom_system_prefixes": [],              # user-defined system table prefixes
+    "custom_system_schemas": [],               # user-defined system schemas
+    
+    # Permissions and safety
+    "check_permissions": True,                 # check database permissions
+    "require_select_permission": True,         # require SELECT permission
+    "allow_readonly_database": True,           # allow read-only databases
+    
+    # Error handling
+    "continue_on_error": True,                 # continue after errors
+    "max_table_failures": 5,                  # max number of failed tables allowed
+    "skip_large_tables": False,               # skip very large tables
+    "max_table_size": 10000000,               # max table row count when skipping is enabled
+    
+    # File naming
+    "ddl_file_suffix": ".ddl",
+    "doc_file_suffix": "_detail.md",
+    "log_file": "schema_tools.log",
+    "create_subdirectories": True,            # create ddl/docs subdirectories
+    
+    # Output format
+    "include_sample_data_in_comments": True,  # include sample data in comments
+    "max_comment_length": 500,                # maximum comment length
+    "include_field_statistics": True,         # include field statistics
+    
+    # Debugging
+    "debug_mode": False,                      # debug mode
+    "save_llm_prompts": False,               # save LLM prompts
+    "save_llm_responses": False,             # save LLM responses
+}
+
+# Pull related settings from app_config (when available)
+if app_config:
+    # Inherit the database configuration
+    if hasattr(app_config, 'PGVECTOR_CONFIG'):
+        pgvector_config = app_config.PGVECTOR_CONFIG
+        if not SCHEMA_TOOLS_CONFIG["default_db_connection"]:
+            SCHEMA_TOOLS_CONFIG["default_db_connection"] = (
+                f"postgresql://{pgvector_config['user']}:{pgvector_config['password']}"
+                f"@{pgvector_config['host']}:{pgvector_config['port']}/{pgvector_config['dbname']}"
+            )
+
+def get_config():
+    """Returns the current configuration."""
+    return SCHEMA_TOOLS_CONFIG
+
+def update_config(**kwargs):
+    """Updates the configuration in place."""
+    SCHEMA_TOOLS_CONFIG.update(kwargs)
+
+def validate_config():
+    """Validates the configuration."""
+    errors = []
+    
+    # Required values
+    if SCHEMA_TOOLS_CONFIG["max_concurrent_tables"] <= 0:
+        errors.append("max_concurrent_tables must be greater than 0")
+    
+    if SCHEMA_TOOLS_CONFIG["sample_data_limit"] <= 0:
+        errors.append("sample_data_limit must be greater than 0")
+    
+    # Pipeline settings
+    default_pipeline = SCHEMA_TOOLS_CONFIG["default_pipeline"]
+    available_pipelines = SCHEMA_TOOLS_CONFIG["available_pipelines"]
+    
+    if default_pipeline not in available_pipelines:
+        errors.append(f"default_pipeline '{default_pipeline}' is not in available_pipelines")
+    
+    if errors:
+        raise ValueError("Configuration validation failed:\n" + "\n".join(f"  - {error}" for error in errors))
+    
+    return True
+
+# Validate the configuration at import time
+try:
+    validate_config()
+except ValueError as e:
+    print(f"Warning: {e}")

+ 3 - 0
schema_tools/prompts/__init__.py

@@ -0,0 +1,3 @@
+"""
+提示词模板目录
+"""

+ 10 - 0
schema_tools/prompts/business_dictionary.txt

@@ -0,0 +1,10 @@
+# Business dictionary
+# Define business-specific terms and abbreviations here to help the LLM understand the business context
+
+# Examples:
+# BSS - Business Support System,业务支撑系统
+# SA - Service Area,服务区
+# POS - Point of Sale,销售点
+# SKU - Stock Keeping Unit,库存单位
+
+# Add entries for your actual business here...

+ 7 - 0
schema_tools/tables.txt

@@ -0,0 +1,7 @@
+# Example table list file
+# One table name per line; schema.table format is supported
+# Lines starting with # are comments
+
+# Service-area tables
+public.bss_car_day_count
+

+ 135 - 0
schema_tools/test_schema_tools.py

@@ -0,0 +1,135 @@
+"""
+测试Schema Tools模块
+"""
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+# Add the project root to the Python path
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+async def test_basic_functionality():
+    """Tests the basic functionality."""
+    print("===== Testing Schema Tools =====")
+    
+    # 1. Configuration validation
+    from schema_tools.config import SCHEMA_TOOLS_CONFIG, validate_config
+    print("\n1. Validating configuration...")
+    try:
+        validate_config()
+        print("✅ Configuration is valid")
+    except Exception as e:
+        print(f"❌ Configuration validation failed: {e}")
+        return
+    
+    # 2. Tool registry
+    from schema_tools.tools import ToolRegistry
+    print("\n2. Registered tools:")
+    tools = ToolRegistry.list_tools()
+    for tool in tools:
+        print(f"  - {tool}")
+    
+    # 3. Create a test table list file
+    test_tables_file = "test_tables.txt"
+    with open(test_tables_file, 'w', encoding='utf-8') as f:
+        f.write("# Test table list\n")
+        f.write("public.users\n")
+        f.write("public.orders\n")
+        f.write("hr.employees\n")
+    print(f"\n3. Created test table list file: {test_tables_file}")
+    
+    # 4. Permission check (only runs with a real connection)
+    print("\n4. Testing the database permission check...")
+    
+    # A real database connection string is needed here,
+    # taken from the environment or from app_config
+    try:
+        import app_config
+        if hasattr(app_config, 'PGVECTOR_CONFIG'):
+            pg_config = app_config.PGVECTOR_CONFIG
+            db_connection = f"postgresql://{pg_config['user']}:{pg_config['password']}@{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
+            print("Using the PgVector database configuration")
+        else:
+            print("⚠️ No database configuration found; skipping the permission test")
+            db_connection = None
+    except Exception:
+        print("⚠️ Could not import app_config; skipping the permission test")
+        db_connection = None
+    
+    if db_connection:
+        from schema_tools.training_data_agent import SchemaTrainingDataAgent
+        
+        try:
+            agent = SchemaTrainingDataAgent(
+                db_connection=db_connection,
+                table_list_file=test_tables_file,
+                business_context="测试业务系统"
+            )
+            
+            permissions = await agent.check_database_permissions()
+            print(f"Database permissions: {permissions}")
+        except Exception as e:
+            print(f"❌ Permission check failed: {e}")
+    
+    # Clean up the test file
+    if os.path.exists(test_tables_file):
+        os.remove(test_tables_file)
+    
+    print("\n===== Tests finished =====")
+
+async def test_table_parser():
+    """Tests the table list parser."""
+    print("\n===== Testing the table list parser =====")
+    
+    from schema_tools.utils.table_parser import TableListParser
+    
+    parser = TableListParser()
+    
+    # String parsing cases
+    test_cases = [
+        "public.users",
+        "hr.employees,sales.orders",
+        "users\norders\nproducts",
+        "schema.table_name"
+    ]
+    
+    for test_str in test_cases:
+        result = parser.parse_string(test_str)
+        print(f"Input: {repr(test_str)}")
+        print(f"Result: {result}")
+        print()
+
+async def test_system_filter():
+    """Tests the system table filter."""
+    print("\n===== Testing the system table filter =====")
+    
+    from schema_tools.utils.system_filter import SystemTableFilter
+    
+    table_filter = SystemTableFilter()  # avoid shadowing the builtin filter()
+    
+    test_tables = [
+        "pg_class",
+        "information_schema.tables",
+        "public.users",
+        "hr.employees",
+        "pg_temp_1.temp_table",
+        "my_table"
+    ]
+    
+    for table in test_tables:
+        if '.' in table:
+            schema, name = table.split('.', 1)
+        else:
+            schema, name = 'public', table
+        
+        is_system = table_filter.is_system_table(schema, name)
+        print(f"{table}: {'system table' if is_system else 'user table'}")
+
+if __name__ == "__main__":
+    print("Schema Tools test script\n")
+    
+    # Run the tests
+    asyncio.run(test_basic_functionality())
+    asyncio.run(test_table_parser())
+    asyncio.run(test_system_filter())

+ 20 - 0
schema_tools/tools/__init__.py

@@ -0,0 +1,20 @@
+"""
+Agent工具集
+"""
+
+from .base import BaseTool, ToolRegistry
+from .database_inspector import DatabaseInspectorTool
+from .data_sampler import DataSamplerTool
+from .comment_generator import CommentGeneratorTool
+from .ddl_generator import DDLGeneratorTool
+from .doc_generator import DocGeneratorTool
+
+__all__ = [
+    "BaseTool",
+    "ToolRegistry",
+    "DatabaseInspectorTool",
+    "DataSamplerTool", 
+    "CommentGeneratorTool",
+    "DDLGeneratorTool",
+    "DocGeneratorTool"
+]

+ 161 - 0
schema_tools/tools/base.py

@@ -0,0 +1,161 @@
+import asyncio
+import time
+import logging
+from abc import ABC, abstractmethod
+from typing import Dict, Any, Optional, Type, List
+from schema_tools.utils.data_structures import ProcessingResult, TableProcessingContext
+
+class ToolRegistry:
+    """Tool registry and factory."""
+    _tools: Dict[str, Type['BaseTool']] = {}
+    _instances: Dict[str, 'BaseTool'] = {}
+    
+    @classmethod
+    def register(cls, name: str):
+        """Decorator: registers a tool class under a name."""
+        def decorator(tool_class: Type['BaseTool']):
+            cls._tools[name] = tool_class
+            logging.debug(f"Registered tool: {name} -> {tool_class.__name__}")
+            return tool_class
+        return decorator
+    
+    @classmethod
+    def get_tool(cls, name: str, **kwargs) -> 'BaseTool':
+        """Returns a tool instance (singleton per name)."""
+        if name not in cls._instances:
+            if name not in cls._tools:
+                raise ValueError(f"Tool '{name}' is not registered")
+            
+            tool_class = cls._tools[name]
+            
+            # Inject a vanna instance into tools that need an LLM
+            if hasattr(tool_class, 'needs_llm') and tool_class.needs_llm:
+                from core.vanna_llm_factory import create_vanna_instance
+                kwargs['vn'] = create_vanna_instance()
+                logging.debug(f"Injected an LLM instance into tool {name}")
+            
+            cls._instances[name] = tool_class(**kwargs)
+        
+        return cls._instances[name]
+    
+    @classmethod
+    def list_tools(cls) -> List[str]:
+        """Lists all registered tools."""
+        return list(cls._tools.keys())
+    
+    @classmethod
+    def clear_instances(cls):
+        """Clears all tool instances (for tests)."""
+        cls._instances.clear()
+
+class BaseTool(ABC):
+    """Base class for all tools."""
+    
+    needs_llm: bool = False  # whether the tool needs an LLM instance
+    tool_name: str = ""      # human-readable tool name
+    
+    def __init__(self, **kwargs):
+        self.logger = logging.getLogger(f"schema_tools.{self.__class__.__name__}")
+        
+        # If the tool needs an LLM, make sure one was injected
+        if self.needs_llm and 'vn' not in kwargs:
+            raise ValueError(f"Tool {self.__class__.__name__} needs an LLM instance but none was provided")
+        
+        # Store the vanna instance
+        if 'vn' in kwargs:
+            self.vn = kwargs['vn']
+    
+    @abstractmethod
+    async def execute(self, context: TableProcessingContext) -> ProcessingResult:
+        """
+        Runs the tool logic.
+        Args:
+            context: table processing context
+        Returns:
+            ProcessingResult: the processing result
+        """
+        pass
+    
+    async def _execute_with_timing(self, context: TableProcessingContext) -> ProcessingResult:
+        """Execution wrapper that records timing."""
+        start_time = time.time()
+        
+        try:
+            self.logger.info(f"Running tool: {self.tool_name}")
+            result = await self.execute(context)
+            execution_time = time.time() - start_time
+            result.execution_time = execution_time
+            
+            if result.success:
+                self.logger.info(f"Tool {self.tool_name} succeeded in {execution_time:.2f}s")
+            else:
+                self.logger.error(f"Tool {self.tool_name} failed: {result.error_message}")
+            
+            return result
+            
+        except Exception as e:
+            execution_time = time.time() - start_time
+            self.logger.exception(f"Tool {self.tool_name} raised an exception")
+            
+            return ProcessingResult(
+                success=False,
+                error_message=f"Tool execution error: {str(e)}",
+                execution_time=execution_time
+            )
+    
+    def validate_input(self, context: TableProcessingContext) -> bool:
+        """Input validation (subclasses may override)."""
+        return context.table_metadata is not None
+
+
+class PipelineExecutor:
+    """Executes a configured pipeline of tools."""
+    
+    def __init__(self, pipeline_config: Dict[str, List[str]]):
+        self.pipeline_config = pipeline_config
+        self.logger = logging.getLogger("schema_tools.PipelineExecutor")
+    
+    async def execute_pipeline(self, pipeline_name: str, context: TableProcessingContext) -> Dict[str, ProcessingResult]:
+        """Runs the named pipeline against the given context."""
+        if pipeline_name not in self.pipeline_config:
+            raise ValueError(f"Unknown pipeline: {pipeline_name}")
+        
+        steps = self.pipeline_config[pipeline_name]
+        results = {}
+        
+        self.logger.info(f"Starting pipeline '{pipeline_name}': {' -> '.join(steps)}")
+        
+        for step_name in steps:
+            try:
+                tool = ToolRegistry.get_tool(step_name)
+                
+                # Validate input
+                if not tool.validate_input(context):
+                    result = ProcessingResult(
+                        success=False,
+                        error_message=f"Input validation failed for tool {step_name}"
+                    )
+                else:
+                    result = await tool._execute_with_timing(context)
+                
+                results[step_name] = result
+                context.update_step(step_name, result)
+                
+                # Stop if the step failed and continuing on error is disabled
+                if not result.success:
+                    from schema_tools.config import SCHEMA_TOOLS_CONFIG
+                    if not SCHEMA_TOOLS_CONFIG["continue_on_error"]:
+                        self.logger.error(f"Step {step_name} failed; stopping the pipeline")
+                        break
+                    else:
+                        self.logger.warning(f"Step {step_name} failed; continuing with the next step")
+                
+            except Exception as e:
+                self.logger.exception(f"Exception while executing step {step_name}")
+                results[step_name] = ProcessingResult(
+                    success=False,
+                    error_message=f"Step execution error: {str(e)}"
+                )
+                break
+        
+        return results

+ 402 - 0
schema_tools/tools/comment_generator.py

@@ -0,0 +1,402 @@
+import asyncio
+from typing import List, Dict, Any, Tuple
+from schema_tools.tools.base import BaseTool, ToolRegistry
+from schema_tools.utils.data_structures import ProcessingResult, TableProcessingContext, FieldInfo
+
+@ToolRegistry.register("comment_generator")
+class CommentGeneratorTool(BaseTool):
+    """LLM注释生成工具"""
+    
+    needs_llm = True
+    tool_name = "注释生成器"
+    
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.business_context = kwargs.get('business_context', '')
+        self.business_dictionary = self._load_business_dictionary()
+    
+    async def execute(self, context: TableProcessingContext) -> ProcessingResult:
+        """Generates table and field comments."""
+        try:
+            table_metadata = context.table_metadata
+            
+            # Generate the table comment
+            table_comment_result = await self._generate_table_comment(table_metadata, context.business_context)
+            
+            # Generate field comments and enum suggestions
+            field_results = await self._generate_field_comments_and_enums(table_metadata, context.business_context)
+            
+            # Update the table metadata
+            if table_comment_result['success']:
+                table_metadata.generated_comment = table_comment_result['comment']
+                table_metadata.comment = table_comment_result['comment']
+            
+            # Update the field information
+            enum_suggestions = []
+            for i, field in enumerate(table_metadata.fields):
+                if i < len(field_results) and field_results[i]['success']:
+                    field.generated_comment = field_results[i]['comment']
+                    field.comment = field_results[i]['comment']
+                    
+                    # Collect enum suggestions
+                    if field_results[i].get('is_enum'):
+                        field.is_enum = True
+                        enum_suggestions.append({
+                            'field_name': field.name,
+                            'suggested_values': field_results[i].get('enum_values', []),
+                            'enum_description': field_results[i].get('enum_description', '')
+                        })
+            
+            # Validate the enum suggestions against the actual data
+            if enum_suggestions:
+                validated_enums = await self._validate_enum_suggestions(table_metadata, enum_suggestions)
+                
+                # Apply the validated enum information
+                for enum_info in validated_enums:
+                    field_name = enum_info['field_name']
+                    for field in table_metadata.fields:
+                        if field.name == field_name:
+                            field.enum_values = enum_info['actual_values']
+                            field.enum_description = enum_info['description']
+                            break
+            
+            return ProcessingResult(
+                success=True,
+                data={
+                    'table_comment_generated': table_comment_result['success'],
+                    'field_comments_generated': sum(1 for r in field_results if r['success']),
+                    'enum_fields_detected': len([f for f in table_metadata.fields if f.is_enum]),
+                    'enum_suggestions': enum_suggestions
+                },
+                metadata={'tool': self.tool_name}
+            )
+            
+        except Exception as e:
+            self.logger.exception("Comment generation failed")
+            return ProcessingResult(
+                success=False,
+                error_message=f"Comment generation failed: {str(e)}"
+            )
+    
+    async def _generate_table_comment(self, table_metadata, business_context: str) -> Dict[str, Any]:
+        """Generates the table comment."""
+        try:
+            prompt = self._build_table_comment_prompt(table_metadata, business_context)
+            
+            # Call the LLM
+            response = await self._call_llm_with_retry(prompt)
+            
+            # Parse the response
+            comment = self._extract_table_comment(response)
+            
+            return {
+                'success': True,
+                'comment': comment,
+                'original_response': response
+            }
+            
+        except Exception as e:
+            self.logger.error(f"Table comment generation failed: {e}")
+            return {
+                'success': False,
+                'comment': table_metadata.original_comment or f"{table_metadata.table_name}表",  # fallback: Chinese "<name> table"
+                'error': str(e)
+            }
+    
+    async def _generate_field_comments_and_enums(self, table_metadata, business_context: str) -> List[Dict[str, Any]]:
+        """Generates field comments and enum suggestions in one batch."""
+        try:
+            # Build the batch prompt
+            prompt = self._build_field_batch_prompt(table_metadata, business_context)
+            
+            # Call the LLM
+            response = await self._call_llm_with_retry(prompt)
+            
+            # Parse the batch response
+            field_results = self._parse_field_batch_response(response, table_metadata.fields)
+            
+            return field_results
+            
+        except Exception as e:
+            self.logger.error(f"Batch field comment generation failed: {e}")
+            # Fall back to default results
+            return [
+                {
+                    'success': False,
+                    'comment': field.original_comment or field.name,
+                    'is_enum': False,
+                    'error': str(e)
+                }
+                for field in table_metadata.fields
+            ]
+    
+    def _build_table_comment_prompt(self, table_metadata, business_context: str) -> str:
+        """Builds the table comment prompt."""
+        # Summarize the fields
+        fields_summary = []
+        for field in table_metadata.fields[:10]:  # only the first 10 fields, to keep the prompt short
+            field_desc = f"- {field.name} ({field.type})"
+            if field.comment:
+                field_desc += f": {field.comment}"
+            fields_summary.append(field_desc)
+        
+        # Summarize the sample data
+        sample_summary = ""
+        if table_metadata.sample_data:
+            sample_count = min(3, len(table_metadata.sample_data))
+            sample_summary = f"\nSample data ({sample_count} rows):\n"
+            for i, sample in enumerate(table_metadata.sample_data[:sample_count]):
+                sample_str = ", ".join([f"{k}={v}" for k, v in list(sample.items())[:5]])
+                sample_summary += f"{i+1}. {sample_str}\n"
+        
+        prompt = f"""You are a database documentation expert. Based on the information below, write a concise, accurate Chinese comment for this database table.
+
+Business context: {business_context}
+{self.business_dictionary}
+
+Table information:
+- Table name: {table_metadata.table_name}
+- Schema: {table_metadata.schema_name}
+- Existing comment: {table_metadata.original_comment or "none"}
+- Field count: {len(table_metadata.fields)}
+- Row count: {table_metadata.row_count or "unknown"}
+
+Main fields:
+{chr(10).join(fields_summary)}
+
+{sample_summary}
+
+Please write a concise, accurate Chinese table comment. Requirements:
+1. If the existing comment is in English, translate it to Chinese and improve it
+2. Infer the table's business purpose from the field names and sample data
+3. Keep the comment under 50 characters
+4. Highlight the table's core business value
+
+Table comment:"""
+        
+        return prompt
+    
+    def _build_field_batch_prompt(self, table_metadata, business_context: str) -> str:
+        """Builds the batch prompt for field comments."""
+        # Prepare field info
+        fields_info = []
+        sample_values = {}
+        
+        # Collect sample values per field
+        for sample in table_metadata.sample_data[:5]:
+            for field_name, value in sample.items():
+                if field_name not in sample_values:
+                    sample_values[field_name] = []
+                if value is not None and len(sample_values[field_name]) < 5:
+                    sample_values[field_name].append(str(value))
+        
+        # Build the field descriptions
+        for field in table_metadata.fields:
+            field_info = f"{field.name} ({field.type})"
+            if field.original_comment:
+                field_info += f" - existing comment: {field.original_comment}"
+            
+            # Append sample values
+            if field.name in sample_values and sample_values[field.name]:
+                values_str = ", ".join(sample_values[field.name][:3])
+                field_info += f" - sample values: {values_str}"
+            
+            fields_info.append(field_info)
+        
+        prompt = f"""You are a database documentation expert. Write Chinese comments for every field of the table below and identify likely enum fields.
+
+Business context: {business_context}
+{self.business_dictionary}
+
+Table name: {table_metadata.schema_name}.{table_metadata.table_name}
+Table comment: {table_metadata.comment or "none"}
+
+Field list:
+{chr(10).join([f"{i+1}. {info}" for i, info in enumerate(fields_info)])}
+
+Output the analysis for each field in the following JSON format:
+```json
+{{
+  "fields": [
+    {{
+      "name": "field name",
+      "comment": "Chinese comment (concise, under 15 characters)",
+      "is_enum": true/false,
+      "enum_values": ["value1", "value2", "value3"] (if enum),
+      "enum_description": "meaning of the enum values" (if enum)
+    }}
+  ]
+}}
+```
+
+Comment requirements:
+1. If the existing comment is in English, translate it to Chinese and improve it
+2. Infer each field's meaning from its name, type, and sample values
+3. Identify likely enum fields (status, type, level, and similar)
+4. Enum criteria: VARCHAR type + highly repetitive sample values + a field name suggesting a category
+5. Comments should fit the business scenario: {business_context}
+
+Output the JSON analysis:"""
+        
+        return prompt
+    
+    async def _call_llm_with_retry(self, prompt: str, max_retries: int = 3) -> str:
+        """Calls the LLM, retrying on failure."""
+        from schema_tools.config import SCHEMA_TOOLS_CONFIG
+        
+        for attempt in range(max_retries):
+            try:
+                # Use the vanna instance's chat_with_llm method for free-form chat.
+                # It is intended for generating training data and does not query the vector store.
+                response = await asyncio.to_thread(
+                    self.vn.chat_with_llm, 
+                    question=prompt,
+                    system_prompt="You are a professional database documentation expert who writes high-quality Chinese comments for database tables and fields."
+                )
+                
+                if response and response.strip():
+                    return response.strip()
+                else:
+                    raise ValueError("The LLM returned an empty response")
+                    
+            except Exception as e:
+                self.logger.warning(f"LLM call failed (attempt {attempt + 1}/{max_retries}): {e}")
+                if attempt == max_retries - 1:
+                    raise
+                await asyncio.sleep(1)  # wait one second before retrying
+        
+        raise Exception("LLM call reached the maximum number of retries")
+    
+    def _extract_table_comment(self, llm_response: str) -> str:
+        """Extracts the table comment from the LLM response."""
+        # Simple text cleanup and extraction
+        lines = llm_response.strip().split('\n')
+        
+        # Find the line that carries the actual comment
+        for line in lines:
+            line = line.strip()
+            if line and not line.startswith('#') and not line.startswith('*'):
+                # Strip known Chinese prefixes such as "表注释:" ("table comment:")
+                prefixes = ['表注释:', '注释:', '说明:', '表说明:']
+                for prefix in prefixes:
+                    if line.startswith(prefix):
+                        line = line[len(prefix):].strip()
+                
+                if line:
+                    return line[:200]  # cap the length
+        
+        return llm_response.strip()[:200]
+    
+    def _parse_field_batch_response(self, llm_response: str, fields: List[FieldInfo]) -> List[Dict[str, Any]]:
+        """Parses the batch field response."""
+        import json
+        import re
+        
+        try:
+            # Try to extract the JSON block
+            json_match = re.search(r'```json\s*(.*?)\s*```', llm_response, re.DOTALL)
+            if json_match:
+                json_str = json_match.group(1)
+            else:
+                # No code fence; try parsing the whole response
+                json_str = llm_response
+            
+            # Parse the JSON
+            parsed_data = json.loads(json_str)
+            field_data = parsed_data.get('fields', [])
+            
+            # Map the parsed entries onto the fields
+            results = []
+            for i, field in enumerate(fields):
+                if i < len(field_data):
+                    data = field_data[i]
+                    results.append({
+                        'success': True,
+                        'comment': data.get('comment', field.name),
+                        'is_enum': data.get('is_enum', False),
+                        'enum_values': data.get('enum_values', []),
+                        'enum_description': data.get('enum_description', '')
+                    })
+                else:
+                    # Default result
+                    results.append({
+                        'success': False,
+                        'comment': field.original_comment or field.name,
+                        'is_enum': False
+                    })
+            
+            return results
+            
+        except Exception as e:
+            self.logger.error(f"Failed to parse the batch field response: {e}")
+            # Fall back to default results
+            return [
+                {
+                    'success': False,
+                    'comment': field.original_comment or field.name,
+                    'is_enum': False,
+                    'error': str(e)
+                }
+                for field in fields
+            ]
+    
+    async def _validate_enum_suggestions(self, table_metadata, enum_suggestions: List[Dict]) -> List[Dict]:
+        """Validates enum suggestions against the actual data."""
+        from schema_tools.tools.database_inspector import DatabaseInspectorTool
+        from schema_tools.config import SCHEMA_TOOLS_CONFIG
+        
+        validated_enums = []
+        inspector = ToolRegistry.get_tool("database_inspector")
+        sample_limit = SCHEMA_TOOLS_CONFIG["enum_detection_sample_limit"]
+        
+        for enum_info in enum_suggestions:
+            field_name = enum_info['field_name']
+            
+            try:
+                # Query the field's distinct values
+                query = f"""
+                SELECT DISTINCT {field_name} as value, COUNT(*) as count
+                FROM {table_metadata.full_name}
+                WHERE {field_name} IS NOT NULL
+                GROUP BY {field_name}
+                ORDER BY count DESC
+                LIMIT {sample_limit}
+                """
+                
+                async with inspector.connection_pool.acquire() as conn:
+                    rows = await conn.fetch(query)
+                    
+                    actual_values = [str(row['value']) for row in rows]
+                    
+                    # Confirm it really is an enum (a reasonably small number of distinct values)
+                    max_enum_values = SCHEMA_TOOLS_CONFIG["enum_max_distinct_values"]
+                    if len(actual_values) <= max_enum_values:
+                        validated_enums.append({
+                            'field_name': field_name,
+                            'actual_values': actual_values,
+                            'suggested_values': enum_info['suggested_values'],
+                            'description': enum_info['enum_description'],
+                            'value_counts': [(row['value'], row['count']) for row in rows]
+                        })
+                        self.logger.info(f"Confirmed field {field_name} as an enum with {len(actual_values)} values")
+                    else:
+                        self.logger.info(f"Field {field_name} has too many distinct values ({len(actual_values)}); not treating it as an enum")
+                        
+            except Exception as e:
+                self.logger.warning(f"Failed to validate the enum suggestion for field {field_name}: {e}")
+        
+        return validated_enums
+    
+    def _load_business_dictionary(self) -> str:
+        """Loads the business dictionary."""
+        try:
+            import os
+            dict_file = os.path.join(os.path.dirname(__file__), '..', 'prompts', 'business_dictionary.txt')
+            if os.path.exists(dict_file):
+                with open(dict_file, 'r', encoding='utf-8') as f:
+                    content = f.read().strip()
+                    return f"\nBusiness dictionary:\n{content}\n" if content else ""
+            return ""
+        except Exception as e:
+            self.logger.warning(f"Failed to load the business dictionary: {e}")
+            return ""

+ 122 - 0
schema_tools/tools/data_sampler.py

@@ -0,0 +1,122 @@
+import random
+from typing import List, Dict, Any
+from schema_tools.tools.base import BaseTool, ToolRegistry
+from schema_tools.utils.data_structures import ProcessingResult, TableProcessingContext, TableMetadata
+
+@ToolRegistry.register("data_sampler")
+class DataSamplerTool(BaseTool):
+    """数据采样工具"""
+    
+    needs_llm = False
+    tool_name = "数据采样器"
+    
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.db_connection = kwargs.get('db_connection')
+    
+    async def execute(self, context: TableProcessingContext) -> ProcessingResult:
+        """执行数据采样"""
+        try:
+            from schema_tools.config import SCHEMA_TOOLS_CONFIG
+            
+            table_metadata = context.table_metadata
+            sample_limit = SCHEMA_TOOLS_CONFIG["sample_data_limit"]
+            large_table_threshold = SCHEMA_TOOLS_CONFIG["large_table_threshold"]
+            
+            # 判断是否为大表,使用不同的采样策略
+            if table_metadata.row_count and table_metadata.row_count > large_table_threshold:
+                sample_data = await self._smart_sample_large_table(table_metadata, sample_limit)
+                self.logger.info(f"大表 {table_metadata.full_name} 使用智能采样策略")
+            else:
+                sample_data = await self._simple_sample(table_metadata, sample_limit)
+            
+            # 更新上下文中的采样数据
+            context.table_metadata.sample_data = sample_data
+            
+            return ProcessingResult(
+                success=True,
+                data={
+                    'sample_count': len(sample_data),
+                    'sampling_strategy': 'smart' if table_metadata.row_count and table_metadata.row_count > large_table_threshold else 'simple'
+                },
+                metadata={'tool': self.tool_name}
+            )
+            
+        except Exception as e:
+            self.logger.exception(f"数据采样失败")
+            return ProcessingResult(
+                success=False,
+                error_message=f"数据采样失败: {str(e)}"
+            )
+    
+    async def _simple_sample(self, table_metadata: TableMetadata, limit: int) -> List[Dict[str, Any]]:
+        """Simple sampling strategy."""
+        from schema_tools.tools.database_inspector import DatabaseInspectorTool
+        
+        # Reuse the database inspector's connection pool
+        inspector = ToolRegistry.get_tool("database_inspector")
+        
+        query = f"SELECT * FROM {table_metadata.full_name} LIMIT {limit}"
+        
+        async with inspector.connection_pool.acquire() as conn:
+            rows = await conn.fetch(query)
+            return [dict(row) for row in rows]
+    
+    async def _smart_sample_large_table(self, table_metadata: TableMetadata, limit: int) -> List[Dict[str, Any]]:
+        """Smart sampling strategy for large tables."""
+        from schema_tools.tools.database_inspector import DatabaseInspectorTool
+        
+        inspector = ToolRegistry.get_tool("database_inspector")
+        samples_per_section = max(1, limit // 3)
+        
+        samples = []
+        
+        async with inspector.connection_pool.acquire() as conn:
+            # 1. Head sampling: the first N rows
+            front_query = f"SELECT * FROM {table_metadata.full_name} LIMIT {samples_per_section}"
+            front_rows = await conn.fetch(front_query)
+            samples.extend([dict(row) for row in front_rows])
+            
+            # 2. Random middle sampling (TABLESAMPLE)
+            if table_metadata.row_count > samples_per_section * 2:
+                try:
+                    # Compute the sampling percentage
+                    sample_percent = min(1.0, (samples_per_section * 100.0) / table_metadata.row_count)
+                    middle_query = f"""
+                    SELECT * FROM {table_metadata.full_name} 
+                    TABLESAMPLE SYSTEM({sample_percent}) 
+                    LIMIT {samples_per_section}
+                    """
+                    middle_rows = await conn.fetch(middle_query)
+                    samples.extend([dict(row) for row in middle_rows])
+                except Exception as e:
+                    self.logger.warning(f"TABLESAMPLE failed; falling back to OFFSET sampling: {e}")
+                    # Fall back to OFFSET sampling
+                    offset = random.randint(samples_per_section, table_metadata.row_count - samples_per_section)
+                    offset_query = f"SELECT * FROM {table_metadata.full_name} OFFSET {offset} LIMIT {samples_per_section}"
+                    offset_rows = await conn.fetch(offset_query)
+                    samples.extend([dict(row) for row in offset_rows])
+            
+            # 3. Tail sampling: the last N rows
+            remaining = limit - len(samples)
+            if remaining > 0:
+                # Number all rows with ROW_NUMBER and keep only the last ones
+                tail_query = f"""
+                SELECT * FROM (
+                    SELECT *, ROW_NUMBER() OVER() as rn 
+                    FROM {table_metadata.full_name}
+                ) sub 
+                WHERE sub.rn > (SELECT COUNT(*) FROM {table_metadata.full_name}) - {remaining}
+                ORDER BY sub.rn
+                """
+                try:
+                    tail_rows = await conn.fetch(tail_query)
+                    # Drop the ROW_NUMBER column
+                    for row in tail_rows:
+                        row_dict = dict(row)
+                        row_dict.pop('rn', None)
+                        samples.append(row_dict)
+                except Exception as e:
+                    self.logger.warning(f"Tail sampling failed: {e}")
+        
+        return samples[:limit]  # never exceed the limit

+ 210 - 0
schema_tools/tools/database_inspector.py

@@ -0,0 +1,210 @@
+import asyncio
+import asyncpg
+from typing import List, Dict, Any, Optional
+from schema_tools.tools.base import BaseTool, ToolRegistry
+from schema_tools.utils.data_structures import ProcessingResult, TableProcessingContext, FieldInfo, TableMetadata
+
+@ToolRegistry.register("database_inspector")
+class DatabaseInspectorTool(BaseTool):
+    """数据库元数据检查工具"""
+    
+    needs_llm = False
+    tool_name = "数据库检查器"
+    
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.db_connection = kwargs.get('db_connection')
+        self.connection_pool = None
+    
+    async def execute(self, context: TableProcessingContext) -> ProcessingResult:
+        """Inspects the table's metadata."""
+        try:
+            # Establish the database connection
+            if not self.connection_pool:
+                await self._create_connection_pool()
+            
+            table_name = context.table_metadata.table_name
+            schema_name = context.table_metadata.schema_name
+            
+            # Fetch basic table information
+            table_info = await self._get_table_info(schema_name, table_name)
+            if not table_info:
+                return ProcessingResult(
+                    success=False,
+                    error_message=f"Table {schema_name}.{table_name} does not exist or is not accessible"
+                )
+            
+            # Fetch field information
+            fields = await self._get_table_fields(schema_name, table_name)
+            
+            # Fetch the table comment
+            table_comment = await self._get_table_comment(schema_name, table_name)
+            
+            # Fetch table statistics
+            stats = await self._get_table_statistics(schema_name, table_name)
+            
+            # Update the table metadata
+            context.table_metadata.original_comment = table_comment
+            context.table_metadata.comment = table_comment
+            context.table_metadata.fields = fields
+            context.table_metadata.row_count = stats.get('row_count')
+            context.table_metadata.table_size = stats.get('table_size')
+            
+            return ProcessingResult(
+                success=True,
+                data={
+                    'fields_count': len(fields),
+                    'table_comment': table_comment,
+                    'row_count': stats.get('row_count'),
+                    'table_size': stats.get('table_size')
+                },
+                metadata={'tool': self.tool_name}
+            )
+            
+        except Exception as e:
+            self.logger.exception("Database inspection failed")
+            return ProcessingResult(
+                success=False,
+                error_message=f"Database inspection failed: {str(e)}"
+            )
+    
+    async def _create_connection_pool(self):
+        """Creates the database connection pool."""
+        try:
+            self.connection_pool = await asyncpg.create_pool(
+                self.db_connection,
+                min_size=1,
+                max_size=5,
+                command_timeout=30
+            )
+            self.logger.info("Database connection pool created")
+        except Exception as e:
+            self.logger.error(f"Failed to create the database connection pool: {e}")
+            raise
+    
+    async def _get_table_info(self, schema_name: str, table_name: str) -> Optional[Dict]:
+        """获取表基本信息"""
+        query = """
+        SELECT schemaname, tablename, tableowner, tablespace, hasindexes, hasrules, hastriggers
+        FROM pg_tables 
+        WHERE schemaname = $1 AND tablename = $2
+        """
+        async with self.connection_pool.acquire() as conn:
+            result = await conn.fetchrow(query, schema_name, table_name)
+            return dict(result) if result else None
+    
+    async def _get_table_fields(self, schema_name: str, table_name: str) -> List[FieldInfo]:
+        """获取表字段信息"""
+        query = """
+        SELECT 
+            c.column_name,
+            c.data_type,
+            c.is_nullable,
+            c.column_default,
+            c.character_maximum_length,
+            c.numeric_precision,
+            c.numeric_scale,
+            pd.description as column_comment,
+            CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END as is_primary_key,
+            CASE WHEN fk.column_name IS NOT NULL THEN true ELSE false END as is_foreign_key
+        FROM information_schema.columns c
+        LEFT JOIN pg_description pd ON pd.objsubid = c.ordinal_position 
+            AND pd.objoid = (
+                SELECT oid FROM pg_class 
+                WHERE relname = c.table_name 
+                AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = c.table_schema)
+            )
+        LEFT JOIN (
+            SELECT ku.column_name
+            FROM information_schema.table_constraints tc
+            JOIN information_schema.key_column_usage ku ON tc.constraint_name = ku.constraint_name
+            WHERE tc.table_schema = $1 AND tc.table_name = $2 AND tc.constraint_type = 'PRIMARY KEY'
+        ) pk ON pk.column_name = c.column_name
+        LEFT JOIN (
+            SELECT ku.column_name
+            FROM information_schema.table_constraints tc
+            JOIN information_schema.key_column_usage ku ON tc.constraint_name = ku.constraint_name
+            WHERE tc.table_schema = $1 AND tc.table_name = $2 AND tc.constraint_type = 'FOREIGN KEY'
+        ) fk ON fk.column_name = c.column_name
+        WHERE c.table_schema = $1 AND c.table_name = $2
+        ORDER BY c.ordinal_position
+        """
+        
+        fields = []
+        async with self.connection_pool.acquire() as conn:
+            rows = await conn.fetch(query, schema_name, table_name)
+            
+            for row in rows:
+                field = FieldInfo(
+                    name=row['column_name'],
+                    type=row['data_type'],
+                    nullable=row['is_nullable'] == 'YES',
+                    default_value=row['column_default'],
+                    original_comment=row['column_comment'],
+                    comment=row['column_comment'],
+                    is_primary_key=row['is_primary_key'],
+                    is_foreign_key=row['is_foreign_key'],
+                    max_length=row['character_maximum_length'],
+                    precision=row['numeric_precision'],
+                    scale=row['numeric_scale']
+                )
+                fields.append(field)
+        
+        return fields
+    
+    async def _get_table_comment(self, schema_name: str, table_name: str) -> Optional[str]:
+        """获取表注释"""
+        query = """
+        SELECT obj_description(oid) as table_comment
+        FROM pg_class 
+        WHERE relname = $2 
+        AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = $1)
+        """
+        async with self.connection_pool.acquire() as conn:
+            result = await conn.fetchval(query, schema_name, table_name)
+            return result
+    
+    async def _get_table_statistics(self, schema_name: str, table_name: str) -> Dict[str, Any]:
+        """获取表统计信息"""
+        stats_query = """
+        SELECT 
+            schemaname,
+            tablename,
+            attname,
+            n_distinct,
+            most_common_vals,
+            most_common_freqs,
+            histogram_bounds
+        FROM pg_stats 
+        WHERE schemaname = $1 AND tablename = $2
+        """
+        
+        size_query = """
+        SELECT pg_size_pretty(pg_total_relation_size($1::oid)) as table_size,
+               pg_relation_size($1::oid) as table_size_bytes
+        """
+        
+        count_query = f"SELECT COUNT(*) as row_count FROM {schema_name}.{table_name}"
+        
+        stats = {}
+        async with self.connection_pool.acquire() as conn:
+            try:
+                # 获取行数
+                row_count = await conn.fetchval(count_query)
+                stats['row_count'] = row_count
+                
+                # 获取表大小
+                table_oid = await conn.fetchval(
+                    "SELECT oid FROM pg_class WHERE relname = $1 AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = $2)",
+                    table_name, schema_name
+                )
+                if table_oid:
+                    # 确保 table_oid 作为整数传递
+                    size_result = await conn.fetchrow(size_query, int(table_oid))
+                    stats['table_size'] = size_result['table_size']
+                    stats['table_size_bytes'] = size_result['table_size_bytes']
+                
+            except Exception as e:
+                self.logger.warning(f"获取表统计信息失败: {e}")
+        
+        return stats

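上述元数据查询也可以脱离工具链单独验证。下面是一个示意脚本(非本次提交的内容),用 asyncpg 直接执行同样的参数化查询;连接串与表名均为假设值:

```python
import asyncio
import asyncpg

async def main():
    # 连接串与表名均为假设值,仅用于演示参数化元数据查询
    conn = await asyncpg.connect("postgresql://user:pass@localhost:5432/dbname")
    try:
        rows = await conn.fetch(
            """
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = $1 AND table_name = $2
            ORDER BY ordinal_position
            """,
            "public", "bss_car_day_count",
        )
        for row in rows:
            print(row["column_name"], row["data_type"], row["is_nullable"])
    finally:
        await conn.close()

asyncio.run(main())
```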
+ 240 - 0
schema_tools/tools/ddl_generator.py

@@ -0,0 +1,240 @@
+import os
+import re
+from typing import List, Dict, Any
+from schema_tools.tools.base import BaseTool, ToolRegistry
+from schema_tools.utils.data_structures import ProcessingResult, TableProcessingContext, FieldInfo, TableMetadata
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+@ToolRegistry.register("ddl_generator")
+class DDLGeneratorTool(BaseTool):
+    """DDL格式生成工具"""
+    
+    needs_llm = False
+    tool_name = "DDL生成器"
+    
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+    
+    async def execute(self, context: TableProcessingContext) -> ProcessingResult:
+        """执行DDL生成"""
+        try:
+            table_metadata = context.table_metadata
+            
+            # 生成DDL内容
+            ddl_content = self._generate_ddl_content(table_metadata)
+            
+            # 确定文件名和路径
+            filename = context.file_manager.get_safe_filename(
+                table_metadata.schema_name,
+                table_metadata.table_name,
+                SCHEMA_TOOLS_CONFIG["ddl_file_suffix"]
+            )
+            
+            # 确定子目录
+            subdirectory = "ddl" if SCHEMA_TOOLS_CONFIG["create_subdirectories"] else None
+            filepath = context.file_manager.get_full_path(filename, subdirectory)
+            
+            # 写入文件
+            with open(filepath, 'w', encoding='utf-8') as f:
+                f.write(ddl_content)
+            
+            self.logger.info(f"DDL文件已生成: {filepath}")
+            
+            return ProcessingResult(
+                success=True,
+                data={
+                    'filename': filename,
+                    'filepath': filepath,
+                    'content_length': len(ddl_content),
+                    'ddl_content': ddl_content  # 保存内容供后续工具使用
+                },
+                metadata={'tool': self.tool_name}
+            )
+            
+        except Exception as e:
+            self.logger.exception(f"DDL生成失败")
+            return ProcessingResult(
+                success=False,
+                error_message=f"DDL生成失败: {str(e)}"
+            )
+    
+    def _generate_ddl_content(self, table_metadata: TableMetadata) -> str:
+        """生成DDL内容"""
+        lines = []
+        
+        # 表头注释 - 只显示表名,不加解释和字数统计
+        if table_metadata.comment:
+            # 提取表名部分(去掉解释和字数统计)
+            comment = table_metadata.comment
+            # 去掉可能的字数统计,如“(32字)”
+            comment = re.sub(r'[((]\d+字[))]', '', comment)
+            # 只取第一句话或逗号前的部分
+            if ',' in comment:
+                table_name_part = comment.split(',')[0]
+            elif '。' in comment:
+                table_name_part = comment.split('。')[0]
+            else:
+                table_name_part = comment.strip()
+            lines.append(f"-- 中文名: {table_name_part}")
+            lines.append(f"-- 描述: {comment}")
+        else:
+            lines.append(f"-- 中文名: {table_metadata.table_name}")
+        
+        # CREATE TABLE语句
+        lines.append(f"create table {table_metadata.full_name} (")
+        
+        # 字段定义
+        field_lines = []
+        for field in table_metadata.fields:
+            field_line = self._generate_field_line(field)
+            field_lines.append(field_line)
+        
+        # 主键定义
+        primary_keys = [f.name for f in table_metadata.fields if f.is_primary_key]
+        if primary_keys:
+            field_lines.append(f"  primary key ({', '.join(primary_keys)})")
+        
+        # 组合字段行:分隔逗号必须放在行内注释(--)之前,否则逗号会被注释吞掉,生成非法DDL
+        for i, line in enumerate(field_lines):
+            if i < len(field_lines) - 1:
+                if '--' in line:
+                    def_part, comment_part = line.split('--', 1)
+                    stripped = def_part.rstrip()
+                    padding = max(1, len(def_part) - len(stripped) - 1)
+                    line = f"{stripped},{' ' * padding}--{comment_part}"
+                else:
+                    line = line + ","
+            lines.append(line)
+        
+        lines.append(");")
+        
+        return '\n'.join(lines)
+    
+    def _generate_field_line(self, field: FieldInfo) -> str:
+        """生成字段定义行"""
+        parts = [f"  {field.name}"]
+        
+        # 字段类型
+        field_type = self._format_field_type(field)
+        parts.append(field_type)
+        
+        # NOT NULL约束
+        if not field.nullable:
+            parts.append("not null")
+        
+        # 默认值
+        if field.default_value and not self._should_skip_default(field.default_value):
+            parts.append(f"default {self._format_default_value(field.default_value)}")
+        
+        # 组合字段定义
+        field_def = ' '.join(parts)
+        
+        # 添加注释
+        comment = self._format_field_comment(field)
+        if comment:
+            # 计算对齐空格(减少到30个字符对齐)
+            padding = max(1, 30 - len(field_def))
+            field_line = f"{field_def}{' ' * padding}-- {comment}"
+        else:
+            field_line = field_def
+        
+        return field_line
+    
+    def _format_field_type(self, field: FieldInfo) -> str:
+        """格式化字段类型"""
+        field_type = field.type.lower()
+        
+        # 处理带长度的类型
+        if field_type in ['character varying', 'varchar'] and field.max_length:
+            return f"varchar({field.max_length})"
+        elif field_type == 'character' and field.max_length:
+            return f"char({field.max_length})"
+        elif field_type == 'numeric' and field.precision:
+            if field.scale:
+                return f"numeric({field.precision},{field.scale})"
+            else:
+                return f"numeric({field.precision})"
+        elif field_type == 'timestamp without time zone':
+            return "timestamp"
+        elif field_type == 'timestamp with time zone':
+            return "timestamptz"
+        elif field_type in ['integer', 'int']:
+            return "integer"
+        elif field_type in ['bigint', 'int8']:
+            return "bigint"
+        elif field_type in ['smallint', 'int2']:
+            return "smallint"
+        elif field_type in ['double precision', 'float8']:
+            return "double precision"
+        elif field_type in ['real', 'float4']:
+            return "real"
+        elif field_type == 'boolean':
+            return "boolean"
+        elif field_type == 'text':
+            return "text"
+        elif field_type == 'date':
+            return "date"
+        elif field_type == 'time without time zone':
+            return "time"
+        elif field_type == 'time with time zone':
+            return "timetz"
+        elif field_type == 'json':
+            return "json"
+        elif field_type == 'jsonb':
+            return "jsonb"
+        elif field_type == 'uuid':
+            return "uuid"
+        elif field_type.startswith('timestamp(') and 'without time zone' in field_type:
+            # 处理 timestamp(3) without time zone
+            precision = field_type.split('(')[1].split(')')[0]
+            return f"timestamp({precision})"
+        else:
+            return field_type
+    
+    def _format_default_value(self, default_value: str) -> str:
+        """格式化默认值"""
+        # 移除可能的类型转换
+        if '::' in default_value:
+            default_value = default_value.split('::')[0]
+        
+        # 处理函数调用
+        if default_value.lower() in ['now()', 'current_timestamp']:
+            return 'current_timestamp'
+        elif default_value.lower() == 'current_date':
+            return 'current_date'
+        
+        # 处理字符串值
+        if not (default_value.startswith("'") and default_value.endswith("'")):
+            # 检查是否为数字或布尔值
+            if default_value.lower() in ['true', 'false']:
+                return default_value.lower()
+            elif default_value.replace('.', '').replace('-', '').isdigit():
+                return default_value
+            else:
+                # 其他情况加引号
+                return f"'{default_value}'"
+        
+        return default_value
+    
+    def _should_skip_default(self, default_value: str) -> bool:
+        """判断是否应跳过默认值"""
+        # 跳过序列默认值
+        if 'nextval(' in default_value.lower():
+            return True
+        
+        # 跳过空字符串
+        if default_value.strip() in ['', "''", '""']:
+            return True
+        
+        return False
+    
+    def _format_field_comment(self, field: FieldInfo) -> str:
+        """格式化字段注释"""
+        comment_parts = []
+        
+        # 基础注释
+        if field.comment:
+            comment_parts.append(field.comment)
+        
+        # 主键标识
+        if field.is_primary_key:
+            comment_parts.append("主键")
+        
+        # 外键标识
+        if field.is_foreign_key:
+            comment_parts.append("外键")
+        
+        # 去掉小括号,直接返回注释内容
+        return ','.join(comment_parts) if comment_parts else ""

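DDLGeneratorTool 的核心是 `_generate_ddl_content`,可以在不连接数据库的情况下预览生成效果。下面是一个最小示意(假设 BaseTool 支持无参构造,本节未展示其定义;演示表与字段均为虚构):

```python
from schema_tools.tools.ddl_generator import DDLGeneratorTool
from schema_tools.utils.data_structures import FieldInfo, TableMetadata

meta = TableMetadata(
    schema_name="public",
    table_name="demo_orders",   # 虚构的演示表
    full_name="public.demo_orders",
    comment="订单表,记录下单金额与状态",
    fields=[
        FieldInfo(name="id", type="integer", nullable=False,
                  is_primary_key=True, comment="主键ID"),
        FieldInfo(name="amount", type="numeric", nullable=True,
                  precision=10, scale=2, comment="订单金额"),
    ],
)

tool = DDLGeneratorTool()  # 假设 BaseTool 可无参构造
print(tool._generate_ddl_content(meta))
```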
+ 269 - 0
schema_tools/tools/doc_generator.py

@@ -0,0 +1,269 @@
+import os
+import re
+from typing import List, Dict, Any
+from schema_tools.tools.base import BaseTool, ToolRegistry
+from schema_tools.utils.data_structures import ProcessingResult, TableProcessingContext, FieldInfo, TableMetadata
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+@ToolRegistry.register("doc_generator")
+class DocGeneratorTool(BaseTool):
+    """MD文档生成工具"""
+    
+    needs_llm = False
+    tool_name = "文档生成器"
+    
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+    
+    async def execute(self, context: TableProcessingContext) -> ProcessingResult:
+        """执行MD文档生成"""
+        try:
+            table_metadata = context.table_metadata
+            
+            # 获取DDL生成结果(如果有)
+            ddl_result = context.step_results.get('ddl_generator')
+            ddl_content = ddl_result.data.get('ddl_content', '') if ddl_result and ddl_result.success else ''
+            
+            # 生成MD内容
+            md_content = self._generate_md_content(table_metadata, ddl_content)
+            
+            # 确定文件名和路径
+            filename = context.file_manager.get_safe_filename(
+                table_metadata.schema_name,
+                table_metadata.table_name,
+                SCHEMA_TOOLS_CONFIG["doc_file_suffix"]
+            )
+            
+            # 确定子目录
+            subdirectory = "docs" if SCHEMA_TOOLS_CONFIG["create_subdirectories"] else None
+            filepath = context.file_manager.get_full_path(filename, subdirectory)
+            
+            # 写入文件
+            with open(filepath, 'w', encoding='utf-8') as f:
+                f.write(md_content)
+            
+            self.logger.info(f"MD文档已生成: {filepath}")
+            
+            return ProcessingResult(
+                success=True,
+                data={
+                    'filename': filename,
+                    'filepath': filepath,
+                    'content_length': len(md_content)
+                },
+                metadata={'tool': self.tool_name}
+            )
+            
+        except Exception as e:
+            self.logger.exception(f"MD文档生成失败")
+            return ProcessingResult(
+                success=False,
+                error_message=f"MD文档生成失败: {str(e)}"
+            )
+    
+    def _generate_md_content(self, table_metadata: TableMetadata, ddl_content: str) -> str:
+        """生成MD文档内容"""
+        lines = []
+        
+        # 标题 - 只显示表名,不加解释和字数统计
+        if table_metadata.comment:
+            # 提取表名部分(去掉解释和字数统计)
+            comment = table_metadata.comment
+            # 去掉可能的字数统计,如“(32字)”
+            comment = re.sub(r'[((]\d+字[))]', '', comment)
+            # 只取第一句话或逗号前的部分
+            if ',' in comment:
+                table_name_part = comment.split(',')[0]
+            elif '。' in comment:
+                table_name_part = comment.split('。')[0]
+            else:
+                table_name_part = comment.strip()
+            lines.append(f"## {table_metadata.table_name}({table_name_part})")
+            # 表描述
+            lines.append(f"{table_metadata.table_name} 表{comment}")
+        else:
+            lines.append(f"## {table_metadata.table_name}(数据表)")
+            lines.append(f"{table_metadata.table_name} 表")
+        
+        # 字段列表(去掉前面的空行)
+        lines.append("字段列表:")
+        for field in table_metadata.fields:
+            field_line = self._generate_field_line(field, table_metadata)
+            lines.append(field_line)
+        
+        # 字段补充说明(去掉前面的空行)
+        supplementary_info = self._generate_supplementary_info(table_metadata)
+        if supplementary_info:
+            lines.append("字段补充说明:")
+            lines.extend(supplementary_info)
+        
+        # DDL语句(可选)
+        if ddl_content and SCHEMA_TOOLS_CONFIG.get("include_ddl_in_doc", False):
+            lines.append("### DDL语句")
+            lines.append("```sql")
+            lines.append(ddl_content)
+            lines.append("```")
+            lines.append("")
+        
+        # 删除表统计信息部分
+        
+        return '\n'.join(lines)
+    
+    def _generate_field_line(self, field: FieldInfo, table_metadata: TableMetadata) -> str:
+        """生成字段说明行"""
+        # 基础信息
+        parts = [f"- {field.name}"]
+        
+        # 类型信息
+        type_info = self._format_field_type_for_doc(field)
+        parts.append(f"({type_info})")
+        
+        # 注释
+        if field.comment:
+            parts.append(f"- {field.comment}")
+        
+        # 约束信息
+        constraints = []
+        if field.is_primary_key:
+            constraints.append("主键")
+        if field.is_foreign_key:
+            constraints.append("外键")
+        if not field.nullable:
+            constraints.append("非空")
+        
+        if constraints:
+            parts.append(f"[{', '.join(constraints)}]")
+        
+        # 示例值(枚举类型显示更多,其他类型只显示2个)
+        sample_values = self._get_field_sample_values(field.name, table_metadata)
+        if sample_values:
+            if field.is_enum:
+                # 枚举类型最多显示10个
+                display_values = sample_values[:10]
+            else:
+                # 其他类型只显示2个
+                display_values = sample_values[:2]
+            sample_str = f"[示例: {', '.join(display_values)}]"
+            parts.append(sample_str)
+        
+        return ' '.join(parts)
+    
+    def _format_field_type_for_doc(self, field: FieldInfo) -> str:
+        """为文档格式化字段类型"""
+        if field.type.lower() in ['character varying', 'varchar'] and field.max_length:
+            return f"varchar({field.max_length})"
+        elif field.type.lower() == 'numeric' and field.precision:
+            if field.scale:
+                return f"numeric({field.precision},{field.scale})"
+            else:
+                return f"numeric({field.precision})"
+        elif 'timestamp' in field.type.lower():
+            if '(' in field.type:
+                # 提取精度
+                precision = field.type.split('(')[1].split(')')[0]
+                return f"timestamp({precision})"
+            return "timestamp"
+        else:
+            return field.type
+    
+    def _get_field_sample_values(self, field_name: str, table_metadata: TableMetadata) -> List[str]:
+        """获取字段的示例值"""
+        sample_values = []
+        seen_values = set()
+        
+        for sample in table_metadata.sample_data:
+            if field_name in sample:
+                value = sample[field_name]
+                if value is not None:
+                    str_value = str(value)
+                    if str_value not in seen_values:
+                        seen_values.add(str_value)
+                        sample_values.append(str_value)
+                        if len(sample_values) >= 3:
+                            break
+        
+        return sample_values
+    
+    def _generate_supplementary_info(self, table_metadata: TableMetadata) -> List[str]:
+        """生成字段补充说明"""
+        info_lines = []
+        
+        # 主键信息
+        primary_keys = [f.name for f in table_metadata.fields if f.is_primary_key]
+        if primary_keys:
+            if len(primary_keys) == 1:
+                info_lines.append(f"- {primary_keys[0]} 为主键")
+            else:
+                info_lines.append(f"- 复合主键:{', '.join(primary_keys)}")
+        
+        # 外键信息
+        foreign_keys = [(f.name, f.comment) for f in table_metadata.fields if f.is_foreign_key]
+        for fk_name, fk_comment in foreign_keys:
+            if fk_comment and '关联' in fk_comment:
+                info_lines.append(f"- {fk_name} {fk_comment}")
+            else:
+                info_lines.append(f"- {fk_name} 为外键")
+        
+        # 枚举字段信息(包括逻辑枚举类型)
+        enum_fields = [f for f in table_metadata.fields if f.is_enum and f.enum_values]
+        for field in enum_fields:
+            values_str = '、'.join(field.enum_values)
+            # 不显示取值数量,因为可能不完整
+            info_lines.append(f"- {field.name} 为枚举字段,包含取值:{values_str}")
+            # 不显示enum_description,因为它通常是重复的描述
+        
+        # 检查逻辑枚举(字段名暗示但未被识别为枚举的字段)
+        logical_enum_keywords = ["状态", "类型", "级别", "方向", "品类", "模式", "格式", "性别"]
+        for field in table_metadata.fields:
+            if not field.is_enum:  # 只检查未被识别为枚举的字段
+                # 中文关键词一般出现在注释里而非英文字段名中,因此连同注释一起匹配
+                field_text = f"{field.name} {field.comment or ''}".lower()
+                if any(keyword in field_text for keyword in logical_enum_keywords):
+                    # 获取该字段的示例值来判断是否可能是逻辑枚举
+                    sample_values = self._get_field_sample_values(field.name, table_metadata)
+                    if sample_values and len(sample_values) <= 10:  # 如果样例值数量较少,可能是逻辑枚举
+                        values_str = '、'.join(sample_values[:10])
+                        info_lines.append(f"- {field.name} 疑似枚举字段,当前取值:{values_str}")
+        
+        # 特殊字段说明
+        for field in table_metadata.fields:
+            # UUID字段
+            if field.type.lower() == 'uuid':
+                info_lines.append(f"- {field.name} 使用 UUID 编码")
+            
+            # 时间戳字段
+            elif 'timestamp' in field.type.lower() and field.default_value:
+                if 'now()' in field.default_value.lower() or 'current_timestamp' in field.default_value.lower():
+                    info_lines.append(f"- {field.name} 自动记录当前时间")
+            
+            # JSON字段
+            elif field.type.lower() in ['json', 'jsonb']:
+                info_lines.append(f"- {field.name} 存储JSON格式数据")
+        
+        # 表关联说明
+        if table_metadata.table_name.endswith('_rel') or table_metadata.table_name.endswith('_relation'):
+            info_lines.append(f"- 本表是关联表,用于多对多关系映射")
+        
+        return info_lines
+    
+    def _generate_statistics_info(self, table_metadata: TableMetadata) -> List[str]:
+        """生成表统计信息"""
+        stats_lines = []
+        
+        if table_metadata.row_count is not None:
+            stats_lines.append(f"- 数据行数:{table_metadata.row_count:,}")
+        
+        if table_metadata.table_size:
+            stats_lines.append(f"- 表大小:{table_metadata.table_size}")
+        
+        # 字段统计
+        total_fields = len(table_metadata.fields)
+        nullable_fields = sum(1 for f in table_metadata.fields if f.nullable)
+        enum_fields = sum(1 for f in table_metadata.fields if f.is_enum)
+        
+        stats_lines.append(f"- 字段总数:{total_fields}")
+        if nullable_fields > 0:
+            stats_lines.append(f"- 可空字段:{nullable_fields}")
+        if enum_fields > 0:
+            stats_lines.append(f"- 枚举字段:{enum_fields}")
+        
+        return stats_lines

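与 DDL 生成器类似,MD 文档也可以离线预览。下面的示意(同样假设 BaseTool 可无参构造)构造了一个带枚举字段和样例数据的虚构表,直接打印 `_generate_md_content` 的输出:

```python
from schema_tools.tools.doc_generator import DocGeneratorTool
from schema_tools.utils.data_structures import FieldInfo, TableMetadata

meta = TableMetadata(
    schema_name="public", table_name="demo_orders",
    full_name="public.demo_orders", comment="订单表,记录下单金额与状态",
    fields=[
        FieldInfo(name="status", type="varchar", nullable=False, max_length=20,
                  comment="订单状态", is_enum=True, enum_values=["已支付", "已取消"]),
    ],
    sample_data=[{"status": "已支付"}, {"status": "已取消"}],
)

tool = DocGeneratorTool()  # 假设 BaseTool 可无参构造
print(tool._generate_md_content(meta, ddl_content=""))
```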
+ 310 - 0
schema_tools/training_data_agent.py

@@ -0,0 +1,310 @@
+import asyncio
+import time
+import logging
+import os
+from typing import List, Dict, Any, Optional
+from pathlib import Path
+
+from schema_tools.tools.base import ToolRegistry, PipelineExecutor
+from schema_tools.utils.data_structures import TableMetadata, TableProcessingContext, ProcessingResult
+from schema_tools.utils.file_manager import FileNameManager
+from schema_tools.utils.system_filter import SystemTableFilter
+from schema_tools.utils.permission_checker import DatabasePermissionChecker
+from schema_tools.utils.table_parser import TableListParser
+from schema_tools.utils.logger import setup_logging
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+class SchemaTrainingDataAgent:
+    """Schema训练数据生成AI Agent"""
+    
+    def __init__(self, 
+                 db_connection: str,
+                 table_list_file: str,
+                 business_context: str = None,
+                 output_dir: str = None,
+                 pipeline: str = "full"):
+        
+        self.db_connection = db_connection
+        self.table_list_file = table_list_file
+        self.business_context = business_context or "数据库管理系统"
+        self.pipeline = pipeline
+        
+        # 配置管理
+        self.config = SCHEMA_TOOLS_CONFIG
+        self.output_dir = output_dir or self.config["output_directory"]
+        
+        # 初始化组件
+        self.file_manager = FileNameManager(self.output_dir)
+        self.system_filter = SystemTableFilter()
+        self.table_parser = TableListParser()
+        self.pipeline_executor = PipelineExecutor(self.config["available_pipelines"])
+        
+        # 统计信息
+        self.stats = {
+            'total_tables': 0,
+            'processed_tables': 0,
+            'failed_tables': 0,
+            'skipped_tables': 0,
+            'start_time': None,
+            'end_time': None
+        }
+        
+        self.failed_tables = []
+        self.logger = logging.getLogger("schema_tools.Agent")
+    
+    async def generate_training_data(self) -> Dict[str, Any]:
+        """主入口:生成训练数据"""
+        try:
+            self.stats['start_time'] = time.time()
+            self.logger.info("🚀 开始生成Schema训练数据")
+            
+            # 1. 初始化
+            await self._initialize()
+            
+            # 2. 检查数据库权限
+            await self._check_database_permissions()
+            
+            # 3. 解析表清单
+            tables = await self._parse_table_list()
+            
+            # 4. 过滤系统表
+            user_tables = self._filter_system_tables(tables)
+            
+            # 5. 并发处理表
+            results = await self._process_tables_concurrently(user_tables)
+            
+            # 6. 设置结束时间
+            self.stats['end_time'] = time.time()
+            
+            # 7. 生成总结报告
+            report = self._generate_summary_report(results)
+            
+            self.logger.info("✅ Schema训练数据生成完成")
+            
+            return report
+            
+        except Exception as e:
+            self.stats['end_time'] = time.time()
+            self.logger.exception("❌ Schema训练数据生成失败")
+            raise
+    
+    async def _initialize(self):
+        """初始化Agent"""
+        # 创建输出目录
+        os.makedirs(self.output_dir, exist_ok=True)
+        if self.config["create_subdirectories"]:
+            os.makedirs(os.path.join(self.output_dir, "ddl"), exist_ok=True)
+            os.makedirs(os.path.join(self.output_dir, "docs"), exist_ok=True)
+            os.makedirs(os.path.join(self.output_dir, "logs"), exist_ok=True)
+        
+        # 初始化数据库工具
+        database_tool = ToolRegistry.get_tool("database_inspector", db_connection=self.db_connection)
+        await database_tool._create_connection_pool()
+        
+        self.logger.info(f"初始化完成,输出目录: {self.output_dir}")
+    
+    async def _check_database_permissions(self):
+        """检查数据库权限"""
+        if not self.config["check_permissions"]:
+            return
+        
+        inspector = ToolRegistry.get_tool("database_inspector")
+        checker = DatabasePermissionChecker(inspector)
+        
+        permissions = await checker.check_permissions()
+        
+        if not permissions['connect']:
+            raise Exception("无法连接到数据库")
+        
+        if self.config["require_select_permission"] and not permissions['select_data']:
+            if not self.config["allow_readonly_database"]:
+                raise Exception("数据库查询权限不足")
+            else:
+                self.logger.warning("数据库为只读或权限受限,部分功能可能受影响")
+        
+        self.logger.info(f"数据库权限检查完成: {permissions}")
+    
+    async def _parse_table_list(self) -> List[str]:
+        """解析表清单文件"""
+        tables = self.table_parser.parse_file(self.table_list_file)
+        self.stats['total_tables'] = len(tables)
+        self.logger.info(f"📋 从清单文件读取到 {len(tables)} 个表")
+        return tables
+    
+    def _filter_system_tables(self, tables: List[str]) -> List[str]:
+        """过滤系统表"""
+        if not self.config["filter_system_tables"]:
+            return tables
+        
+        user_tables = self.system_filter.filter_user_tables(tables)
+        filtered_count = len(tables) - len(user_tables)
+        
+        if filtered_count > 0:
+            self.logger.info(f"🔍 过滤了 {filtered_count} 个系统表,保留 {len(user_tables)} 个用户表")
+            self.stats['skipped_tables'] += filtered_count
+        
+        return user_tables
+    
+    async def _process_tables_concurrently(self, tables: List[str]) -> List[Dict[str, Any]]:
+        """并发处理表"""
+        max_concurrent = self.config["max_concurrent_tables"]
+        semaphore = asyncio.Semaphore(max_concurrent)
+        
+        self.logger.info(f"🔄 开始并发处理 {len(tables)} 个表 (最大并发: {max_concurrent})")
+        
+        # 创建任务
+        tasks = [
+            self._process_single_table_with_semaphore(semaphore, table_spec)
+            for table_spec in tables
+        ]
+        
+        # 并发执行
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+        
+        # 统计结果
+        successful = sum(1 for r in results if isinstance(r, dict) and r.get('success', False))
+        failed = len(results) - successful
+        
+        self.stats['processed_tables'] = successful
+        self.stats['failed_tables'] = failed
+        
+        self.logger.info(f"📊 处理完成: 成功 {successful} 个,失败 {failed} 个")
+        
+        return [r for r in results if isinstance(r, dict)]
+    
+    async def _process_single_table_with_semaphore(self, semaphore: asyncio.Semaphore, table_spec: str) -> Dict[str, Any]:
+        """带信号量的单表处理"""
+        async with semaphore:
+            return await self._process_single_table(table_spec)
+    
+    async def _process_single_table(self, table_spec: str) -> Dict[str, Any]:
+        """处理单个表"""
+        start_time = time.time()
+        
+        try:
+            # 解析表名
+            if '.' in table_spec:
+                schema_name, table_name = table_spec.split('.', 1)
+            else:
+                schema_name, table_name = 'public', table_spec
+            
+            full_name = f"{schema_name}.{table_name}"
+            self.logger.info(f"🔍 开始处理表: {full_name}")
+            
+            # 创建表元数据
+            table_metadata = TableMetadata(
+                schema_name=schema_name,
+                table_name=table_name,
+                full_name=full_name
+            )
+            
+            # 创建处理上下文
+            context = TableProcessingContext(
+                table_metadata=table_metadata,
+                business_context=self.business_context,
+                output_dir=self.output_dir,
+                pipeline=self.pipeline,
+                vn=None,  # 将在工具中注入
+                file_manager=self.file_manager,
+                start_time=start_time
+            )
+            
+            # 执行处理链
+            step_results = await self.pipeline_executor.execute_pipeline(self.pipeline, context)
+            
+            # 计算总体成功状态
+            success = all(result.success for result in step_results.values())
+            
+            execution_time = time.time() - start_time
+            
+            if success:
+                self.logger.info(f"✅ 表 {full_name} 处理成功,耗时: {execution_time:.2f}秒")
+            else:
+                self.logger.error(f"❌ 表 {full_name} 处理失败,耗时: {execution_time:.2f}秒")
+                self.failed_tables.append(full_name)
+            
+            return {
+                'success': success,
+                'table_name': full_name,
+                'execution_time': execution_time,
+                'step_results': {k: v.to_dict() for k, v in step_results.items()},
+                'metadata': {
+                    'fields_count': len(table_metadata.fields),
+                    'row_count': table_metadata.row_count,
+                    'enum_fields': len([f for f in table_metadata.fields if f.is_enum])
+                }
+            }
+            
+        except Exception as e:
+            execution_time = time.time() - start_time
+            error_msg = f"表 {table_spec} 处理异常: {str(e)}"
+            self.logger.exception(error_msg)
+            self.failed_tables.append(table_spec)
+            
+            return {
+                'success': False,
+                'table_name': table_spec,
+                'execution_time': execution_time,
+                'error_message': error_msg,
+                'step_results': {}
+            }
+    
+    def _generate_summary_report(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """生成总结报告"""
+        total_time = self.stats['end_time'] - self.stats['start_time']
+        
+        # 计算统计信息
+        successful_results = [r for r in results if r.get('success', False)]
+        failed_results = [r for r in results if not r.get('success', False)]
+        
+        total_fields = sum(r.get('metadata', {}).get('fields_count', 0) for r in successful_results)
+        total_enum_fields = sum(r.get('metadata', {}).get('enum_fields', 0) for r in successful_results)
+        
+        avg_execution_time = sum(r.get('execution_time', 0) for r in results) / len(results) if results else 0
+        
+        report = {
+            'summary': {
+                'total_tables': self.stats['total_tables'],
+                'processed_successfully': len(successful_results),
+                'failed': len(failed_results),
+                'skipped_system_tables': self.stats['skipped_tables'],
+                'total_execution_time': total_time,
+                'average_table_time': avg_execution_time
+            },
+            'statistics': {
+                'total_fields_processed': total_fields,
+                'enum_fields_detected': total_enum_fields,
+                'files_generated': len(successful_results) * (2 if self.pipeline == 'full' else 1)
+            },
+            'failed_tables': self.failed_tables,
+            'detailed_results': results,
+            'configuration': {
+                'pipeline': self.pipeline,
+                'business_context': self.business_context,
+                'output_directory': self.output_dir,
+                'max_concurrent_tables': self.config['max_concurrent_tables']
+            }
+        }
+        
+        # 输出总结
+        self.logger.info(f"📊 处理总结:")
+        self.logger.info(f"  ✅ 成功: {report['summary']['processed_successfully']} 个表")
+        self.logger.info(f"  ❌ 失败: {report['summary']['failed']} 个表")
+        self.logger.info(f"  ⏭️  跳过: {report['summary']['skipped_system_tables']} 个系统表")
+        self.logger.info(f"  📁 生成文件: {report['statistics']['files_generated']} 个")
+        self.logger.info(f"  🕐 总耗时: {total_time:.2f} 秒")
+        
+        if self.failed_tables:
+            self.logger.warning(f"❌ 失败的表: {', '.join(self.failed_tables)}")
+        
+        # 写入文件名映射报告
+        self.file_manager.write_mapping_report()
+        
+        return report
+    
+    async def check_database_permissions(self) -> Dict[str, bool]:
+        """检查数据库权限(供外部调用)"""
+        inspector = ToolRegistry.get_tool("database_inspector", db_connection=self.db_connection)
+        await inspector._create_connection_pool()
+        checker = DatabasePermissionChecker(inspector)
+        return await checker.check_permissions()

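`generate_training_data()` 返回的报告是一个嵌套字典,键名与上文 `_generate_summary_report` 一致。下面是一个消费该报告的示意函数(报告本身的获取方式见 README):

```python
# report 为 generate_training_data() 的返回值
def print_report(report: dict) -> None:
    summary = report["summary"]
    print(f"成功 {summary['processed_successfully']} / {summary['total_tables']} 个表,"
          f"总耗时 {summary['total_execution_time']:.2f} 秒")
    for table_name in report["failed_tables"]:
        print("失败表:", table_name)
```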
+ 25 - 0
schema_tools/utils/__init__.py

@@ -0,0 +1,25 @@
+"""
+工具函数模块
+"""
+
+from .data_structures import (
+    FieldType, ProcessingStatus, FieldInfo, 
+    TableMetadata, ProcessingResult, TableProcessingContext
+)
+from .table_parser import TableListParser
+from .file_manager import FileNameManager
+from .system_filter import SystemTableFilter
+from .permission_checker import DatabasePermissionChecker
+from .large_table_handler import LargeTableHandler
+from .logger import setup_logging
+
+__all__ = [
+    # 数据结构
+    "FieldType", "ProcessingStatus", "FieldInfo", 
+    "TableMetadata", "ProcessingResult", "TableProcessingContext",
+    # 工具类
+    "TableListParser", "FileNameManager", "SystemTableFilter",
+    "DatabasePermissionChecker", "LargeTableHandler",
+    # 函数
+    "setup_logging"
+]

+ 135 - 0
schema_tools/utils/data_structures.py

@@ -0,0 +1,135 @@
+from dataclasses import dataclass, field
+from typing import List, Dict, Optional, Any, Union
+from enum import Enum
+import hashlib
+import json
+
+class FieldType(Enum):
+    """字段类型枚举"""
+    INTEGER = "integer"
+    VARCHAR = "varchar"
+    TEXT = "text"
+    TIMESTAMP = "timestamp"
+    DATE = "date"
+    BOOLEAN = "boolean"
+    NUMERIC = "numeric"
+    ENUM = "enum"
+    JSON = "json"
+    UUID = "uuid"
+    OTHER = "other"
+
+class ProcessingStatus(Enum):
+    """处理状态枚举"""
+    PENDING = "pending"
+    RUNNING = "running"
+    SUCCESS = "success"
+    FAILED = "failed"
+    SKIPPED = "skipped"
+
+@dataclass
+class FieldInfo:
+    """字段信息标准结构"""
+    name: str
+    type: str
+    nullable: bool
+    default_value: Optional[str] = None
+    comment: Optional[str] = None
+    original_comment: Optional[str] = None  # 原始注释
+    generated_comment: Optional[str] = None  # LLM生成的注释
+    is_primary_key: bool = False
+    is_foreign_key: bool = False
+    is_enum: bool = False
+    enum_values: Optional[List[str]] = None
+    enum_description: Optional[str] = None
+    max_length: Optional[int] = None
+    precision: Optional[int] = None
+    scale: Optional[int] = None
+    
+    def to_dict(self) -> Dict[str, Any]:
+        """转换为字典格式"""
+        return {
+            'name': self.name,
+            'type': self.type,
+            'nullable': self.nullable,
+            'default_value': self.default_value,
+            'comment': self.comment,
+            'is_primary_key': self.is_primary_key,
+            'is_foreign_key': self.is_foreign_key,
+            'is_enum': self.is_enum,
+            'enum_values': self.enum_values
+        }
+
+@dataclass
+class TableMetadata:
+    """表元数据标准结构"""
+    schema_name: str
+    table_name: str
+    full_name: str  # schema.table_name
+    comment: Optional[str] = None
+    original_comment: Optional[str] = None  # 原始注释
+    generated_comment: Optional[str] = None  # LLM生成的注释
+    fields: List[FieldInfo] = field(default_factory=list)
+    sample_data: List[Dict[str, Any]] = field(default_factory=list)
+    row_count: Optional[int] = None
+    table_size: Optional[str] = None  # 表大小(如 "1.2 MB")
+    created_date: Optional[str] = None
+    
+    @property
+    def safe_file_name(self) -> str:
+        """生成安全的文件名"""
+        if self.schema_name.lower() == 'public':
+            return self.table_name
+        return f"{self.schema_name}__{self.table_name}".replace('.', '__').replace('-', '_').replace(' ', '_')
+    
+    def get_metadata_hash(self) -> str:
+        """计算元数据哈希值,用于增量更新判断"""
+        hash_data = {
+            'schema_name': self.schema_name,
+            'table_name': self.table_name,
+            'fields': [f.to_dict() for f in self.fields],
+            'comment': self.original_comment
+        }
+        return hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest()
+
+@dataclass
+class ProcessingResult:
+    """工具处理结果标准结构"""
+    success: bool
+    data: Optional[Any] = None
+    error_message: Optional[str] = None
+    warnings: List[str] = field(default_factory=list)
+    execution_time: Optional[float] = None
+    metadata: Dict[str, Any] = field(default_factory=dict)
+    
+    def add_warning(self, warning: str):
+        """添加警告信息"""
+        self.warnings.append(warning)
+    
+    def to_dict(self) -> Dict[str, Any]:
+        """转换为字典格式"""
+        return {
+            'success': self.success,
+            'data': self.data,
+            'error_message': self.error_message,
+            'warnings': self.warnings,
+            'execution_time': self.execution_time,
+            'metadata': self.metadata
+        }
+
+@dataclass
+class TableProcessingContext:
+    """表处理上下文"""
+    table_metadata: TableMetadata
+    business_context: str
+    output_dir: str
+    pipeline: str
+    vn: Any  # vanna实例
+    file_manager: Any
+    current_step: str = "initialized"
+    step_results: Dict[str, ProcessingResult] = field(default_factory=dict)
+    start_time: Optional[float] = None
+    
+    def update_step(self, step_name: str, result: ProcessingResult):
+        """更新步骤结果"""
+        self.current_step = step_name
+        self.step_results[step_name] = result

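`get_metadata_hash` 是增量更新判断的基础:表结构(字段或原始注释)一旦变化,哈希值随之变化。一个可直接运行的示意:

```python
from schema_tools.utils.data_structures import FieldInfo, TableMetadata

meta = TableMetadata(schema_name="public", table_name="users", full_name="public.users")
meta.fields.append(FieldInfo(name="id", type="integer", nullable=False, is_primary_key=True))
hash_before = meta.get_metadata_hash()

# 新增字段后哈希随之变化,可据此判断是否需要重新生成训练数据
meta.fields.append(FieldInfo(name="email", type="varchar", nullable=True, max_length=255))
hash_after = meta.get_metadata_hash()
print(hash_before == hash_after)  # False
```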
+ 153 - 0
schema_tools/utils/file_manager.py

@@ -0,0 +1,153 @@
+import os
+import re
+import logging
+from typing import Dict, Set, Optional
+from pathlib import Path
+
+class FileNameManager:
+    """文件名管理器,处理文件命名和冲突"""
+    
+    def __init__(self, output_dir: str):
+        self.output_dir = output_dir
+        self.used_names: Set[str] = set()
+        self.name_mapping: Dict[str, str] = {}  # 原始名 -> 实际文件名
+        self.logger = logging.getLogger("schema_tools.FileNameManager")
+        
+        # 扫描已存在的文件
+        self._scan_existing_files()
+    
+    def _scan_existing_files(self):
+        """扫描输出目录中已存在的文件"""
+        if not os.path.exists(self.output_dir):
+            return
+        
+        for root, dirs, files in os.walk(self.output_dir):
+            for file in files:
+                if file.endswith(('.ddl', '.md')):
+                    self.used_names.add(file)
+    
+    def get_safe_filename(self, schema_name: str, table_name: str, suffix: str) -> str:
+        """
+        生成安全的文件名,避免冲突
+        
+        Args:
+            schema_name: Schema名称
+            table_name: 表名
+            suffix: 文件后缀(如 .ddl 或 _detail.md)
+            
+        Returns:
+            安全的文件名
+        """
+        # 生成基础文件名
+        base_name = self._generate_base_name(schema_name, table_name)
+        
+        # 拼接后缀(既支持 ".ddl" 这类扩展名,也支持 "_detail.md" 这类带前缀的后缀)
+        filename = f"{base_name}{suffix}"
+        
+        # 检查冲突并生成唯一名称
+        unique_filename = self._ensure_unique_filename(filename)
+        
+        # 记录映射关系
+        original_key = f"{schema_name}.{table_name}"
+        self.name_mapping[original_key] = unique_filename
+        self.used_names.add(unique_filename)
+        
+        return unique_filename
+    
+    def _generate_base_name(self, schema_name: str, table_name: str) -> str:
+        """
+        生成基础文件名
+        
+        规则:
+        - public.table_name → table_name
+        - schema.table_name → schema__table_name  
+        - 特殊字符替换: . → __, - → _, 空格 → _
+        """
+        if schema_name.lower() == 'public':
+            safe_name = table_name
+        else:
+            safe_name = f"{schema_name}__{table_name}"
+        
+        # 替换特殊字符
+        replacements = {
+            '.': '__',
+            '-': '_',
+            ' ': '_',
+            '/': '_',
+            '\\': '_',
+            ':': '_',
+            '*': '_',
+            '?': '_',
+            '"': '_',
+            '<': '_',
+            '>': '_',
+            '|': '_'
+        }
+        
+        for old_char, new_char in replacements.items():
+            safe_name = safe_name.replace(old_char, new_char)
+        
+        # 将 3 个及以上的连续下划线压缩为 2 个,保留 schema__table 的 "__" 分隔约定
+        safe_name = re.sub(r'_{3,}', '__', safe_name)
+        
+        return safe_name
+    
+    def _ensure_unique_filename(self, filename: str) -> str:
+        """确保文件名唯一性"""
+        if filename not in self.used_names:
+            return filename
+        
+        # 如果重名,添加数字后缀
+        base, ext = os.path.splitext(filename)
+        counter = 1
+        
+        while True:
+            unique_name = f"{base}_{counter}{ext}"
+            if unique_name not in self.used_names:
+                self.logger.warning(f"文件名冲突,'{filename}' 重命名为 '{unique_name}'")
+                return unique_name
+            counter += 1
+    
+    def get_full_path(self, filename: str, subdirectory: Optional[str] = None) -> str:
+        """
+        获取完整文件路径
+        
+        Args:
+            filename: 文件名
+            subdirectory: 子目录(如 'ddl' 或 'docs')
+            
+        Returns:
+            完整路径
+        """
+        if subdirectory:
+            full_path = os.path.join(self.output_dir, subdirectory, filename)
+        else:
+            full_path = os.path.join(self.output_dir, filename)
+        
+        # 确保目录存在
+        os.makedirs(os.path.dirname(full_path), exist_ok=True)
+        
+        return full_path
+    
+    def get_mapping_report(self) -> Dict[str, str]:
+        """获取文件名映射报告"""
+        return self.name_mapping.copy()
+    
+    def write_mapping_report(self):
+        """写入文件名映射报告"""
+        report_path = os.path.join(self.output_dir, "filename_mapping.txt")
+        
+        try:
+            with open(report_path, 'w', encoding='utf-8') as f:
+                f.write("# 文件名映射报告\n")
+                f.write("# 格式: 原始表名 -> 实际文件名\n\n")
+                
+                for original, actual in sorted(self.name_mapping.items()):
+                    f.write(f"{original} -> {actual}\n")
+            
+            self.logger.info(f"文件名映射报告已保存到: {report_path}")
+        except Exception as e:
+            self.logger.error(f"写入文件名映射报告失败: {e}")

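命名规则与冲突处理可以用一个小脚本验证(输出目录为假设值,不存在也不影响运行):

```python
from schema_tools.utils.file_manager import FileNameManager

fm = FileNameManager("./output_demo")                  # 假设的输出目录
n1 = fm.get_safe_filename("public", "users", ".ddl")   # public 前缀省略
n2 = fm.get_safe_filename("hr", "users", ".ddl")       # 非 public 采用 schema__table
n3 = fm.get_safe_filename("public", "users", ".ddl")   # 再次申请同名时追加序号
print(n1, n2, n3)  # users.ddl hr__users.ddl users_1.ddl
```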
+ 162 - 0
schema_tools/utils/large_table_handler.py

@@ -0,0 +1,162 @@
+import logging
+import random
+from typing import List, Dict, Any, Optional
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+class LargeTableHandler:
+    """大表处理策略"""
+    
+    def __init__(self):
+        self.logger = logging.getLogger("schema_tools.LargeTableHandler")
+        self.large_table_threshold = SCHEMA_TOOLS_CONFIG.get("large_table_threshold", 1000000)
+        self.skip_large_tables = SCHEMA_TOOLS_CONFIG.get("skip_large_tables", False)
+        self.max_table_size = SCHEMA_TOOLS_CONFIG.get("max_table_size", 10000000)
+    
+    def should_skip_table(self, row_count: Optional[int]) -> bool:
+        """
+        判断是否应跳过表
+        
+        Args:
+            row_count: 表行数
+            
+        Returns:
+            是否跳过
+        """
+        if not self.skip_large_tables or row_count is None:
+            return False
+        
+        if row_count > self.max_table_size:
+            self.logger.warning(f"表行数({row_count})超过最大限制({self.max_table_size}),将跳过处理")
+            return True
+        
+        return False
+    
+    def is_large_table(self, row_count: Optional[int]) -> bool:
+        """
+        判断是否为大表
+        
+        Args:
+            row_count: 表行数
+            
+        Returns:
+            是否为大表
+        """
+        if row_count is None:
+            return False
+        
+        return row_count > self.large_table_threshold
+    
+    async def get_smart_sample(self, db_inspector, table_name: str, schema_name: str, 
+                               row_count: Optional[int], limit: int = 20) -> List[Dict[str, Any]]:
+        """
+        智能采样策略
+        
+        Args:
+            db_inspector: 数据库检查工具实例
+            table_name: 表名
+            schema_name: Schema名
+            row_count: 表行数
+            limit: 采样数量限制
+            
+        Returns:
+            采样数据列表
+        """
+        full_table_name = f"{schema_name}.{table_name}"
+        
+        # 如果不是大表,使用简单采样
+        if not self.is_large_table(row_count):
+            return await self._simple_sample(db_inspector, full_table_name, limit)
+        
+        self.logger.info(f"表 {full_table_name} 有 {row_count} 行,使用智能采样策略")
+        
+        # 大表使用分层采样
+        return await self._stratified_sample(db_inspector, full_table_name, row_count, limit)
+    
+    async def _simple_sample(self, db_inspector, full_table_name: str, limit: int) -> List[Dict[str, Any]]:
+        """简单采样策略"""
+        query = f"SELECT * FROM {full_table_name} LIMIT {limit}"
+        
+        async with db_inspector.connection_pool.acquire() as conn:
+            rows = await conn.fetch(query)
+            return [dict(row) for row in rows]
+    
+    async def _stratified_sample(self, db_inspector, full_table_name: str, 
+                                  row_count: int, limit: int) -> List[Dict[str, Any]]:
+        """分层采样策略(用于大表)"""
+        samples_per_section = max(1, limit // 3)
+        samples = []
+        
+        async with db_inspector.connection_pool.acquire() as conn:
+            # 1. 前N行采样
+            front_query = f"SELECT * FROM {full_table_name} LIMIT {samples_per_section}"
+            front_rows = await conn.fetch(front_query)
+            samples.extend([dict(row) for row in front_rows])
+            
+            # 2. 随机中间采样
+            if row_count > samples_per_section * 2:
+                try:
+                    # 使用TABLESAMPLE进行随机采样
+                    sample_percent = min(1.0, (samples_per_section * 100.0) / row_count)
+                    middle_query = f"""
+                    SELECT * FROM {full_table_name} 
+                    TABLESAMPLE SYSTEM({sample_percent}) 
+                    LIMIT {samples_per_section}
+                    """
+                    middle_rows = await conn.fetch(middle_query)
+                    samples.extend([dict(row) for row in middle_rows])
+                except Exception as e:
+                    self.logger.warning(f"TABLESAMPLE采样失败,使用OFFSET采样: {e}")
+                    # 回退到OFFSET采样
+                    offset = random.randint(samples_per_section, row_count - samples_per_section)
+                    offset_query = f"SELECT * FROM {full_table_name} OFFSET {offset} LIMIT {samples_per_section}"
+                    offset_rows = await conn.fetch(offset_query)
+                    samples.extend([dict(row) for row in offset_rows])
+            
+            # 3. 后N行采样
+            remaining = limit - len(samples)
+            if remaining > 0 and row_count > limit:
+                # 使用OFFSET获取最后的行
+                offset = max(0, row_count - remaining)
+                tail_query = f"SELECT * FROM {full_table_name} OFFSET {offset} LIMIT {remaining}"
+                tail_rows = await conn.fetch(tail_query)
+                samples.extend([dict(row) for row in tail_rows])
+        
+        self.logger.info(f"智能采样完成,获取了 {len(samples)} 条数据")
+        return samples[:limit]  # 确保不超过限制
+    
+    def get_sampling_strategy_info(self, row_count: Optional[int]) -> Dict[str, Any]:
+        """
+        获取采样策略信息
+        
+        Args:
+            row_count: 表行数
+            
+        Returns:
+            策略信息字典
+        """
+        if row_count is None:
+            return {
+                'strategy': 'simple',
+                'reason': '未知表大小',
+                'is_large_table': False
+            }
+        
+        if self.should_skip_table(row_count):
+            return {
+                'strategy': 'skip',
+                'reason': f'表太大({row_count}行),超过限制({self.max_table_size}行)',
+                'is_large_table': True
+            }
+        
+        if self.is_large_table(row_count):
+            return {
+                'strategy': 'smart',
+                'reason': f'大表({row_count}行),使用智能采样',
+                'is_large_table': True
+            }
+        
+        return {
+            'strategy': 'simple',
+            'reason': f'普通表({row_count}行),使用简单采样',
+            'is_large_table': False
+        }

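采样策略的选择只依赖行数和配置,可以单独验证。下面的示意遍历几种行数,打印各自命中的策略(注意:skip 策略仅在配置 skip_large_tables=True 时才会触发):

```python
from schema_tools.utils.large_table_handler import LargeTableHandler

handler = LargeTableHandler()
for rows in (None, 50_000, 3_000_000, 20_000_000):
    info = handler.get_sampling_strategy_info(rows)
    print(rows, "->", info["strategy"], "|", info["reason"])
```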
+ 174 - 0
schema_tools/utils/logger.py

@@ -0,0 +1,174 @@
+import logging
+import os
+import sys
+from datetime import datetime
+from typing import Optional
+
+def setup_logging(verbose: bool = False, log_file: Optional[str] = None, log_dir: Optional[str] = None):
+    """
+    设置日志系统
+    
+    Args:
+        verbose: 是否启用详细日志
+        log_file: 日志文件名
+        log_dir: 日志目录
+    """
+    # 确定日志级别
+    log_level = logging.DEBUG if verbose else logging.INFO
+    
+    # 创建根logger
+    root_logger = logging.getLogger()
+    root_logger.setLevel(log_level)
+    
+    # 清除已有的处理器
+    root_logger.handlers.clear()
+    
+    # 设置日志格式
+    console_format = "%(asctime)s [%(levelname)s] %(message)s"
+    file_format = "%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
+    date_format = "%Y-%m-%d %H:%M:%S"
+    
+    # 控制台处理器
+    console_handler = logging.StreamHandler(sys.stdout)
+    console_handler.setLevel(log_level)
+    console_formatter = logging.Formatter(console_format, datefmt=date_format)
+    console_handler.setFormatter(console_formatter)
+    root_logger.addHandler(console_handler)
+    
+    # 文件处理器(如果指定)
+    if log_file:
+        # 确定日志文件路径
+        if log_dir:
+            os.makedirs(log_dir, exist_ok=True)
+            log_path = os.path.join(log_dir, log_file)
+        else:
+            log_path = log_file
+        
+        # 添加时间戳到日志文件名
+        base_name, ext = os.path.splitext(log_path)
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        log_path = f"{base_name}_{timestamp}{ext}"
+        
+        file_handler = logging.FileHandler(log_path, encoding='utf-8')
+        file_handler.setLevel(log_level)
+        file_formatter = logging.Formatter(file_format, datefmt=date_format)
+        file_handler.setFormatter(file_formatter)
+        root_logger.addHandler(file_handler)
+        
+        # 记录日志文件位置
+        root_logger.info(f"日志文件: {os.path.abspath(log_path)}")
+    
+    # 设置schema_tools模块的日志级别
+    schema_tools_logger = logging.getLogger("schema_tools")
+    schema_tools_logger.setLevel(log_level)
+    
+    # 设置第三方库的日志级别(避免过多输出)
+    logging.getLogger("asyncio").setLevel(logging.WARNING)
+    logging.getLogger("asyncpg").setLevel(logging.WARNING)
+    logging.getLogger("openai").setLevel(logging.WARNING)
+    logging.getLogger("httpx").setLevel(logging.WARNING)
+    logging.getLogger("urllib3").setLevel(logging.WARNING)
+    
+    # 返回schema_tools的logger
+    return schema_tools_logger
+
+class ColoredFormatter(logging.Formatter):
+    """带颜色的日志格式化器(用于控制台)"""
+    
+    # ANSI颜色代码
+    COLORS = {
+        'DEBUG': '\033[36m',     # 青色
+        'INFO': '\033[32m',      # 绿色
+        'WARNING': '\033[33m',   # 黄色
+        'ERROR': '\033[31m',     # 红色
+        'CRITICAL': '\033[35m',  # 紫色
+    }
+    RESET = '\033[0m'
+    
+    def format(self, record):
+        # 保存原始级别名
+        levelname = record.levelname
+        
+        # 添加颜色
+        if levelname in self.COLORS:
+            record.levelname = f"{self.COLORS[levelname]}{levelname}{self.RESET}"
+        
+        # 格式化消息
+        formatted = super().format(record)
+        
+        # 恢复原始级别名
+        record.levelname = levelname
+        
+        return formatted
+
+def get_colored_console_handler(level=logging.INFO):
+    """获取带颜色的控制台处理器"""
+    handler = logging.StreamHandler(sys.stdout)
+    handler.setLevel(level)
+    
+    # 检查是否支持颜色(Windows需要特殊处理)
+    if sys.platform == "win32":
+        try:
+            import colorama
+            colorama.init()
+            use_color = True
+        except ImportError:
+            use_color = False
+    else:
+        # Unix/Linux/Mac通常支持ANSI颜色
+        use_color = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
+    
+    if use_color:
+        formatter = ColoredFormatter(
+            "%(asctime)s [%(levelname)s] %(message)s",
+            datefmt="%Y-%m-%d %H:%M:%S"
+        )
+    else:
+        formatter = logging.Formatter(
+            "%(asctime)s [%(levelname)s] %(message)s",
+            datefmt="%Y-%m-%d %H:%M:%S"
+        )
+    
+    handler.setFormatter(formatter)
+    return handler
+
+class TableProcessingLogger:
+    """表处理专用日志器"""
+    
+    def __init__(self, logger_name: str = "schema_tools.TableProcessor"):
+        self.logger = logging.getLogger(logger_name)
+        self.current_table = None
+        self.start_time = None
+    
+    def start_table(self, table_name: str):
+        """开始处理表"""
+        self.current_table = table_name
+        self.start_time = datetime.now()
+        self.logger.info(f"{'='*60}")
+        self.logger.info(f"开始处理表: {table_name}")
+        self.logger.info(f"开始时间: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
+    
+    def end_table(self, success: bool = True):
+        """结束处理表"""
+        if self.start_time:
+            duration = (datetime.now() - self.start_time).total_seconds()
+            status = "成功" if success else "失败"
+            self.logger.info(f"处理{status},耗时: {duration:.2f}秒")
+        self.logger.info(f"{'='*60}\n")
+        self.current_table = None
+        self.start_time = None
+    
+    def log_step(self, step_name: str, message: str = None):
+        """记录处理步骤"""
+        if message:
+            self.logger.info(f"  [{step_name}] {message}")
+        else:
+            self.logger.info(f"  [{step_name}]")
+    
+    def log_warning(self, message: str):
+        """记录警告"""
+        self.logger.warning(f"  ⚠ {message}")
+    
+    def log_error(self, message: str):
+        """记录错误"""
+        self.logger.error(f"  ✗ {message}")

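TableProcessingLogger 的典型用法是 start/step/end 三段式,配合 `setup_logging` 可直接运行:

```python
from schema_tools.utils.logger import setup_logging, TableProcessingLogger

setup_logging(verbose=True)

tp_logger = TableProcessingLogger()
tp_logger.start_table("public.users")
tp_logger.log_step("database_inspector", "读取字段元数据")
tp_logger.log_warning("表注释为空,将由 LLM 生成")
tp_logger.end_table(success=True)
```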
+ 167 - 0
schema_tools/utils/permission_checker.py

@@ -0,0 +1,167 @@
+import logging
+from typing import Dict, Optional
+import asyncio
+
+class DatabasePermissionChecker:
+    """数据库权限检查器"""
+    
+    def __init__(self, db_inspector):
+        self.db_inspector = db_inspector
+        self.logger = logging.getLogger("schema_tools.DatabasePermissionChecker")
+        self._permission_cache: Optional[Dict[str, bool]] = None
+    
+    async def check_permissions(self) -> Dict[str, bool]:
+        """
+        检查数据库权限
+        
+        Returns:
+            权限字典,包含:
+            - connect: 是否可连接
+            - select_metadata: 是否可查询元数据
+            - select_data: 是否可查询数据
+            - is_readonly: 是否为只读
+        """
+        if self._permission_cache is not None:
+            return self._permission_cache
+        
+        permissions = {
+            'connect': False,
+            'select_metadata': False,
+            'select_data': False,
+            'is_readonly': False
+        }
+        
+        try:
+            # 检查连接权限
+            if await self._test_connection():
+                permissions['connect'] = True
+                self.logger.info("✓ 数据库连接成功")
+            else:
+                self.logger.error("✗ 无法连接到数据库")
+                return permissions
+            
+            # 检查元数据查询权限
+            if await self._test_metadata_access():
+                permissions['select_metadata'] = True
+                self.logger.info("✓ 元数据查询权限正常")
+            else:
+                self.logger.warning("⚠ 元数据查询权限受限")
+            
+            # 检查数据查询权限
+            if await self._test_data_access():
+                permissions['select_data'] = True
+                self.logger.info("✓ 数据查询权限正常")
+            else:
+                self.logger.warning("⚠ 数据查询权限受限")
+            
+            # 检查是否为只读库
+            if await self._test_write_permission():
+                permissions['is_readonly'] = False
+                self.logger.info("✓ 数据库可读写")
+            else:
+                permissions['is_readonly'] = True
+                self.logger.info("ℹ 数据库为只读模式")
+            
+            self._permission_cache = permissions
+            return permissions
+            
+        except Exception as e:
+            self.logger.exception(f"权限检查失败: {e}")
+            return permissions
+    
+    async def _test_connection(self) -> bool:
+        """测试数据库连接"""
+        try:
+            # 尝试获取数据库版本
+            query = "SELECT version()"
+            async with self.db_inspector.connection_pool.acquire() as conn:
+                version = await conn.fetchval(query)
+                self.logger.debug(f"数据库版本: {version}")
+                return True
+        except Exception as e:
+            self.logger.error(f"连接测试失败: {e}")
+            return False
+    
+    async def _test_metadata_access(self) -> bool:
+        """测试元数据访问权限"""
+        try:
+            query = """
+            SELECT schemaname, tablename 
+            FROM pg_tables 
+            WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
+            LIMIT 1
+            """
+            async with self.db_inspector.connection_pool.acquire() as conn:
+                result = await conn.fetch(query)
+                return True
+        except Exception as e:
+            self.logger.error(f"元数据访问测试失败: {e}")
+            return False
+    
+    async def _test_data_access(self) -> bool:
+        """测试数据访问权限"""
+        try:
+            # 尝试查询一个简单的数据
+            query = "SELECT 1 as test"
+            async with self.db_inspector.connection_pool.acquire() as conn:
+                result = await conn.fetchval(query)
+                return result == 1
+        except Exception as e:
+            self.logger.error(f"数据访问测试失败: {e}")
+            return False
+    
+    async def _test_write_permission(self) -> bool:
+        """测试写入权限(通过创建临时表)"""
+        try:
+            async with self.db_inspector.connection_pool.acquire() as conn:
+                # 开启事务
+                async with conn.transaction():
+                    # 尝试创建临时表
+                    await conn.execute("""
+                        CREATE TEMP TABLE _schema_tools_permission_test (
+                            id INTEGER PRIMARY KEY,
+                            test_value TEXT
+                        )
+                    """)
+                    
+                    # 尝试插入数据
+                    await conn.execute("""
+                        INSERT INTO _schema_tools_permission_test (id, test_value) 
+                        VALUES (1, 'test')
+                    """)
+                    
+                    # 清理(事务结束时临时表会自动删除)
+                    await conn.execute("DROP TABLE IF EXISTS _schema_tools_permission_test")
+                    
+                return True
+        except Exception as e:
+            # 写入失败通常意味着只读权限
+            self.logger.debug(f"写入权限测试失败(可能是只读库): {e}")
+            return False
+    
+    def get_permission_summary(self) -> str:
+        """获取权限摘要信息"""
+        if self._permission_cache is None:
+            return "权限未检查"
+        
+        perms = self._permission_cache
+        
+        if not perms['connect']:
+            return "❌ 无法连接到数据库"
+        
+        if perms['select_metadata'] and perms['select_data']:
+            mode = "只读" if perms['is_readonly'] else "读写"
+            return f"✅ 权限正常({mode}模式)"
+        elif perms['select_metadata']:
+            return "⚠️ 仅有元数据查询权限"
+        else:
+            return "❌ 权限不足"
+    
+    def require_minimum_permissions(self) -> bool:
+        """检查是否满足最低权限要求"""
+        if self._permission_cache is None:
+            return False
+        
+        # 最低要求:能连接和查询元数据
+        return (self._permission_cache['connect'] and 
+                self._permission_cache['select_metadata'])
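The checker only touches `db_inspector.connection_pool`, so it can be exercised with a thin stub around an asyncpg pool. A minimal sketch, assuming asyncpg and a hypothetical stub (the real inspector class lives elsewhere in this commit):

```python
# Minimal sketch: _InspectorStub is hypothetical and only provides the
# connection_pool attribute that DatabasePermissionChecker actually uses.
import asyncio
import asyncpg

from schema_tools.utils.permission_checker import DatabasePermissionChecker

class _InspectorStub:
    def __init__(self, pool: asyncpg.Pool):
        self.connection_pool = pool

async def main():
    pool = await asyncpg.create_pool("postgresql://user:pass@localhost:5432/dbname")
    checker = DatabasePermissionChecker(_InspectorStub(pool))
    await checker.check_permissions()
    print(checker.get_permission_summary())
    if not checker.require_minimum_permissions():
        raise SystemExit("need at least connect + metadata access")

asyncio.run(main())
```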

+ 108 - 0
schema_tools/utils/system_filter.py

@@ -0,0 +1,108 @@
+import logging
+from typing import List, Set
+from schema_tools.config import SCHEMA_TOOLS_CONFIG
+
+class SystemTableFilter:
+    """系统表过滤器"""
+    
+    # PostgreSQL system table name prefixes (also covers information_schema type names)
+    PG_SYSTEM_PREFIXES = [
+        'pg_', 'information_schema', 'sql_', 'cardinal_number',
+        'character_data', 'sql_identifier', 'time_stamp', 'yes_or_no'
+    ]
+    
+    # System schemas
+    SYSTEM_SCHEMAS = [
+        'information_schema', 'pg_catalog', 'pg_toast', 
+        'pg_temp_1', 'pg_toast_temp_1', 'pg_temp', 'pg_toast_temp'
+    ]
+    
+    def __init__(self):
+        self.logger = logging.getLogger("schema_tools.SystemTableFilter")
+        
+        # Load custom configuration
+        self.custom_prefixes = SCHEMA_TOOLS_CONFIG.get("custom_system_prefixes", [])
+        self.custom_schemas = SCHEMA_TOOLS_CONFIG.get("custom_system_schemas", [])
+    
+    def is_system_table(self, schema_name: str, table_name: str) -> bool:
+        """
+        判断是否为系统表
+        
+        Args:
+            schema_name: Schema名称
+            table_name: 表名
+            
+        Returns:
+            是否为系统表
+        """
+        # Check system schemas
+        all_system_schemas = self.SYSTEM_SCHEMAS + self.custom_schemas
+        if schema_name.lower() in [s.lower() for s in all_system_schemas]:
+            return True
+        
+        # Check table name prefixes
+        table_lower = table_name.lower()
+        all_prefixes = self.PG_SYSTEM_PREFIXES + self.custom_prefixes
+        
+        for prefix in all_prefixes:
+            if table_lower.startswith(prefix.lower()):
+                return True
+        
+        # Check temporary schema name patterns
+        if schema_name.lower().startswith(('pg_temp', 'pg_toast_temp')):
+            return True
+        
+        return False
+    
+    def filter_user_tables(self, table_list: List[str]) -> List[str]:
+        """
+        过滤出用户表
+        
+        Args:
+            table_list: 表名列表(可能包含schema)
+            
+        Returns:
+            用户表列表
+        """
+        user_tables = []
+        filtered_tables = []
+        
+        for table_spec in table_list:
+            # Split into schema and table name
+            if '.' in table_spec:
+                schema, table = table_spec.split('.', 1)
+            else:
+                schema, table = 'public', table_spec
+            
+            if self.is_system_table(schema, table):
+                filtered_tables.append(table_spec)
+                self.logger.debug(f"Filtered system table: {table_spec}")
+            else:
+                user_tables.append(table_spec)
+        
+        if filtered_tables:
+            self.logger.info(f"Filtered out {len(filtered_tables)} system tables, kept {len(user_tables)} user tables")
+            if len(filtered_tables) <= 10:
+                self.logger.debug(f"Filtered system tables: {', '.join(filtered_tables)}")
+        
+        return user_tables
+    
+    def get_system_prefixes(self) -> Set[str]:
+        """Return all system table prefixes."""
+        return set(self.PG_SYSTEM_PREFIXES + self.custom_prefixes)
+    
+    def get_system_schemas(self) -> Set[str]:
+        """Return all system schemas."""
+        return set(self.SYSTEM_SCHEMAS + self.custom_schemas)
+    
+    def add_custom_prefix(self, prefix: str):
+        """Add a custom system table prefix."""
+        if prefix not in self.custom_prefixes:
+            self.custom_prefixes.append(prefix)
+            self.logger.info(f"Added custom system table prefix: {prefix}")
+    
+    def add_custom_schema(self, schema: str):
+        """Add a custom system schema."""
+        if schema not in self.custom_schemas:
+            self.custom_schemas.append(schema)
+            self.logger.info(f"Added custom system schema: {schema}")

+ 114 - 0
schema_tools/utils/table_parser.py

@@ -0,0 +1,114 @@
+import os
+import logging
+from typing import List
+
+class TableListParser:
+    """表清单解析器"""
+    
+    def __init__(self):
+        self.logger = logging.getLogger("schema_tools.TableListParser")
+    
+    def parse_file(self, file_path: str) -> List[str]:
+        """
+        解析表清单文件
+        
+        Args:
+            file_path: 表清单文件路径
+            
+        Returns:
+            表名列表
+            
+        Raises:
+            FileNotFoundError: 文件不存在
+            ValueError: 文件格式错误
+        """
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"表清单文件不存在: {file_path}")
+        
+        tables = []
+        
+        try:
+            with open(file_path, 'r', encoding='utf-8') as f:
+                for line_num, line in enumerate(f, 1):
+                    # Strip surrounding whitespace
+                    line = line.strip()
+                    
+                    # Skip blank lines and comment lines
+                    if not line or line.startswith('#') or line.startswith('--'):
+                        continue
+                    
+                    # Validate the table name format
+                    if self._validate_table_name(line):
+                        tables.append(line)
+                        self.logger.debug(f"Parsed table: {line}")
+                    else:
+                        self.logger.warning(f"Line {line_num}: invalid table name format: {line}")
+            
+            if not tables:
+                raise ValueError("No valid table names found in the table list file")
+            
+            self.logger.info(f"Successfully parsed {len(tables)} tables")
+            return tables
+            
+        except Exception as e:
+            self.logger.error(f"Failed to parse table list file: {e}")
+            raise
+    
+    def _validate_table_name(self, table_name: str) -> bool:
+        """
+        验证表名格式
+        
+        Args:
+            table_name: 表名
+            
+        Returns:
+            是否合法
+        """
+        # 基本验证:不能为空,不能包含特殊字符
+        if not table_name:
+            return False
+        
+        # 禁止的字符
+        forbidden_chars = [';', '(', ')', '[', ']', '{', '}', '*', '?', '!', '@', '#', '$', '%', '^', '&']
+        for char in forbidden_chars:
+            if char in table_name:
+                return False
+        
+        # 表名格式:schema.table 或 table
+        parts = table_name.split('.')
+        if len(parts) > 2:
+            return False
+        
+        # 每部分都不能为空
+        for part in parts:
+            if not part:
+                return False
+        
+        return True
+    
+    def parse_string(self, tables_str: str) -> List[str]:
+        """
+        解析表名字符串(用于测试或命令行输入)
+        
+        Args:
+            tables_str: 表名字符串,逗号或换行分隔
+            
+        Returns:
+            表名列表
+        """
+        tables = []
+        
+        # 支持逗号和换行分隔
+        for separator in [',', '\n']:
+            if separator in tables_str:
+                parts = tables_str.split(separator)
+                break
+        else:
+            parts = [tables_str]
+        
+        for part in parts:
+            table_name = part.strip()
+            if table_name and self._validate_table_name(table_name):
+                tables.append(table_name)
+        
+        return tables
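The parser accepts either a file or an inline string, which is convenient for tests. A short sketch using the tables.txt format from the README:

```python
# Usage sketch; tables.txt follows the format shown in the README.
from schema_tools.utils.table_parser import TableListParser

parser = TableListParser()

# From a file: blank lines and '#'/'--' comment lines are skipped
tables = parser.parse_file("tables.txt")

# From a string: commas and newlines may be mixed
inline = parser.parse_string("public.users, public.orders\nhr.employees")
assert inline == ["public.users", "public.orders", "hr.employees"]
```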