pgvector_restore_api_design.md 24 KB

PgVector 恢复备份 API 设计文档

概述

为系统添加两个专用的 pgvector 表恢复备份 API,与现有的 /api/v0/data_pipeline/vector/backup API 相对应。这两个API将把导出为CSV的文件重新写回到PostgreSQL数据库中,充分复用现有的数据库连接和配置机制。

📋 路径使用说明

重要结论:经过技术分析,恢复备份API 不需要绝对路径

技术原因

  1. PostgreSQL COPY FROM STDIN:恢复时使用 cursor.copy_expert("COPY table FROM STDIN WITH CSV", file_object)
  2. 文件对象处理:Python使用相对路径打开文件对象即可,无需绝对路径
  3. 与备份不同:备份时需要绝对路径是为了Python文件写入操作,而非PostgreSQL要求

API设计优化

  • 列表API:只返回相对路径(./data_pipeline/training_data/...
  • 恢复API:只接收相对路径参数
  • 跨平台兼容:使用 Path 对象处理路径,响应统一使用Unix风格路径

API 端点概览

API 端点 功能
备份文件列表API GET /api/v0/data_pipeline/vector/restore/list 列出可用的备份文件
备份恢复API POST /api/v0/data_pipeline/vector/restore 执行备份恢复操作

API 1: 备份文件列表 API

基本信息

  • 端点: GET /api/v0/data_pipeline/vector/restore/list
  • 方法: GET
  • 内容类型: application/json
  • 认证: 无(当前版本)

请求参数(查询参数)

参数名 类型 必需 默认值 说明
global_only boolean false 仅查询全局备份目录(training_data/vector_bak/)
task_id string null 指定task_id,仅查询该任务下的备份文件

参数逻辑

  • 不传任何参数:查询所有备份目录
  • 仅传 global_only=true:仅查询 training_data/vector_bak/
  • 仅传 task_id=xxx:仅查询指定任务的备份文件
  • 同时传递两个参数:task_id 优先级更高

扫描目录逻辑

扫描范围

  1. 全局备份目录: ./data_pipeline/training_data/vector_bak/
  2. 任务相关目录:
    • ./data_pipeline/training_data/task_*/vector_bak/
    • ./data_pipeline/training_data/manual_*/vector_bak/

文件筛选条件

  • 必须同时存在 langchain_pg_collection_*.csvlangchain_pg_embedding_*.csv
  • 文件名格式:langchain_pg_{table}_{timestamp}.csv
  • 时间戳格式:YYYYMMDD_HHMMSS

请求示例

# 1. 查询所有备份文件
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list"

# 2. 仅查询全局备份
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list?global_only=true"

# 3. 查询特定任务的备份
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list?task_id=task_20250721_213627"

响应格式

成功响应 (200)

{
  "code": 200,
  "success": true,
  "message": "操作成功",
  "data": {
    "response": "成功扫描到 3 个备份位置,共 4 个备份集",
         "backup_locations": [
       {
         "type": "global",
         "relative_path": "./data_pipeline/training_data/vector_bak",
         "backups": [
          {
            "timestamp": "20250722_010318",
            "collection_file": "langchain_pg_collection_20250722_010318.csv",
            "embedding_file": "langchain_pg_embedding_20250722_010318.csv",
            "collection_size": "209 B",
            "embedding_size": "819 KB",
            "backup_date": "2025-07-22 01:03:18",
            "has_log": true,
            "log_file": "vector_backup_log.txt"
          }
        ]
      },
             {
         "type": "task",
         "task_id": "task_20250721_213627",
         "relative_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak",
         "backups": [
          {
            "timestamp": "20250721_215758",
            "collection_file": "langchain_pg_collection_20250721_215758.csv",
            "embedding_file": "langchain_pg_embedding_20250721_215758.csv",
            "collection_size": "209 B",
            "embedding_size": "764 KB",
            "backup_date": "2025-07-21 21:57:58",
            "has_log": true,
            "log_file": "vector_backup_log.txt"
          }
        ]
      },
             {
         "type": "task",
         "task_id": "task_20250721_183935",
         "relative_path": "./data_pipeline/training_data/task_20250721_183935/vector_bak",
         "backups": [
          {
            "timestamp": "20250721_201447",
            "collection_file": "langchain_pg_collection_20250721_201447.csv",
            "embedding_file": "langchain_pg_embedding_20250721_201447.csv",
            "collection_size": "210 B",
            "embedding_size": "780 KB",
            "backup_date": "2025-07-21 20:14:47",
            "has_log": true,
            "log_file": "vector_backup_log.txt"
          }
        ]
      }
    ],
    "summary": {
      "total_locations": 3,
      "total_backup_sets": 4,
      "global_backups": 1,
      "task_backups": 3,
      "scan_time": "2025-07-22T10:30:45+08:00"
    },
    "timestamp": "2025-07-22T10:30:45+08:00"
  }
}

无备份文件响应 (200)

{
  "code": 200,
  "success": true,
  "message": "操作成功",
  "data": {
    "response": "未找到任何可用的备份文件",
    "backup_locations": [],
    "summary": {
      "total_locations": 0,
      "total_backup_sets": 0,
      "global_backups": 0,
      "task_backups": 0,
      "scan_time": "2025-07-22T10:30:45+08:00"
    },
    "timestamp": "2025-07-22T10:30:45+08:00"
  }
}

错误响应 (400/500)

{
  "code": 400,
  "success": false,
  "message": "请求参数错误",
  "data": {
    "response": "无效的task_id格式,只能包含字母、数字和下划线",
    "error_type": "INVALID_PARAMS",
    "timestamp": "2025-07-22T10:30:45+08:00"
  }
}

API 2: 备份恢复 API

基本信息

  • 端点: POST /api/v0/data_pipeline/vector/restore
  • 方法: POST
  • 内容类型: application/json
  • 认证: 无(当前版本)

请求参数

参数名 类型 必需 默认值 说明
backup_path string - 备份文件所在的目录路径(相对路径)
timestamp string - 备份文件的时间戳(用于确定具体文件)
tables array[string] ["langchain_pg_collection", "langchain_pg_embedding"] 要恢复的表名列表
pg_conn string null PostgreSQL连接字符串,不提供则从config获取
truncate_before_restore boolean false 恢复前是否清空目标表

请求示例

1. 恢复所有表(推荐用法)

curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{
    "backup_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak",
    "timestamp": "20250721_215758",
    "truncate_before_restore": true
  }'

2. 仅恢复特定表

curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{
    "backup_path": "./data_pipeline/training_data/vector_bak",
    "timestamp": "20250722_010318",
    "tables": ["langchain_pg_embedding"],
    "truncate_before_restore": false
  }'

3. 使用自定义数据库连接

curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{
    "backup_path": "./data_pipeline/training_data/vector_bak",
    "timestamp": "20250722_010318",
    "pg_conn": "postgresql://user:password@localhost:5432/target_db",
    "truncate_before_restore": true
  }'

响应格式

成功响应 (200)

{
  "code": 200,
  "success": true,
  "message": "操作成功",
  "data": {
    "response": "Vector表恢复完成",
    "restore_performed": true,
    "truncate_performed": true,
    "backup_info": {
      "backup_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak",
      "timestamp": "20250721_215758",
      "backup_date": "2025-07-21 21:57:58"
    },
    "truncate_results": {
      "langchain_pg_collection": {
        "success": true,
        "rows_before": 4,
        "rows_after": 0,
        "duration": 0.025
      },
      "langchain_pg_embedding": {
        "success": true,
        "rows_before": 58,
        "rows_after": 0,
        "duration": 0.063
      }
    },
    "restore_results": {
      "langchain_pg_collection": {
        "success": true,
        "source_file": "langchain_pg_collection_20250721_215758.csv",
        "rows_restored": 4,
        "file_size": "209 B",
        "duration": 0.145
      },
      "langchain_pg_embedding": {
        "success": true,
        "source_file": "langchain_pg_embedding_20250721_215758.csv",
        "rows_restored": 58,
        "file_size": "764 KB",
        "duration": 0.678
      }
    },
    "errors": [],
    "duration": 0.911,
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}

部分失败响应 (200)

{
  "code": 200,
  "success": true,
  "message": "操作成功",
  "data": {
    "response": "Vector表恢复部分完成,部分表恢复失败",
    "restore_performed": true,
    "truncate_performed": false,
    "backup_info": {
      "backup_path": "./data_pipeline/training_data/vector_bak",
      "timestamp": "20250722_010318",
      "backup_date": "2025-07-22 01:03:18"
    },
    "restore_results": {
      "langchain_pg_collection": {
        "success": true,
        "source_file": "langchain_pg_collection_20250722_010318.csv",
        "rows_restored": 4,
        "file_size": "209 B",
        "duration": 0.134
      },
      "langchain_pg_embedding": {
        "success": false,
        "source_file": "langchain_pg_embedding_20250722_010318.csv",
        "error": "文件读取失败: [Errno 2] No such file or directory"
      }
    },
    "errors": ["langchain_pg_embedding表恢复失败"],
    "duration": 0.234,
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}

错误响应

文件不存在 (404)
{
  "code": 404,
  "success": false,
  "message": "资源未找到",
  "data": {
    "response": "备份文件不存在: ./data_pipeline/training_data/vector_bak/langchain_pg_collection_20250722_999999.csv",
    "error_type": "RESOURCE_NOT_FOUND",
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}
参数错误 (400)
{
  "code": 400,
  "success": false,
  "message": "请求参数错误",
  "data": {
    "response": "缺少必需参数: backup_path, timestamp",
    "error_type": "MISSING_REQUIRED_PARAMS",
    "missing_params": ["backup_path", "timestamp"],
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}
数据库错误 (500)
{
  "code": 500,
  "success": false,
  "message": "系统内部错误",
  "data": {
    "response": "数据库连接失败,请检查连接配置",
    "error_type": "DATABASE_ERROR",
    "can_retry": true,
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}

功能详细设计

1. 文件扫描逻辑(列表API)

目录扫描策略

  1. 基础目录: 从 data_pipeline/config.pyoutput_directory 配置获取
  2. 全局备份: 扫描 {output_directory}/vector_bak/
  3. 任务备份: 扫描 {output_directory}/task_*/vector_bak/{output_directory}/manual_*/vector_bak/

文件匹配算法

def find_backup_sets(backup_dir):
    """查找备份集(同时存在collection和embedding文件的时间戳)"""
    collection_files = glob.glob(f"{backup_dir}/langchain_pg_collection_*.csv")
    embedding_files = glob.glob(f"{backup_dir}/langchain_pg_embedding_*.csv")
    
    # 提取时间戳
    collection_timestamps = set(extract_timestamp(f) for f in collection_files)
    embedding_timestamps = set(extract_timestamp(f) for f in embedding_files)
    
    # 找到同时存在两个文件的时间戳
    valid_timestamps = collection_timestamps & embedding_timestamps
    
    return sorted(valid_timestamps, reverse=True)  # 最新的在前

跨平台兼容性

  • 使用 Path 对象处理路径,自动适配Windows和Linux
  • 相对路径始终使用Unix风格(/)进行返回,确保API响应的一致性
  • 文件大小格式化使用统一的 _format_file_size() 函数

2. 数据恢复逻辑(恢复API)

恢复流程

  1. 参数验证: 验证备份路径、时间戳、表名等
  2. 文件检查: 确认备份文件存在且可读
  3. 数据库连接: 建立目标数据库连接
  4. 表清空(可选): 执行 TRUNCATE 操作
  5. 数据导入: 使用 PostgreSQL COPY 命令导入CSV
  6. 结果验证: 检查导入的行数是否正确
  7. 日志记录: 记录详细的恢复操作日志

数据导入实现

 def restore_table_from_csv(self, table_name: str, csv_file: Path) -> Dict[str, Any]:
     """从CSV文件恢复表数据 - 使用相对路径即可"""
     try:
         start_time = time.time()
         
         with self.get_connection() as conn:
             with conn.cursor() as cursor:
                 # 使用COPY FROM STDIN命令高效导入(不需要绝对路径)
                 with open(csv_file, 'r', encoding='utf-8') as f:
                     # 跳过CSV头部
                     next(f)
                     cursor.copy_expert(
                         f"COPY {table_name} FROM STDIN WITH CSV",
                         f
                     )
                 
                 # 验证导入结果
                 cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
                 rows_restored = cursor.fetchone()[0]
         
         duration = time.time() - start_time
         file_size = csv_file.stat().st_size
         
         return {
             "success": True,
             "source_file": csv_file.name,
             "rows_restored": rows_restored,
             "file_size": self._format_file_size(file_size),
             "duration": duration
         }
         
     except Exception as e:
         return {
             "success": False,
             "source_file": csv_file.name,
             "error": str(e)
         }

错误处理策略

  • 文件级错误: 文件不存在、权限不足、格式错误
  • 数据库级错误: 连接失败、表不存在、权限不足
  • 数据级错误: CSV格式不匹配、数据类型错误、约束冲突

回滚策略

  • 如果 truncate_before_restore=true,在数据导入失败时不进行自动回滚
  • 建议用户在重要操作前先创建备份
  • 提供详细的错误信息帮助用户手动修复

3. 数据库连接管理

连接优先级

  1. 显式连接: 请求参数中的 pg_conn
  2. 配置连接: data_pipeline.config.SCHEMA_TOOLS_CONFIG.default_db_connection
  3. 默认连接: app_config.PGVECTOR_CONFIG

连接字符串格式

postgresql://username:password@host:port/database

临时连接配置

# 临时修改数据库连接(恢复API中使用)
original_config = None
if pg_conn:
    from data_pipeline.config import SCHEMA_TOOLS_CONFIG
    original_config = SCHEMA_TOOLS_CONFIG.get("default_db_connection")
    SCHEMA_TOOLS_CONFIG["default_db_connection"] = pg_conn

try:
    # 执行恢复操作
    pass
finally:
    # 恢复原始配置
    if original_config is not None:
        SCHEMA_TOOLS_CONFIG["default_db_connection"] = original_config

4. 性能优化

大文件处理

  • 使用 PostgreSQL 的 COPY 命令进行高效批量导入
  • 支持大型CSV文件(GB级别)的流式处理
  • 避免将整个文件加载到内存中

并发考虑

  • 单个API调用中串行处理多个表(避免锁竞争)
  • 支持多个API调用并发执行(不同的备份恢复操作)

内存优化

  • 使用流式CSV读取,逐行处理
  • 避免缓存大量数据在内存中
  • 及时释放数据库连接和文件句柄

实现架构

实现方式(与现有备份API保持一致)

核心实现位置:在 unified_api.py 中直接添加两个新路由

# 在 unified_api.py 中添加以下两个路由:

@app.route('/api/v0/data_pipeline/vector/restore/list', methods=['GET'])
def list_vector_backups():
    """列出可用的vector表备份文件"""
    # 实现列表API逻辑
    
@app.route('/api/v0/data_pipeline/vector/restore', methods=['POST'])
def restore_vector_tables():
    """恢复vector表数据"""
    # 实现恢复API逻辑

文件结构(极简实现)

# 新增核心实现类
data_pipeline/api/
├── vector_restore_manager.py # 新增:VectorRestoreManager类
└── ...

# 复用现有文件
data_pipeline/
├── config.py                 # 复用:配置管理
└── trainer/
    └── vector_table_manager.py  # 参考:数据库连接逻辑

common/
└── result.py                 # 复用:标准响应格式

# 修改现有文件  
unified_api.py                # 修改:添加两个新路由(约100行代码)

实现架构详细说明

1. VectorRestoreManager 类 (新增文件)

文件位置: data_pipeline/api/vector_restore_manager.py

class VectorRestoreManager:
    """Vector表恢复管理器 - 仿照VectorTableManager设计"""
    
    def __init__(self, base_output_dir: str = None):
        """初始化恢复管理器,复用现有配置机制"""
        
    def scan_backup_files(self, global_only: bool = False, task_id: str = None) -> Dict[str, Any]:
        """扫描可用的备份文件"""
        
    def restore_from_backup(self, backup_path: str, timestamp: str, 
                          tables: List[str] = None, pg_conn: str = None,
                          truncate_before_restore: bool = False) -> Dict[str, Any]:
        """从备份文件恢复数据"""
        
    def get_connection(self):
        """获取数据库连接 - 完全复用VectorTableManager的连接逻辑"""
        
    def _restore_table_from_csv(self, table_name: str, csv_file: Path) -> Dict[str, Any]:
        """从CSV文件恢复单个表 - 使用COPY FROM STDIN"""

2. API路由实现 (修改现有文件)

文件位置: unified_api.py (在现有备份API附近添加)

@app.route('/api/v0/data_pipeline/vector/restore/list', methods=['GET'])
def list_vector_backups():
    """列出可用的vector表备份文件 - 约40行代码"""
    try:
        # 解析查询参数
        global_only = request.args.get('global_only', 'false').lower() == 'true'
        task_id = request.args.get('task_id')
        
        # 使用VectorRestoreManager扫描
        restore_manager = VectorRestoreManager()
        result = restore_manager.scan_backup_files(global_only, task_id)
        
        # 返回标准格式
        return jsonify(success_response(
            response_text=f"成功扫描到 {len(result['backup_locations'])} 个备份位置",
            data=result
        )), 200
        
    except Exception as e:
        return jsonify(internal_error_response("扫描备份文件失败")), 500
    
@app.route('/api/v0/data_pipeline/vector/restore', methods=['POST'])
def restore_vector_tables():
    """恢复vector表数据 - 约60行代码"""
    try:
        req = request.get_json()
        # 参数解析和验证...
        
        # 执行恢复
        restore_manager = VectorRestoreManager()
        result = restore_manager.restore_from_backup(...)
        
        # 返回结果
        return jsonify(success_response(
            response_text="Vector表恢复完成",
            data=result
        )), 200
        
    except Exception as e:
        return jsonify(internal_error_response("Vector表恢复失败")), 500

实现工作量总结

组件 文件 工作量 说明
核心类 data_pipeline/api/vector_restore_manager.py 新增 ~200行 扫描和恢复逻辑
API路由 unified_api.py 新增 ~100行 两个路由函数
总计 ~300行代码 复用现有架构

实现步骤

  1. 创建VectorRestoreManager类 - 仿照现有VectorTableManager
  2. 在unified_api.py中添加两个路由 - 紧邻现有备份API
  3. 测试验证 - 确保与现有备份文件兼容

与现有系统的集成

1. 配置复用

  • 复用 data_pipeline/config.pyoutput_directory 配置
  • 复用现有的数据库连接配置机制
  • 复用 vector_table_management 配置节

2. 工具复用

  • 复用 VectorTableManager 的数据库连接逻辑
  • 复用 common/result.py 的标准响应格式
  • 复用现有的日志记录机制 [[memory:3840221]]

3. 文件格式兼容

  • 完全兼容现有备份API生成的CSV文件格式
  • 支持所有现有的备份文件命名规范
  • 与现有备份日志格式保持一致

4. 错误处理统一

  • 使用相同的错误分类和响应码
  • 复用现有的参数验证逻辑
  • 保持错误消息的一致性

安全考虑

1. 路径安全

  • 验证备份路径,防止路径遍历攻击
  • 限制只能访问训练数据目录下的文件
  • 使用相对路径和 Path 对象进行安全的路径处理

2. 文件安全

  • 验证CSV文件格式,防止恶意文件
  • 检查文件大小限制,防止资源耗尽
  • 使用安全的文件读取方式

3. 数据库安全

  • 使用参数化查询,防止SQL注入
  • 验证表名,限制只能操作指定的vector表
  • 正确处理数据库连接,避免连接泄露

4. 输入验证

  • 严格验证所有API参数
  • 使用正则表达式验证task_id格式
  • 检查时间戳格式的有效性

测试策略

1. 单元测试

  • 文件扫描逻辑测试
  • CSV解析和恢复逻辑测试
  • 错误处理流程测试

2. 集成测试

  • 端到端备份和恢复流程测试
  • 与现有备份API的兼容性测试
  • 跨平台路径处理测试

3. 性能测试

  • 大文件恢复性能测试
  • 并发恢复操作测试
  • 内存使用情况监控

4. 错误场景测试

  • 文件不存在情况
  • 数据库连接失败情况
  • 磁盘空间不足情况

部署说明

1. 依赖要求

  • Python 3.8+
  • psycopg2-binary
  • 现有的项目依赖

2. 配置要求

  • 确保 data_pipeline/config.py 配置正确
  • 确保数据库连接配置可用
  • 确保目标数据库有相应的表结构

3. 权限要求

  • 文件系统读取权限(访问备份文件)
  • 数据库写入权限(INSERT、TRUNCATE)
  • 临时文件创建权限

使用场景

1. 数据迁移

# 1. 列出源环境的备份
curl "http://source-server:8084/api/v0/data_pipeline/vector/restore/list"

# 2. 复制备份文件到目标环境

# 3. 在目标环境恢复数据
curl -X POST http://target-server:8084/api/v0/data_pipeline/vector/restore \
  -d '{"backup_path": "./data_pipeline/training_data/vector_bak", "timestamp": "20250722_010318"}'

2. 数据回滚

# 1. 查找回滚点
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list?task_id=task_20250721_213627"

# 2. 恢复到指定时间点
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -d '{"backup_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak", "timestamp": "20250721_215758", "truncate_before_restore": true}'

3. 部分数据恢复

# 仅恢复embedding表
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -d '{"backup_path": "./data_pipeline/training_data/vector_bak", "timestamp": "20250722_010318", "tables": ["langchain_pg_embedding"], "truncate_before_restore": false}'

总结

这两个恢复备份API将提供:

完整的备份文件管理 - 智能扫描和列出所有可用备份
灵活的恢复选项 - 支持全量/部分恢复、清空/追加模式
跨平台兼容性 - 同时支持Windows和Ubuntu系统
高性能数据处理 - 使用PostgreSQL COPY命令高效导入
完善的错误处理 - 详细的错误信息和恢复建议
标准化API设计 - 复用现有的响应格式和错误处理
安全的文件操作 - 防止路径遍历和文件安全风险
与现有系统兼容 - 完全兼容现有备份文件格式

这个设计充分利用了现有的系统组件,提供了完整而强大的vector表备份恢复功能!