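Add two dedicated pgvector table restore APIs to the system, as counterparts to the existing /api/v0/data_pipeline/vector/backup API. The two APIs write the CSV files produced by a backup back into the PostgreSQL database, reusing the existing database connection and configuration mechanisms.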
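Key conclusion: technical analysis shows that the restore API does not need absolute paths.

- The import uses `COPY ... FROM STDIN`, for example `cursor.copy_expert("COPY table FROM STDIN WITH CSV", file_object)`. The Python process opens the CSV file and streams its contents over the existing connection, so the database server never resolves a file path itself.
- Paths can therefore stay relative to the project's working directory, e.g. `./data_pipeline/training_data/...`.
- Paths are handled with `Path` objects, and responses always use Unix-style paths.

| API | Endpoint | Function |
|---|---|---|
| Backup file list API | GET /api/v0/data_pipeline/vector/restore/list | List available backup files |
| Backup restore API | POST /api/v0/data_pipeline/vector/restore | Perform a backup restore |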
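The following is a minimal sketch of the path handling described above. It assumes the API process knows its project root, so that `./data_pipeline/training_data/...` resolves correctly. `resolve_backup_dir` and `to_api_path` are hypothetical helper names, not existing functions.

```python
from pathlib import Path


def resolve_backup_dir(backup_path: str, project_root: Path) -> Path:
    """Turn a relative backup_path from a request into a concrete directory.

    The path is resolved on the API host only; PostgreSQL never sees it,
    because COPY FROM STDIN receives the file contents over the connection.
    """
    return (project_root / backup_path).resolve()


def to_api_path(path: Path, project_root: Path) -> str:
    """Render a path relative to the project root with forward slashes.

    Raises ValueError if the path lies outside project_root.
    """
    relative = path.resolve().relative_to(project_root.resolve())
    return "./" + relative.as_posix()


if __name__ == "__main__":
    root = Path.cwd()
    backup_dir = resolve_backup_dir("./data_pipeline/training_data/vector_bak", root)
    print(to_api_path(backup_dir, root))  # ./data_pipeline/training_data/vector_bak
```

`as_posix()` emits forward slashes on Windows too, which keeps the `relative_path` fields in responses identical across platforms.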
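GET /api/v0/data_pipeline/vector/restore/list

Query parameters:

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| global_only | boolean | No | false | Query only the global backup directory (training_data/vector_bak/) |
| task_id | string | No | null | Query only the backup files of the specified task_id |

Parameter logic:

- `global_only=true`: query only `training_data/vector_bak/`.
- `task_id=xxx`: query only the backup files of the specified task. `task_id` takes precedence over `global_only`.

Directories scanned:

- `./data_pipeline/training_data/vector_bak/`
- `./data_pipeline/training_data/task_*/vector_bak/`
- `./data_pipeline/training_data/manual_*/vector_bak/`

File matching:

- Files: `langchain_pg_collection_*.csv` and `langchain_pg_embedding_*.csv`
- Naming pattern: `langchain_pg_{table}_{timestamp}.csv`
- Timestamp format: `YYYYMMDD_HHMMSS`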
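The file-matching rules above can be expressed as a single regular expression. This is a sketch: `parse_backup_filename` and `format_backup_date` are hypothetical names. `format_backup_date` produces the `YYYY-MM-DD HH:MM:SS` form used by `backup_date` in the list response.

```python
import re
from datetime import datetime
from typing import Optional, Tuple

# langchain_pg_{table}_{timestamp}.csv, where table is "collection" or
# "embedding" and timestamp is YYYYMMDD_HHMMSS
BACKUP_FILE_RE = re.compile(r"^langchain_pg_(collection|embedding)_(\d{8}_\d{6})\.csv$")


def parse_backup_filename(name: str) -> Optional[Tuple[str, str]]:
    """Return (table_suffix, timestamp) for a backup file name, or None."""
    match = BACKUP_FILE_RE.match(name)
    if match is None:
        return None
    table, timestamp = match.groups()
    try:
        # Reject digit strings that are not real dates, e.g. 20251399_250000
        datetime.strptime(timestamp, "%Y%m%d_%H%M%S")
    except ValueError:
        return None
    return table, timestamp


def format_backup_date(timestamp: str) -> str:
    """Convert YYYYMMDD_HHMMSS into the backup_date format of the list API."""
    return datetime.strptime(timestamp, "%Y%m%d_%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
```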
```bash
# 1. Query all backup files
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list"
# 2. Query only global backups
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list?global_only=true"
# 3. Query the backups of a specific task
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list?task_id=task_20250721_213627"
```
Success response:

```json
{
  "code": 200,
  "success": true,
  "message": "Operation successful",
  "data": {
    "response": "Scanned 3 backup locations, 3 backup sets in total",
    "backup_locations": [
      {
        "type": "global",
        "relative_path": "./data_pipeline/training_data/vector_bak",
        "backups": [
          {
            "timestamp": "20250722_010318",
            "collection_file": "langchain_pg_collection_20250722_010318.csv",
            "embedding_file": "langchain_pg_embedding_20250722_010318.csv",
            "collection_size": "209 B",
            "embedding_size": "819 KB",
            "backup_date": "2025-07-22 01:03:18",
            "has_log": true,
            "log_file": "vector_backup_log.txt"
          }
        ]
      },
      {
        "type": "task",
        "task_id": "task_20250721_213627",
        "relative_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak",
        "backups": [
          {
            "timestamp": "20250721_215758",
            "collection_file": "langchain_pg_collection_20250721_215758.csv",
            "embedding_file": "langchain_pg_embedding_20250721_215758.csv",
            "collection_size": "209 B",
            "embedding_size": "764 KB",
            "backup_date": "2025-07-21 21:57:58",
            "has_log": true,
            "log_file": "vector_backup_log.txt"
          }
        ]
      },
      {
        "type": "task",
        "task_id": "task_20250721_183935",
        "relative_path": "./data_pipeline/training_data/task_20250721_183935/vector_bak",
        "backups": [
          {
            "timestamp": "20250721_201447",
            "collection_file": "langchain_pg_collection_20250721_201447.csv",
            "embedding_file": "langchain_pg_embedding_20250721_201447.csv",
            "collection_size": "210 B",
            "embedding_size": "780 KB",
            "backup_date": "2025-07-21 20:14:47",
            "has_log": true,
            "log_file": "vector_backup_log.txt"
          }
        ]
      }
    ],
    "summary": {
      "total_locations": 3,
      "total_backup_sets": 3,
      "global_backups": 1,
      "task_backups": 2,
      "scan_time": "2025-07-22T10:30:45+08:00"
    },
    "timestamp": "2025-07-22T10:30:45+08:00"
  }
}
```
Response when no backups are found:

```json
{
  "code": 200,
  "success": true,
  "message": "Operation successful",
  "data": {
    "response": "No available backup files found",
    "backup_locations": [],
    "summary": {
      "total_locations": 0,
      "total_backup_sets": 0,
      "global_backups": 0,
      "task_backups": 0,
      "scan_time": "2025-07-22T10:30:45+08:00"
    },
    "timestamp": "2025-07-22T10:30:45+08:00"
  }
}
```
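Error response (invalid `task_id`):

```json
{
  "code": 400,
  "success": false,
  "message": "Invalid request parameters",
  "data": {
    "response": "Invalid task_id format; only letters, digits and underscores are allowed",
    "error_type": "INVALID_PARAMS",
    "timestamp": "2025-07-22T10:30:45+08:00"
  }
}
```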
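The 400 response above implies a `task_id` check, and the parameter logic gives `task_id` precedence over `global_only`. This sketch assumes the rule is exactly "letters, digits and underscores". `resolve_scan_scope` and its return values are hypothetical.

```python
import re
from typing import Optional, Tuple

# Explicit ASCII class; \w would also accept non-ASCII letters
TASK_ID_RE = re.compile(r"[A-Za-z0-9_]+")


def resolve_scan_scope(global_only: bool, task_id: Optional[str]) -> Tuple[str, Optional[str]]:
    """Decide what to scan; task_id takes precedence over global_only.

    Returns ("task", task_id), ("global", None) or ("all", None).
    Raises ValueError for a malformed task_id (reported as HTTP 400).
    """
    if task_id:
        if not TASK_ID_RE.fullmatch(task_id):
            raise ValueError("Invalid task_id format; only letters, digits and underscores are allowed")
        return "task", task_id
    if global_only:
        return "global", None
    return "all", None
```

Rejecting anything outside `[A-Za-z0-9_]` also keeps `task_id` from smuggling `../` into the directory it is joined onto.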
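POST /api/v0/data_pipeline/vector/restore

Request body (JSON):

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| backup_path | string | Yes | - | Directory containing the backup files (relative path) |
| timestamp | string | Yes | - | Timestamp of the backup, used to locate the specific files |
| tables | array[string] | No | ["langchain_pg_collection", "langchain_pg_embedding"] | Names of the tables to restore |
| pg_conn | string | No | null | PostgreSQL connection string; taken from config if omitted |
| truncate_before_restore | boolean | No | false | Whether to empty the target tables before restoring |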
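This sketch covers the request checks that the parameter table implies:

- required fields are present;
- the timestamp has the right format;
- only the two known tables are requested;
- `backup_path` stays inside the training-data directory, which is the path-traversal protection this design claims.

The function name, the `(status, error_type, message)` return shape and the `allowed_root` argument are assumptions.

```python
import re
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

DEFAULT_TABLES = ["langchain_pg_collection", "langchain_pg_embedding"]
TIMESTAMP_RE = re.compile(r"\d{8}_\d{6}")


def validate_restore_request(body: Dict[str, Any], project_root: Path,
                             allowed_root: Path) -> Optional[Tuple[int, str, str]]:
    """Return None if the body is valid, else (http_status, error_type, message)."""
    missing = [name for name in ("backup_path", "timestamp") if not body.get(name)]
    if missing:
        return 400, "MISSING_REQUIRED_PARAMS", "Missing required parameters: " + ", ".join(missing)
    if not TIMESTAMP_RE.fullmatch(str(body["timestamp"])):
        return 400, "INVALID_PARAMS", "timestamp must use the YYYYMMDD_HHMMSS format"
    tables = body.get("tables") or DEFAULT_TABLES
    if not isinstance(tables, list) or any(t not in DEFAULT_TABLES for t in tables):
        return 400, "INVALID_PARAMS", "tables may only contain: " + ", ".join(DEFAULT_TABLES)
    # Resolve "..", symlinks and absolute paths, then require the result to stay under allowed_root
    target = (project_root / str(body["backup_path"])).resolve()
    try:
        target.relative_to(allowed_root.resolve())
    except ValueError:
        return 400, "INVALID_PARAMS", "backup_path must stay inside the training data directory"
    if not target.is_dir():
        return 404, "RESOURCE_NOT_FOUND", f"Backup directory does not exist: {body['backup_path']}"
    return None
```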
```bash
# Full restore into the configured database, emptying the tables first
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{
    "backup_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak",
    "timestamp": "20250721_215758",
    "truncate_before_restore": true
  }'

# Restore only langchain_pg_embedding, appending to the existing rows
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{
    "backup_path": "./data_pipeline/training_data/vector_bak",
    "timestamp": "20250722_010318",
    "tables": ["langchain_pg_embedding"],
    "truncate_before_restore": false
  }'

# Restore into a different database given by pg_conn
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{
    "backup_path": "./data_pipeline/training_data/vector_bak",
    "timestamp": "20250722_010318",
    "pg_conn": "postgresql://user:password@localhost:5432/target_db",
    "truncate_before_restore": true
  }'
```
Success response:

```json
{
  "code": 200,
  "success": true,
  "message": "Operation successful",
  "data": {
    "response": "Vector table restore completed",
    "restore_performed": true,
    "truncate_performed": true,
    "backup_info": {
      "backup_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak",
      "timestamp": "20250721_215758",
      "backup_date": "2025-07-21 21:57:58"
    },
    "truncate_results": {
      "langchain_pg_collection": {
        "success": true,
        "rows_before": 4,
        "rows_after": 0,
        "duration": 0.025
      },
      "langchain_pg_embedding": {
        "success": true,
        "rows_before": 58,
        "rows_after": 0,
        "duration": 0.063
      }
    },
    "restore_results": {
      "langchain_pg_collection": {
        "success": true,
        "source_file": "langchain_pg_collection_20250721_215758.csv",
        "rows_restored": 4,
        "file_size": "209 B",
        "duration": 0.145
      },
      "langchain_pg_embedding": {
        "success": true,
        "source_file": "langchain_pg_embedding_20250721_215758.csv",
        "rows_restored": 58,
        "file_size": "764 KB",
        "duration": 0.678
      }
    },
    "errors": [],
    "duration": 0.911,
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}
```
Partial success (one table failed):

```json
{
  "code": 200,
  "success": true,
  "message": "Operation successful",
  "data": {
    "response": "Vector table restore partially completed; some tables failed to restore",
    "restore_performed": true,
    "truncate_performed": false,
    "backup_info": {
      "backup_path": "./data_pipeline/training_data/vector_bak",
      "timestamp": "20250722_010318",
      "backup_date": "2025-07-22 01:03:18"
    },
    "restore_results": {
      "langchain_pg_collection": {
        "success": true,
        "source_file": "langchain_pg_collection_20250722_010318.csv",
        "rows_restored": 4,
        "file_size": "209 B",
        "duration": 0.134
      },
      "langchain_pg_embedding": {
        "success": false,
        "source_file": "langchain_pg_embedding_20250722_010318.csv",
        "error": "File read failed: [Errno 2] No such file or directory"
      }
    },
    "errors": ["Failed to restore table langchain_pg_embedding"],
    "duration": 0.234,
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}
```
Backup file not found:

```json
{
  "code": 404,
  "success": false,
  "message": "Resource not found",
  "data": {
    "response": "Backup file does not exist: ./data_pipeline/training_data/vector_bak/langchain_pg_collection_20250722_999999.csv",
    "error_type": "RESOURCE_NOT_FOUND",
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}
```
Missing required parameters:

```json
{
  "code": 400,
  "success": false,
  "message": "Invalid request parameters",
  "data": {
    "response": "Missing required parameters: backup_path, timestamp",
    "error_type": "MISSING_REQUIRED_PARAMS",
    "missing_params": ["backup_path", "timestamp"],
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}
```
Database error:

```json
{
  "code": 500,
  "success": false,
  "message": "Internal server error",
  "data": {
    "response": "Database connection failed; please check the connection configuration",
    "error_type": "DATABASE_ERROR",
    "can_retry": true,
    "timestamp": "2025-07-22T10:35:20+08:00"
  }
}
```
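The error examples share one envelope. The sketch below shows how exceptions could be mapped onto it. Several pieces are assumptions, not the existing `common/result.py` API:

- `build_error_response` itself;
- the exception-to-status table;
- the `DatabaseError` stand-in class;
- the `INTERNAL_ERROR` fallback type.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Tuple


class DatabaseError(Exception):
    """Hypothetical stand-in for a database connection failure."""


# (HTTP code, message, error_type) per exception type, following the examples above
ERROR_MAP: Dict[type, Tuple[int, str, str]] = {
    ValueError: (400, "Invalid request parameters", "INVALID_PARAMS"),
    FileNotFoundError: (404, "Resource not found", "RESOURCE_NOT_FOUND"),
    DatabaseError: (500, "Internal server error", "DATABASE_ERROR"),
}


def build_error_response(exc: Exception) -> Tuple[Dict[str, Any], int]:
    """Map an exception to the error envelope and HTTP status of the restore APIs."""
    code, message, error_type = next(
        (value for exc_type, value in ERROR_MAP.items() if isinstance(exc, exc_type)),
        (500, "Internal server error", "INTERNAL_ERROR"),
    )
    data: Dict[str, Any] = {
        "response": str(exc),
        "error_type": error_type,
        # Same +08:00 ISO-8601 form as the examples
        "timestamp": datetime.now(timezone(timedelta(hours=8))).isoformat(timespec="seconds"),
    }
    if error_type == "DATABASE_ERROR":
        data["can_retry"] = True  # connection problems are usually transient
    return {"code": code, "success": False, "message": message, "data": data}, code
```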
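The base directory comes from the `output_directory` setting in `data_pipeline/config.py`. The scanner looks in:

- `{output_directory}/vector_bak/`
- `{output_directory}/task_*/vector_bak/`
- `{output_directory}/manual_*/vector_bak/`

```python
import glob
import re
from pathlib import Path

# Matches the YYYYMMDD_HHMMSS timestamp at the end of a backup file name
_TIMESTAMP_RE = re.compile(r"_(\d{8}_\d{6})\.csv$")


def extract_timestamp(file_path):
    """Return the YYYYMMDD_HHMMSS timestamp of a backup file, or None."""
    match = _TIMESTAMP_RE.search(Path(file_path).name)
    return match.group(1) if match else None


def find_backup_sets(backup_dir):
    """Find backup sets (timestamps for which both the collection and the embedding file exist)"""
    collection_files = glob.glob(f"{backup_dir}/langchain_pg_collection_*.csv")
    embedding_files = glob.glob(f"{backup_dir}/langchain_pg_embedding_*.csv")
    # Extract timestamps, dropping files whose names do not match the pattern
    collection_timestamps = {extract_timestamp(f) for f in collection_files} - {None}
    embedding_timestamps = {extract_timestamp(f) for f in embedding_files} - {None}
    # Keep only timestamps for which both files exist
    valid_timestamps = collection_timestamps & embedding_timestamps
    return sorted(valid_timestamps, reverse=True)  # newest first
```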
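Building on the directory layout above, this sketch shows how `scan_backup_files` could assemble `backup_locations`. It is self-contained and rests on these assumptions:

- `scan_backup_locations` and its helper are hypothetical.
- `output_dir` stands for the configured `output_directory`.
- Only the timestamp and file-name fields of each backup entry are filled in.
- `manual_*` directories are reported as type `"task"`, which the design does not specify.

```python
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

_FILE_RE = re.compile(r"^langchain_pg_(collection|embedding)_(\d{8}_\d{6})\.csv$")


def _backup_sets(backup_dir: Path) -> List[str]:
    """Timestamps that have both a collection and an embedding file, newest first."""
    found: Dict[str, set] = {"collection": set(), "embedding": set()}
    for path in backup_dir.glob("langchain_pg_*.csv"):
        match = _FILE_RE.match(path.name)
        if match:
            found[match.group(1)].add(match.group(2))
    return sorted(found["collection"] & found["embedding"], reverse=True)


def scan_backup_locations(output_dir: Path, global_only: bool = False,
                          task_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """Scan {output_dir}/vector_bak, task_*/vector_bak and manual_*/vector_bak."""
    if task_id:  # task_id takes precedence; it is assumed to be validated already
        candidates = [output_dir / task_id / "vector_bak"]
    elif global_only:
        candidates = [output_dir / "vector_bak"]
    else:
        candidates = [output_dir / "vector_bak"]
        candidates += sorted(output_dir.glob("task_*/vector_bak"), reverse=True)
        candidates += sorted(output_dir.glob("manual_*/vector_bak"), reverse=True)
    locations = []
    for backup_dir in candidates:
        if not backup_dir.is_dir():
            continue
        timestamps = _backup_sets(backup_dir)
        if not timestamps:
            continue  # directories without a complete backup set are skipped
        is_global = backup_dir.parent == output_dir
        location: Dict[str, Any] = {"type": "global" if is_global else "task"}
        if not is_global:
            location["task_id"] = backup_dir.parent.name
        location["backups"] = [
            {
                "timestamp": ts,
                "collection_file": f"langchain_pg_collection_{ts}.csv",
                "embedding_file": f"langchain_pg_embedding_{ts}.csv",
            }
            for ts in timestamps
        ]
        locations.append(location)
    return locations
```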
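Path handling and formatting:

- Paths are handled with `Path` objects, which adapt automatically to Windows and Linux.
- Paths are returned with forward slashes (`/`) so that API responses are consistent.
- File sizes are formatted with the `_format_file_size()` function.

```python
import time
from pathlib import Path
from typing import Any, Dict

# The table name is interpolated into SQL, so only the two pgvector tables are allowed
ALLOWED_TABLES = {"langchain_pg_collection", "langchain_pg_embedding"}


def restore_table_from_csv(self, table_name: str, csv_file: Path) -> Dict[str, Any]:
    """Restore table data from a CSV file - a relative path is sufficient"""
    try:
        if table_name not in ALLOWED_TABLES:
            raise ValueError(f"Table not allowed: {table_name}")
        start_time = time.time()
        with self.get_connection() as conn:
            with conn.cursor() as cursor:
                # Count existing rows so that appends report only the newly imported rows
                cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
                rows_before = cursor.fetchone()[0]
                # COPY FROM STDIN: Python reads the file and streams it (no absolute path needed);
                # HEADER makes PostgreSQL skip the CSV header line
                with open(csv_file, 'r', encoding='utf-8', newline='') as f:
                    cursor.copy_expert(
                        f"COPY {table_name} FROM STDIN WITH (FORMAT csv, HEADER true)",
                        f
                    )
                # Verify the import result
                cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
                rows_restored = cursor.fetchone()[0] - rows_before
            conn.commit()
        duration = time.time() - start_time
        file_size = csv_file.stat().st_size
        return {
            "success": True,
            "source_file": csv_file.name,
            "rows_restored": rows_restored,
            "file_size": self._format_file_size(file_size),
            "duration": duration
        }
    except Exception as e:
        return {
            "success": False,
            "source_file": csv_file.name,
            "error": str(e)
        }
```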
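Notes:

- With `truncate_before_restore=true`, the target tables are emptied before the import. Nothing is rolled back automatically if the import then fails, so the truncated data is not brought back.
- Database connection sources, in order of precedence:
  1. the `pg_conn` request parameter;
  2. `data_pipeline.config.SCHEMA_TOOLS_CONFIG.default_db_connection`;
  3. `app_config.PGVECTOR_CONFIG`.
- `pg_conn` format: `postgresql://username:password@host:port/database`

```python
from data_pipeline.config import SCHEMA_TOOLS_CONFIG

# Temporarily override the database connection (used in the restore API)
_MISSING = object()
original_config = _MISSING
if pg_conn:
    original_config = SCHEMA_TOOLS_CONFIG.get("default_db_connection", _MISSING)
    SCHEMA_TOOLS_CONFIG["default_db_connection"] = pg_conn
try:
    # Perform the restore
    pass
finally:
    # Restore the original configuration, including the case where the key was unset or None
    if pg_conn:
        if original_config is _MISSING:
            SCHEMA_TOOLS_CONFIG.pop("default_db_connection", None)
        else:
            SCHEMA_TOOLS_CONFIG["default_db_connection"] = original_config
```

`SCHEMA_TOOLS_CONFIG` is a module-level dict shared by all requests. Swapping it is therefore not safe when a threaded server handles concurrent requests. Passing `pg_conn` directly to `VectorRestoreManager.restore_from_backup()` avoids the shared state.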
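This sketch resolves the connection string without touching shared configuration, so concurrent requests cannot interfere. It assumes:

- the precedence listed above;
- a `PGVECTOR_CONFIG` dict with `host`, `port`, `dbname`, `user` and `password` keys.

Those key names and `resolve_connection_string` itself are hypothetical.

```python
from typing import Any, Dict, Optional
from urllib.parse import quote, urlsplit


def resolve_connection_string(pg_conn: Optional[str],
                              schema_tools_config: Dict[str, Any],
                              pgvector_config: Dict[str, Any]) -> str:
    """Pick pg_conn, then default_db_connection, then a URL built from PGVECTOR_CONFIG."""
    if pg_conn:
        parts = urlsplit(pg_conn)
        # Expect postgresql://username:password@host:port/database
        if parts.scheme not in ("postgresql", "postgres") or not parts.hostname or len(parts.path) <= 1:
            raise ValueError("pg_conn must look like postgresql://username:password@host:port/database")
        return pg_conn
    default = schema_tools_config.get("default_db_connection")
    if default:
        return default
    cfg = pgvector_config  # assumed keys: host, port, dbname, user, password
    # Percent-encode credentials so characters like "@" or ":" do not break the URL
    user = quote(str(cfg["user"]), safe="")
    password = quote(str(cfg["password"]), safe="")
    return f"postgresql://{user}:{password}@{cfg['host']}:{cfg.get('port', 5432)}/{cfg['dbname']}"
```

The resulting string can be passed to `psycopg2.connect()`, which accepts libpq connection URIs.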
Core implementation location: add the two new routes directly in `unified_api.py`.

```python
# Add the following two routes to unified_api.py:
@app.route('/api/v0/data_pipeline/vector/restore/list', methods=['GET'])
def list_vector_backups():
    """List available vector table backup files"""
    # Implement the list API logic

@app.route('/api/v0/data_pipeline/vector/restore', methods=['POST'])
def restore_vector_tables():
    """Restore vector table data"""
    # Implement the restore API logic
```
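```text
# New core implementation class
data_pipeline/api/
├── vector_restore_manager.py     # New: VectorRestoreManager class
└── ...

# Reused existing files
data_pipeline/
├── config.py                     # Reused: configuration management
└── trainer/
    └── vector_table_manager.py   # Reference: database connection logic
common/
└── result.py                     # Reused: standard response format

# Modified existing files
unified_api.py                    # Modified: add two new routes (about 100 lines of code)
```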
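File location: `data_pipeline/api/vector_restore_manager.py`

```python
from pathlib import Path
from typing import Any, Dict, List, Optional


class VectorRestoreManager:
    """Vector table restore manager - modeled on the VectorTableManager design"""

    def __init__(self, base_output_dir: Optional[str] = None):
        """Initialize the restore manager, reusing the existing configuration mechanism"""

    def scan_backup_files(self, global_only: bool = False, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Scan for available backup files"""

    def restore_from_backup(self, backup_path: str, timestamp: str,
                            tables: Optional[List[str]] = None, pg_conn: Optional[str] = None,
                            truncate_before_restore: bool = False) -> Dict[str, Any]:
        """Restore data from backup files"""

    def get_connection(self):
        """Get a database connection - fully reuses VectorTableManager's connection logic"""

    def _restore_table_from_csv(self, table_name: str, csv_file: Path) -> Dict[str, Any]:
        """Restore a single table from a CSV file - uses COPY FROM STDIN"""
```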
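File location: `unified_api.py` (add next to the existing backup API)

```python
from flask import jsonify, request

from common.result import internal_error_response, success_response
from data_pipeline.api.vector_restore_manager import VectorRestoreManager

# `app` is the Flask application already defined in unified_api.py


@app.route('/api/v0/data_pipeline/vector/restore/list', methods=['GET'])
def list_vector_backups():
    """List available vector table backup files - about 40 lines of code"""
    try:
        # Parse query parameters
        global_only = request.args.get('global_only', 'false').lower() == 'true'
        task_id = request.args.get('task_id')
        # Scan with VectorRestoreManager
        restore_manager = VectorRestoreManager()
        result = restore_manager.scan_backup_files(global_only, task_id)
        # Return the standard format
        return jsonify(success_response(
            response_text=f"Scanned {len(result['backup_locations'])} backup locations",
            data=result
        )), 200
    except Exception as e:
        app.logger.exception("Failed to scan backup files: %s", e)
        return jsonify(internal_error_response("Failed to scan backup files")), 500


@app.route('/api/v0/data_pipeline/vector/restore', methods=['POST'])
def restore_vector_tables():
    """Restore vector table data - about 60 lines of code"""
    try:
        # silent=True returns None instead of raising when the body is not JSON
        req = request.get_json(silent=True) or {}
        # Parameter parsing and validation...
        # Perform the restore
        restore_manager = VectorRestoreManager()
        result = restore_manager.restore_from_backup(...)
        # Return the result
        return jsonify(success_response(
            response_text="Vector table restore completed",
            data=result
        )), 200
    except Exception as e:
        app.logger.exception("Vector table restore failed: %s", e)
        return jsonify(internal_error_response("Vector table restore failed")), 500
```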
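This sketch shows how `restore_from_backup` could orchestrate truncation and per-table restores into the response shape shown earlier. The table-level work is injected as `truncate_table` and `restore_table` callables so the control flow can be shown without a live database. In the class these would be database methods; the callables and `orchestrate_restore` are hypothetical.

Two further assumptions:

- `langchain_pg_embedding.collection_id` references `langchain_pg_collection`, as in the LangChain PGVector schema. That is why collections are restored first.
- The all-failed message is invented here.

```python
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

# Parent table first: embeddings reference collections (LangChain PGVector schema)
TABLE_ORDER = ["langchain_pg_collection", "langchain_pg_embedding"]


def orchestrate_restore(backup_dir: Path, timestamp: str,
                        truncate_table: Callable[[str], Dict[str, Any]],
                        restore_table: Callable[[str, Path], Dict[str, Any]],
                        tables: Optional[List[str]] = None,
                        truncate_before_restore: bool = False) -> Dict[str, Any]:
    """Run optional truncation and per-table restores; assemble the API result dict."""
    start = time.time()
    requested = tables or TABLE_ORDER
    # Unknown names are assumed to have been rejected by request validation
    ordered = [t for t in TABLE_ORDER if t in requested]
    files = {t: backup_dir / f"{t}_{timestamp}.csv" for t in ordered}
    # Fail fast (reported as HTTP 404) before anything is truncated
    for csv_file in files.values():
        if not csv_file.is_file():
            raise FileNotFoundError(f"Backup file does not exist: {csv_file.as_posix()}")
    result: Dict[str, Any] = {"restore_performed": True,
                              "truncate_performed": truncate_before_restore}
    if truncate_before_restore:
        # truncate_table must handle the foreign key itself (e.g. TRUNCATE ... CASCADE);
        # this step is not rolled back if a later restore fails
        result["truncate_results"] = {t: truncate_table(t) for t in ordered}
    result["restore_results"] = {t: restore_table(t, f) for t, f in files.items()}
    failed = [t for t, r in result["restore_results"].items() if not r.get("success")]
    result["errors"] = [f"Failed to restore table {t}" for t in failed]
    if not failed:
        result["response"] = "Vector table restore completed"
    elif len(failed) < len(ordered):
        result["response"] = "Vector table restore partially completed; some tables failed to restore"
    else:
        result["response"] = "Vector table restore failed"
    result["duration"] = round(time.time() - start, 3)
    return result
```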
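| Component | File | Effort | Notes |
|---|---|---|---|
| Core class | data_pipeline/api/vector_restore_manager.py | ~200 new lines | Scan and restore logic |
| API routes | unified_api.py | ~100 new lines | Two route functions |
| Total | | ~300 lines of code | Reuses the existing architecture |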
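Reused components:

- the `output_directory` setting in `data_pipeline/config.py`
- the `vector_table_management` configuration section
- the database connection logic of `VectorTableManager`
- the standard response format in `common/result.py`
- `Path` objects for safe path handling

Prerequisite: `data_pipeline/config.py` must be configured correctly.

```bash
# Scenario: migrating data between environments
# 1. List the backups in the source environment
curl "http://source-server:8084/api/v0/data_pipeline/vector/restore/list"
# 2. Copy the backup files to the target environment
# 3. Restore the data in the target environment
curl -X POST http://target-server:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{"backup_path": "./data_pipeline/training_data/vector_bak", "timestamp": "20250722_010318"}'
```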
```bash
# Scenario: rolling back to an earlier state
# 1. Find the rollback point
curl "http://localhost:8084/api/v0/data_pipeline/vector/restore/list?task_id=task_20250721_213627"
# 2. Restore to the chosen point in time
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{"backup_path": "./data_pipeline/training_data/task_20250721_213627/vector_bak", "timestamp": "20250721_215758", "truncate_before_restore": true}'
```
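The rollback steps can be scripted. This sketch uses only the Python standard library and assumes the list response shape shown earlier; `latest_backup` and `build_restore_request` are hypothetical helpers. Taking the maximum `timestamp` works because `YYYYMMDD_HHMMSS` strings sort chronologically.

```python
import json
import urllib.request
from typing import Any, Dict, Optional, Tuple

BASE_URL = "http://localhost:8084/api/v0/data_pipeline/vector"


def latest_backup(list_response: Dict[str, Any]) -> Optional[Tuple[str, str]]:
    """Return (relative_path, timestamp) of the newest backup set, or None."""
    candidates = [
        (backup["timestamp"], location["relative_path"])
        for location in list_response["data"]["backup_locations"]
        for backup in location["backups"]
    ]
    if not candidates:
        return None
    timestamp, path = max(candidates)
    return path, timestamp


def build_restore_request(path: str, timestamp: str, truncate: bool = True) -> urllib.request.Request:
    """Build the POST request with an explicit JSON Content-Type header."""
    body = json.dumps({"backup_path": path, "timestamp": timestamp,
                       "truncate_before_restore": truncate}).encode("utf-8")
    return urllib.request.Request(f"{BASE_URL}/restore", data=body, method="POST",
                                  headers={"Content-Type": "application/json"})
```

Usage: fetch `f"{BASE_URL}/restore/list?task_id=task_20250721_213627"` with `urllib.request.urlopen`, pass `json.load(resp)` to `latest_backup`, then send `build_restore_request(*picked)` with `urlopen`.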
```bash
# Scenario: restore only the embedding table
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/restore \
  -H "Content-Type: application/json" \
  -d '{"backup_path": "./data_pipeline/training_data/vector_bak", "timestamp": "20250722_010318", "tables": ["langchain_pg_embedding"], "truncate_before_restore": false}'
```
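The two restore APIs provide:

- ✅ Complete backup file management - scans for and lists all available backups
- ✅ Flexible restore options - full or partial restore, truncate or append mode
- ✅ Cross-platform compatibility - supports both Windows and Ubuntu
- ✅ High-performance data loading - imports with the PostgreSQL COPY command
- ✅ Thorough error handling - detailed error messages and recovery hints
- ✅ Standardized API design - reuses the existing response format and error handling
- ✅ Safe file operations - guards against path traversal and other file-level risks
- ✅ Compatibility with the existing system - fully compatible with the existing backup file format

This design makes full use of existing system components to deliver complete backup-restore functionality for the vector tables.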