为系统添加一个专用的 pgvector 表备份 API,支持备份 langchain_pg_collection
和 langchain_pg_embedding
两张表。该 API 充分复用现有的成熟备份功能,仅需要薄薄的API封装层。
现有的 VectorTableManager
已经非常完善:
cursor.itersize = batch_size
支持大数据量导出.tmp
文件,成功后重命名为.csv
,保证数据完整性vector_bak
目录vector_backup_log.txt
备份日志VectorTableManager
已包含完善的数据库连接优先级处理POST /api/v0/data_pipeline/vector/backup
参数名 | 类型 | 必需 | 默认值 | 说明 |
---|---|---|---|---|
task_id |
string | 否 | null | 任务ID,如果提供则在该task目录下创建备份 |
db_connection |
string | 否 | null | PostgreSQL连接字符串,不提供则从config.py获取 |
truncate_vector_tables |
boolean | 否 | false | 备份完成后是否清空vector表 |
backup_vector_tables |
boolean | 否 | true | 是否执行备份操作(默认为true,不需要显式设置) |
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{}'
行为: 在 data_pipeline/training_data/vector_bak/
目录下创建备份,使用默认数据库连接。
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{
"task_id": "task_20250721_213627",
"truncate_vector_tables": false
}'
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{
"truncate_vector_tables": false
}'
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{
"task_id": "task_20250721_213627",
"truncate_vector_tables": true
}'
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{
"task_id": "task_20250721_213627",
"db_connection": "postgresql://user:password@localhost:5432/dbname",
"truncate_vector_tables": false
}'
HTTP状态码: 200
使用 common/result.py
的 success_response()
格式:
{
"code": 200,
"success": true,
"message": "操作成功",
"data": {
"response": "Vector表备份完成",
"backup_performed": true,
"truncate_performed": false,
"backup_directory": "/path/to/training_data/task_20250721_213627/vector_bak",
"tables_backed_up": {
"langchain_pg_collection": {
"success": true,
"row_count": 4,
"file_size": "209.0 B",
"backup_file": "langchain_pg_collection_20250721_234914.csv",
"duration": 0.105
},
"langchain_pg_embedding": {
"success": true,
"row_count": 58,
"file_size": "764.0 KB",
"backup_file": "langchain_pg_embedding_20250721_234914.csv",
"duration": 0.312
}
},
"truncate_results": {
"langchain_pg_embedding": {
"success": true,
"rows_before": 58,
"rows_after": 0,
"duration": 0.068
}
},
"errors": [],
"duration": 0.498,
"timestamp": "2025-07-21T23:49:14+08:00"
}
}
HTTP状态码: 400/404/500
使用 common/result.py
的相应错误响应方法:
{
"code": 400,
"success": false,
"message": "请求参数错误",
"data": {
"response": "无效的task_id格式,只能包含字母、数字和下划线",
"error_type": "INVALID_PARAMS",
"timestamp": "2025-07-21T23:49:14+08:00"
}
}
{
"code": 404,
"success": false,
"message": "资源未找到",
"data": {
"response": "指定的任务目录不存在: task_20250721_999999",
"error_type": "RESOURCE_NOT_FOUND",
"timestamp": "2025-07-21T23:49:14+08:00"
}
}
{
"code": 500,
"success": false,
"message": "系统内部错误",
"data": {
"response": "数据库连接失败,请检查连接配置",
"error_type": "DATABASE_ERROR",
"can_retry": true,
"timestamp": "2025-07-21T23:49:14+08:00"
}
}
data_pipeline/training_data/{task_id}/vector_bak/
{}
调用)data_pipeline/training_data/vector_bak/
备份文件使用时间戳命名:
langchain_pg_collection_{YYYYMMDD_HHMMSS}.csv
langchain_pg_embedding_{YYYYMMDD_HHMMSS}.csv
示例:
langchain_pg_collection_20250721_234914.csv
langchain_pg_embedding_20250721_234914.csv
API支持两种连接方式:
db_connection
参数VectorTableManager
自动处理)postgresql://username:password@host:port/database
cursor.itersize
分批读取,支持大数据量vector_backup_log.txt
备份日志文件bad_request_response()
: 参数错误not_found_response()
: 任务不存在internal_error_response()
: 系统内部错误service_unavailable_response()
: 数据库服务不可用# 在 unified_api.py 中直接添加路由,无需新建文件
@app.route('/api/v0/data_pipeline/vector/backup', methods=['POST'])
def backup_pgvector_tables():
"""专用的pgvector表备份API - 直接复用VectorTableManager"""
try:
# 支持空参数调用 {}
req = request.get_json(force=True) if request.is_json else {}
# 解析参数(全部可选)
task_id = req.get('task_id')
db_connection = req.get('db_connection')
truncate_vector_tables = req.get('truncate_vector_tables', False)
backup_vector_tables = req.get('backup_vector_tables', True)
# 参数验证
if task_id and not re.match(r'^[a-zA-Z0-9_]+$', task_id):
return jsonify(bad_request_response(
"无效的task_id格式,只能包含字母、数字和下划线"
)), 400
# 确定备份目录
if task_id:
# 验证task_id目录是否存在
task_dir = Path(f"data_pipeline/training_data/{task_id}")
if not task_dir.exists():
return jsonify(not_found_response(
f"指定的任务目录不存在: {task_id}"
)), 404
backup_base_dir = str(task_dir)
else:
# 使用training_data根目录(支持空参数调用)
backup_base_dir = "data_pipeline/training_data"
# 直接使用现有的VectorTableManager
from data_pipeline.trainer.vector_table_manager import VectorTableManager
# 临时修改数据库连接配置(如果提供了自定义连接)
original_config = None
if db_connection:
from data_pipeline.config import SCHEMA_TOOLS_CONFIG
original_config = SCHEMA_TOOLS_CONFIG.get("default_db_connection")
SCHEMA_TOOLS_CONFIG["default_db_connection"] = db_connection
try:
# 使用现有的成熟管理器
vector_manager = VectorTableManager(
task_output_dir=backup_base_dir,
task_id=task_id or "api_backup"
)
# 执行备份(完全复用现有逻辑)
result = vector_manager.execute_vector_management(
backup=backup_vector_tables,
truncate=truncate_vector_tables
)
# 使用 common/result.py 的标准格式
return jsonify(success_response(
response_text="Vector表备份完成",
data=result
)), 200
finally:
# 恢复原始配置
if original_config is not None:
SCHEMA_TOOLS_CONFIG["default_db_connection"] = original_config
except Exception as e:
logger.error(f"Vector表备份失败: {str(e)}")
return jsonify(internal_error_response(
"Vector表备份失败,请稍后重试"
)), 500
# 现有文件,无需修改
data_pipeline/
├── trainer/
│ ├── vector_table_manager.py # ✅ 复用:现有成熟备份逻辑
│ └── ...
└── config.py # ✅ 复用:现有配置管理
common/
└── result.py # ✅ 复用:标准响应格式
# 仅需修改一个文件
unified_api.py # ✅ 修改:添加新路由(约50行代码)
整个API实现只需要:
总计不超过50行代码!
功能 | 现有execute API | 新的backup API |
---|---|---|
用途 | 完整工作流执行的一部分 | 专用的vector表备份 |
复杂度 | 复杂(包含多个步骤) | 简单(仅备份功能) |
执行时机 | 工作流的特定步骤 | 任何时候独立执行 |
参数依赖 | 需要完整的任务配置 | 仅需要备份相关参数(支持空参数) |
核心逻辑 | 相同的VectorTableManager | 相同的VectorTableManager |
响应格式 | common/result.py | common/result.py |
VectorTableManager
的完整备份逻辑common/result.py
标准响应格式.tmp
→ .csv
重命名保证文件完整性# 每日定时备份到独立目录(支持大数据量)
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{}'
# 在特定任务执行前备份(流式处理,不会阻塞)
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{
"task_id": "task_20250721_213627",
"truncate_vector_tables": true
}'
# 备份现有数据用于迁移(支持TB级数据)
curl -X POST http://localhost:8084/api/v0/data_pipeline/vector/backup \
-H "Content-Type: application/json" \
-d '{
"db_connection": "postgresql://source_user:pass@source_host:5432/source_db"
}'
这个极简化的专用pgvector备份API将:
✅ 100%复用现有成熟功能 - 无重复开发
✅ 仅需50行新代码 - 最小化实现成本
✅ 支持TB级大数据量 - 流式处理能力
✅ 完美兼容现有系统 - 零影响集成
✅ 提供简单独立接口 - 专用备份功能
✅ 使用标准响应格式 - 复用common/result.py
✅ 支持空参数调用 - 最简单的使用方式
这是一个真正充分利用现有功能的设计方案!