当前系统中Vector表管理功能存在两套不同的实现路径,导致脚本模式和API模式的行为不一致。本文档提出统一化重构方案,将Vector表管理功能统一收敛到 training_load
步骤中,实现脚本模式和API模式的完全一致。
基于用户反馈,以下为最终确定的参数设计:
--backup-vector-tables
:备份vector表数据(默认值:false
)--truncate-vector-tables
:清空vector表数据,自动启用备份(默认值:false
)--skip-training
:跳过训练文件处理,仅执行Vector表管理(默认值:false
)--truncate-vector-tables=true
自动设置 --backup-vector-tables=true
(单向依赖,backup
不会触发 truncate
)--skip-training=true
但未指定任何Vector操作时,记录警告并跳过所有处理(宽松模式)--skip-training-load
参数,统一使用 --skip-training
backup_vector_tables | truncate_vector_tables | skip_training | 实际行为 | 推荐 |
---|---|---|---|---|
false | false | false | 正常训练文件处理,无Vector操作 | ✅ |
true | false | false | 备份Vector表 + 训练文件处理 | ✅ |
false | true | false | 备份Vector表 + 清空Vector表 + 训练文件处理¹ | ✅ |
true | true | false | 备份Vector表 + 清空Vector表 + 训练文件处理² | ⚠️ |
true | false | true | 仅备份Vector表,跳过训练文件处理 | ✅ |
false | true | true | 仅备份+清空Vector表,跳过训练文件处理¹ | ✅ |
true | true | true | 仅备份+清空Vector表,跳过训练文件处理² | ⚠️ |
false | false | true | ⚠️ 警告:未指定Vector操作,跳过所有处理 | ❌ |
注释:
truncate
自动启用 backup
,这是预期行为truncate_vector_tables
🎯 最佳实践:
# ✅ 仅备份Vector表数据
--backup-vector-tables
# ✅ 清空Vector表数据(自动包含备份)
--truncate-vector-tables
# ❌ 避免冗余:不要同时使用
--backup-vector-tables --truncate-vector-tables # 冗余
# ❌ 避免无意义的组合
--skip-training # 没有Vector操作,什么都不做
python -m data_pipeline.schema_workflow --backup-vector-tables --truncate-vector-tables
当前调用链:
schema_workflow.py::main()
↓
SchemaWorkflowOrchestrator(backup_vector_tables=True, truncate_vector_tables=True)
↓
execute_complete_workflow()
├─ _execute_vector_table_management() # 独立执行Vector表管理
└─ _execute_step_4_training_data_load()
└─ process_training_files(backup_vector_tables=False, truncate_vector_tables=False) # 禁用以避免重复
python -m data_pipeline.trainer.run_training --backup-vector-tables --truncate-vector-tables
当前调用链:
run_training.py::main()
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management() # 在函数内部执行
{
"execution_mode": "complete",
"backup_vector_tables": true,
"truncate_vector_tables": true
}
当前调用链:
unified_api.py::execute_data_pipeline_task()
↓
SimpleWorkflowExecutor(backup_vector_tables=True, truncate_vector_tables=True)
↓
execute_complete_workflow()
↓
SchemaWorkflowOrchestrator.execute_complete_workflow()
├─ _execute_vector_table_management() # 独立执行Vector表管理
└─ _execute_step_4_training_data_load()
└─ process_training_files(backup_vector_tables=False, truncate_vector_tables=False) # 禁用以避免重复
{
"execution_mode": "step",
"step_name": "training_load",
"backup_vector_tables": true,
"truncate_vector_tables": true
}
当前调用链(有问题):
unified_api.py::execute_data_pipeline_task()
↓
SimpleWorkflowExecutor(backup_vector_tables=True, truncate_vector_tables=True)
↓
execute_single_step("training_load")
↓
SchemaWorkflowOrchestrator._execute_step_4_training_data_load()
↓
process_training_files(backup_vector_tables=False, truncate_vector_tables=False) # ❌ 硬编码False
问题:API单步模式中,用户传递的Vector表管理参数被硬编码的 False
覆盖,导致功能失效。
_execute_vector_table_management()
,单步模式使用 process_training_files()
内部实现training_load
步骤中修改文件:data_pipeline/schema_workflow.py
删除代码:
# 删除这段独立的Vector表管理调用
if self.backup_vector_tables or self.truncate_vector_tables:
await self._execute_vector_table_management()
修改文件:data_pipeline/schema_workflow.py
当前代码:
# 禁用vector管理参数以避免重复执行
load_successful, _ = process_training_files(training_data_dir, self.task_id,
backup_vector_tables=False,
truncate_vector_tables=False)
修改为:
# 传递Vector表管理参数到training步骤
load_successful, vector_stats = process_training_files(training_data_dir, self.task_id,
backup_vector_tables=self.backup_vector_tables,
truncate_vector_tables=self.truncate_vector_tables)
# 记录Vector表管理结果到工作流状态
if vector_stats:
self.workflow_state["artifacts"]["vector_management"] = vector_stats
修改文件:data_pipeline/trainer/run_training.py
新增参数支持:
def process_training_files(data_path, task_id=None,
backup_vector_tables=False,
truncate_vector_tables=False,
skip_training=False): # 新增参数
"""
处理指定路径下的所有训练文件
Args:
data_path (str): 训练数据目录路径
task_id (str): 任务ID,用于日志记录
backup_vector_tables (bool): 是否备份vector表数据
truncate_vector_tables (bool): 是否清空vector表数据
skip_training (bool): 是否跳过训练文件处理,仅执行Vector表管理
Returns:
tuple: (处理成功标志, Vector表管理统计信息)
"""
# Vector表管理(前置步骤)
vector_stats = None
if backup_vector_tables or truncate_vector_tables:
# 执行Vector表管理...
vector_stats = asyncio.run(vector_manager.execute_vector_management(...))
# 如果是跳过训练模式,跳过训练文件处理
if skip_training:
log_message("✅ Vector表管理完成,跳过训练文件处理(skip_training=True)")
return True, vector_stats
elif skip_training:
# 如果设置了skip_training但没有Vector操作,记录警告并跳过
log_message("⚠️ 设置了skip_training=True但未指定Vector操作,跳过所有处理")
return True, None
# 继续训练文件处理...
return process_successful, vector_stats
schema_workflow.py::main()
↓
SchemaWorkflowOrchestrator.execute_complete_workflow()
↓ (删除独立Vector表管理调用)
↓
_execute_step_4_training_data_load()
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management() + 训练文件处理
API请求
↓
SimpleWorkflowExecutor.execute_complete_workflow()
↓
SchemaWorkflowOrchestrator.execute_complete_workflow()
↓ (删除独立Vector表管理调用)
↓
_execute_step_4_training_data_load()
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management() + 训练文件处理
training_load步骤
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management() + 训练文件处理
training_load步骤
↓
process_training_files(backup_vector_tables=True, skip_training=True)
↓
VectorTableManager.execute_vector_management()
↓ (跳过训练文件处理)
data_pipeline/schema_workflow.py
位置:execute_complete_workflow()
方法,约165-167行
删除:
# 新增:独立的Vector表管理(在训练加载之前或替代训练加载)
if self.backup_vector_tables or self.truncate_vector_tables:
await self._execute_vector_table_management()
位置:_execute_step_4_training_data_load()
方法,约454-456行
从:
# 禁用vector管理参数以避免重复执行
load_successful, _ = process_training_files(training_data_dir, self.task_id,
backup_vector_tables=False,
truncate_vector_tables=False)
改为:
# 传递Vector表管理参数到training步骤
load_successful, vector_stats = process_training_files(training_data_dir, self.task_id,
backup_vector_tables=self.backup_vector_tables,
truncate_vector_tables=self.truncate_vector_tables)
# 记录Vector表管理结果到工作流状态
if vector_stats:
self.workflow_state["artifacts"]["vector_management"] = vector_stats
更新相关方法的注释,说明Vector表管理现在统一在training步骤中执行。
data_pipeline/trainer/run_training.py
位置:process_training_files()
函数定义,约336行
从:
def process_training_files(data_path, task_id=None, backup_vector_tables=False, truncate_vector_tables=False):
改为:
def process_training_files(data_path, task_id=None, backup_vector_tables=False, truncate_vector_tables=False, skip_training=False):
位置:函数结尾,约463行
从:
return process_successful
改为:
return process_successful, vector_stats
位置:Vector表管理代码块之后
添加:
# 如果是跳过训练模式,跳过训练文件处理
if skip_training:
log_message("✅ Vector表管理完成,跳过训练文件处理(skip_training=True)")
return True, vector_stats
elif skip_training and not (backup_vector_tables or truncate_vector_tables):
# 如果设置了skip_training但没有Vector操作,记录警告并跳过
log_message("⚠️ 设置了skip_training=True但未指定Vector操作,跳过所有处理")
return True, None
位置:main()
函数中调用process_training_files的地方
从:
process_successful = process_training_files(data_path, args.task_id,
args.backup_vector_tables,
args.truncate_vector_tables)
改为:
process_successful, vector_stats = process_training_files(data_path, args.task_id,
args.backup_vector_tables,
args.truncate_vector_tables)
data_pipeline/api/simple_workflow.py
API层面不需要修改,因为参数传递已经正确实现,修改底层的 _execute_step_4_training_data_load()
即可。
用途:允许跳过训练文件处理,仅执行Vector表管理
使用场景:
API使用示例:
// 仅备份Vector表
{
"execution_mode": "step",
"step_name": "training_load",
"backup_vector_tables": true,
"skip_training": true
}
// 清空Vector表(自动包含备份)
{
"execution_mode": "step",
"step_name": "training_load",
"truncate_vector_tables": true,
"skip_training": true
}
脚本使用示例:
# 仅备份Vector表
python -m data_pipeline.trainer.run_training \
--task-id manual_20250720_130541 \
--backup-vector-tables \
--skip-training
# 清空Vector表(自动包含备份)
python -m data_pipeline.trainer.run_training \
--task-id manual_20250720_130541 \
--truncate-vector-tables \
--skip-training
重构后,所有调用路径都返回相同格式的Vector表管理统计信息:
{
"backup_performed": true,
"truncate_performed": true,
"tables_backed_up": {
"langchain_pg_collection": {
"row_count": 1234,
"file_size": "45.6 KB",
"backup_file": "langchain_pg_collection_20250720_121007.csv"
},
"langchain_pg_embedding": {
"row_count": 12345,
"file_size": "2.1 MB",
"backup_file": "langchain_pg_embedding_20250720_121007.csv"
}
},
"truncate_results": {
"langchain_pg_embedding": {
"success": true,
"rows_before": 12345,
"rows_after": 0
}
},
"duration": 12.5,
"backup_directory": "/path/to/task/vector_bak"
}
跳过training_load步骤时无法执行Vector表管理
这个约束是合理的,因为:
示例:
# 这种情况下不会执行Vector表管理(合理,因为删除了--skip-training-load参数)
python -m data_pipeline.schema_workflow \
--db-connection "..." \
--table-list tables.txt \
--business-context "系统" \
--backup-vector-tables # 会在training_load步骤中执行
⚠️ 破坏性变化:
--skip-training-load
参数,改为 --skip-training
--skip-training-load
的脚本需要更新向后兼容:
skip_training
参数默认为 False
行为变化:
# 完整模式 + Vector表管理
python -m data_pipeline.schema_workflow \
--db-connection "postgresql://..." \
--table-list tables.txt \
--business-context "测试系统" \
--backup-vector-tables \
--truncate-vector-tables
# 单步模式 + Vector表管理
python -m data_pipeline.trainer.run_training \
--task-id task_20250721_113010 \
--backup-vector-tables
# 仅备份Vector表(不清空数据)
python -m data_pipeline.trainer.run_training \
--task-id task_20250721_113010 \
--backup-vector-tables \
--skip-training
# 清空Vector表(自动包含备份)
python -m data_pipeline.trainer.run_training \
--task-id task_20250721_113010 \
--truncate-vector-tables \
--skip-training
# ❌ 错误示例:不要同时指定两个参数
# python -m data_pipeline.trainer.run_training \
# --task-id task_20250721_113010 \
# --backup-vector-tables \
# --truncate-vector-tables \
# --skip-training
# 完整模式 + 清空Vector表(包含自动备份)
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
-H "Content-Type: application/json" \
-d '{
"execution_mode": "complete",
"truncate_vector_tables": true
}'
# 单步模式 + 仅备份Vector表
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
-H "Content-Type: application/json" \
-d '{
"execution_mode": "step",
"step_name": "training_load",
"backup_vector_tables": true
}'
# 仅备份Vector表,跳过训练
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
-H "Content-Type: application/json" \
-d '{
"execution_mode": "step",
"step_name": "training_load",
"backup_vector_tables": true,
"skip_training": true
}'
# 清空Vector表,跳过训练(自动包含备份)
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
-H "Content-Type: application/json" \
-d '{
"execution_mode": "step",
"step_name": "training_load",
"truncate_vector_tables": true,
"skip_training": true
}'
验证点:
data_pipeline/schema_workflow.py
data_pipeline/trainer/run_training.py
skip_training
参数支持--skip-training-load
参数本次重构将实现Vector表管理功能的统一化,解决当前脚本模式和API模式行为不一致的问题,同时简化代码结构,提高系统的可维护性和扩展性。重构后的系统将具有更清晰的职责分离和更一致的用户体验。