vector_table_management_unification_refactor.md 20 KB

Vector表管理功能统一化重构方案

概述

当前系统中Vector表管理功能存在两套不同的实现路径,导致脚本模式和API模式的行为不一致。本文档提出统一化重构方案,将Vector表管理功能统一收敛到 training_load 步骤中,实现脚本模式和API模式的完全一致。

最终设计决定

基于用户反馈,以下为最终确定的参数设计:

参数定义

  • --backup-vector-tables:备份vector表数据(默认值:false
  • --truncate-vector-tables:清空vector表数据,自动启用备份(默认值:false
  • --skip-training:跳过训练文件处理,仅执行Vector表管理(默认值:false

参数语义

  • 参数依赖--truncate-vector-tables=true 自动设置 --backup-vector-tables=true(单向依赖,backup 不会触发 truncate
  • 冲突处理:当 --skip-training=true 但未指定任何Vector操作时,记录警告并跳过所有处理(宽松模式)
  • 参数移除:删除现有的 --skip-training-load 参数,统一使用 --skip-training

参数组合行为表

backup_vector_tables truncate_vector_tables skip_training 实际行为 推荐
false false false 正常训练文件处理,无Vector操作
true false false 备份Vector表 + 训练文件处理
false true false 备份Vector表 + 清空Vector表 + 训练文件处理¹
true true false 备份Vector表 + 清空Vector表 + 训练文件处理² ⚠️
true false true 仅备份Vector表,跳过训练文件处理
false true true 仅备份+清空Vector表,跳过训练文件处理¹
true true true 仅备份+清空Vector表,跳过训练文件处理² ⚠️
false false true ⚠️ 警告:未指定Vector操作,跳过所有处理

注释

  • ¹ truncate 自动启用 backup,这是预期行为
  • ² 同时指定两个参数是冗余的,建议只使用 truncate_vector_tables

推荐用法

🎯 最佳实践

# ✅ 仅备份Vector表数据
--backup-vector-tables

# ✅ 清空Vector表数据(自动包含备份)
--truncate-vector-tables

# ❌ 避免冗余:不要同时使用
--backup-vector-tables --truncate-vector-tables  # 冗余

# ❌ 避免无意义的组合
--skip-training  # 没有Vector操作,什么都不做

当前现状分析

1. 脚本模式现状 ✅(已完成)

完整模式

python -m data_pipeline.schema_workflow --backup-vector-tables --truncate-vector-tables

当前调用链

schema_workflow.py::main()
↓
SchemaWorkflowOrchestrator(backup_vector_tables=True, truncate_vector_tables=True)
↓
execute_complete_workflow()
├─ _execute_vector_table_management()  # 独立执行Vector表管理
└─ _execute_step_4_training_data_load()
   └─ process_training_files(backup_vector_tables=False, truncate_vector_tables=False)  # 禁用以避免重复

单步模式

python -m data_pipeline.trainer.run_training --backup-vector-tables --truncate-vector-tables

当前调用链

run_training.py::main()
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management()  # 在函数内部执行

2. API模式现状 ⚠️(部分完成)

完整模式

{
    "execution_mode": "complete",
    "backup_vector_tables": true,
    "truncate_vector_tables": true
}

当前调用链

unified_api.py::execute_data_pipeline_task()
↓
SimpleWorkflowExecutor(backup_vector_tables=True, truncate_vector_tables=True)
↓
execute_complete_workflow()
↓
SchemaWorkflowOrchestrator.execute_complete_workflow()
├─ _execute_vector_table_management()  # 独立执行Vector表管理
└─ _execute_step_4_training_data_load()
   └─ process_training_files(backup_vector_tables=False, truncate_vector_tables=False)  # 禁用以避免重复

单步模式 ❌(有问题)

{
    "execution_mode": "step",
    "step_name": "training_load",
    "backup_vector_tables": true,
    "truncate_vector_tables": true
}

当前调用链(有问题)

unified_api.py::execute_data_pipeline_task()
↓
SimpleWorkflowExecutor(backup_vector_tables=True, truncate_vector_tables=True)
↓
execute_single_step("training_load")
↓
SchemaWorkflowOrchestrator._execute_step_4_training_data_load()
↓
process_training_files(backup_vector_tables=False, truncate_vector_tables=False)  # ❌ 硬编码False

问题:API单步模式中,用户传递的Vector表管理参数被硬编码的 False 覆盖,导致功能失效。

3. 当前实现的问题

  1. 双重实现路径:完整模式使用独立的 _execute_vector_table_management(),单步模式使用 process_training_files() 内部实现
  2. 行为不一致:脚本模式和API模式的Vector表管理执行时机不同
  3. 代码复杂:需要维护避免重复执行的逻辑
  4. API单步模式缺陷:硬编码参数导致功能失效

统一化重构方案

设计原则

  1. 统一收敛:将Vector表管理功能统一收敛到 training_load 步骤中
  2. 逻辑一致:脚本模式和API模式使用完全相同的调用路径
  3. 职责清晰:Vector表操作作为训练准备工作,归属于训练步骤
  4. 合理约束:跳过训练步骤时不执行Vector表管理(逻辑合理)

核心修改

1. 删除独立的Vector表管理调用

修改文件data_pipeline/schema_workflow.py

删除代码

# 删除这段独立的Vector表管理调用
if self.backup_vector_tables or self.truncate_vector_tables:
    await self._execute_vector_table_management()

2. 修改training_load步骤参数传递

修改文件data_pipeline/schema_workflow.py

当前代码

# 禁用vector管理参数以避免重复执行
load_successful, _ = process_training_files(training_data_dir, self.task_id, 
                                           backup_vector_tables=False, 
                                           truncate_vector_tables=False)

修改为

# 传递Vector表管理参数到training步骤
load_successful, vector_stats = process_training_files(training_data_dir, self.task_id, 
                                                       backup_vector_tables=self.backup_vector_tables, 
                                                       truncate_vector_tables=self.truncate_vector_tables)

# 记录Vector表管理结果到工作流状态
if vector_stats:
    self.workflow_state["artifacts"]["vector_management"] = vector_stats

3. 增强process_training_files函数

修改文件data_pipeline/trainer/run_training.py

新增参数支持

def process_training_files(data_path, task_id=None, 
                          backup_vector_tables=False, 
                          truncate_vector_tables=False,
                          skip_training=False):  # 新增参数
    """
    处理指定路径下的所有训练文件
    
    Args:
        data_path (str): 训练数据目录路径
        task_id (str): 任务ID,用于日志记录
        backup_vector_tables (bool): 是否备份vector表数据
        truncate_vector_tables (bool): 是否清空vector表数据
        skip_training (bool): 是否跳过训练文件处理,仅执行Vector表管理
    
    Returns:
        tuple: (处理成功标志, Vector表管理统计信息)
    """
    
    # Vector表管理(前置步骤)
    vector_stats = None
    if backup_vector_tables or truncate_vector_tables:
        # 执行Vector表管理...
        vector_stats = asyncio.run(vector_manager.execute_vector_management(...))
        
        # 如果是跳过训练模式,跳过训练文件处理
        if skip_training:
            log_message("✅ Vector表管理完成,跳过训练文件处理(skip_training=True)")
            return True, vector_stats
    elif skip_training:
        # 如果设置了skip_training但没有Vector操作,记录警告并跳过
        log_message("⚠️ 设置了skip_training=True但未指定Vector操作,跳过所有处理")
        return True, None
    
    # 继续训练文件处理...
    return process_successful, vector_stats

重构后的调用链条

1. 脚本完整模式

schema_workflow.py::main()
↓
SchemaWorkflowOrchestrator.execute_complete_workflow()
↓ (删除独立Vector表管理调用)
↓
_execute_step_4_training_data_load()
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management() + 训练文件处理

2. API完整模式

API请求
↓
SimpleWorkflowExecutor.execute_complete_workflow()
↓
SchemaWorkflowOrchestrator.execute_complete_workflow()
↓ (删除独立Vector表管理调用)
↓
_execute_step_4_training_data_load()
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management() + 训练文件处理

3. 单步模式(脚本/API完全一致)

training_load步骤
↓
process_training_files(backup_vector_tables=True, truncate_vector_tables=True)
↓
VectorTableManager.execute_vector_management() + 训练文件处理

4. 单独备份模式(新功能)

training_load步骤
↓
process_training_files(backup_vector_tables=True, skip_training=True)
↓
VectorTableManager.execute_vector_management()
↓ (跳过训练文件处理)

详细修改清单

文件1:data_pipeline/schema_workflow.py

修改1:删除独立Vector表管理调用

位置execute_complete_workflow() 方法,约165-167行

删除

# 新增:独立的Vector表管理(在训练加载之前或替代训练加载)
if self.backup_vector_tables or self.truncate_vector_tables:
    await self._execute_vector_table_management()

修改2:修改training_load步骤参数传递

位置_execute_step_4_training_data_load() 方法,约454-456行

# 禁用vector管理参数以避免重复执行
load_successful, _ = process_training_files(training_data_dir, self.task_id, 
                                           backup_vector_tables=False, 
                                           truncate_vector_tables=False)

改为

# 传递Vector表管理参数到training步骤
load_successful, vector_stats = process_training_files(training_data_dir, self.task_id, 
                                                       backup_vector_tables=self.backup_vector_tables, 
                                                       truncate_vector_tables=self.truncate_vector_tables)

# 记录Vector表管理结果到工作流状态
if vector_stats:
    self.workflow_state["artifacts"]["vector_management"] = vector_stats

修改3:更新注释和文档

更新相关方法的注释,说明Vector表管理现在统一在training步骤中执行。

文件2:data_pipeline/trainer/run_training.py

修改1:函数签名扩展

位置process_training_files() 函数定义,约336行

def process_training_files(data_path, task_id=None, backup_vector_tables=False, truncate_vector_tables=False):

改为

def process_training_files(data_path, task_id=None, backup_vector_tables=False, truncate_vector_tables=False, skip_training=False):

修改2:返回值修改

位置:函数结尾,约463行

return process_successful

改为

return process_successful, vector_stats

修改3:支持skip_training参数

位置:Vector表管理代码块之后

添加

# 如果是跳过训练模式,跳过训练文件处理
if skip_training:
    log_message("✅ Vector表管理完成,跳过训练文件处理(skip_training=True)")
    return True, vector_stats
elif skip_training and not (backup_vector_tables or truncate_vector_tables):
    # 如果设置了skip_training但没有Vector操作,记录警告并跳过
    log_message("⚠️ 设置了skip_training=True但未指定Vector操作,跳过所有处理")
    return True, None

修改4:更新main函数调用

位置main() 函数中调用process_training_files的地方

process_successful = process_training_files(data_path, args.task_id, 
                                           args.backup_vector_tables, 
                                           args.truncate_vector_tables)

改为

process_successful, vector_stats = process_training_files(data_path, args.task_id, 
                                                         args.backup_vector_tables, 
                                                         args.truncate_vector_tables)

文件3:data_pipeline/api/simple_workflow.py

无需修改

API层面不需要修改,因为参数传递已经正确实现,修改底层的 _execute_step_4_training_data_load() 即可。

新增功能

1. skip_training参数

用途:允许跳过训练文件处理,仅执行Vector表管理

使用场景

  • 单独备份Vector表数据
  • 单独清空Vector表数据
  • 在不需要重新训练的情况下管理Vector表

API使用示例

// 仅备份Vector表
{
    "execution_mode": "step",
    "step_name": "training_load", 
    "backup_vector_tables": true,
    "skip_training": true
}

// 清空Vector表(自动包含备份)
{
    "execution_mode": "step",
    "step_name": "training_load", 
    "truncate_vector_tables": true,
    "skip_training": true
}

脚本使用示例

# 仅备份Vector表
python -m data_pipeline.trainer.run_training \
  --task-id manual_20250720_130541 \
  --backup-vector-tables \
  --skip-training

# 清空Vector表(自动包含备份)
python -m data_pipeline.trainer.run_training \
  --task-id manual_20250720_130541 \
  --truncate-vector-tables \
  --skip-training

2. 统一的返回值格式

重构后,所有调用路径都返回相同格式的Vector表管理统计信息:

{
    "backup_performed": true,
    "truncate_performed": true,
    "tables_backed_up": {
        "langchain_pg_collection": {
            "row_count": 1234,
            "file_size": "45.6 KB",
            "backup_file": "langchain_pg_collection_20250720_121007.csv"
        },
        "langchain_pg_embedding": {
            "row_count": 12345,
            "file_size": "2.1 MB",
            "backup_file": "langchain_pg_embedding_20250720_121007.csv"
        }
    },
    "truncate_results": {
        "langchain_pg_embedding": {
            "success": true,
            "rows_before": 12345,
            "rows_after": 0
        }
    },
    "duration": 12.5,
    "backup_directory": "/path/to/task/vector_bak"
}

约束和限制

1. 合理约束

跳过training_load步骤时无法执行Vector表管理

这个约束是合理的,因为:

  • Vector表操作本质上是训练准备工作
  • 不训练就不需要备份和清空Vector表
  • 保持功能的逻辑一致性

示例

# 这种情况下不会执行Vector表管理(合理,因为删除了--skip-training-load参数)
python -m data_pipeline.schema_workflow \
  --db-connection "..." \
  --table-list tables.txt \
  --business-context "系统" \
  --backup-vector-tables  # 会在training_load步骤中执行

2. 兼容性变化

⚠️ 破坏性变化

  • 删除 --skip-training-load 参数,改为 --skip-training
  • 原有使用 --skip-training-load 的脚本需要更新

向后兼容

  • 现有的API调用不受影响(除了新增参数支持)
  • 新增的 skip_training 参数默认为 False

行为变化

  • 完整模式中Vector表管理的执行时机变化(从独立执行改为在training步骤中执行)
  • 对用户来说功能和结果完全一致

测试验证

1. 功能测试

脚本模式测试

# 完整模式 + Vector表管理
python -m data_pipeline.schema_workflow \
  --db-connection "postgresql://..." \
  --table-list tables.txt \
  --business-context "测试系统" \
  --backup-vector-tables \
  --truncate-vector-tables

# 单步模式 + Vector表管理
python -m data_pipeline.trainer.run_training \
  --task-id task_20250721_113010 \
  --backup-vector-tables

# 仅备份Vector表(不清空数据)
python -m data_pipeline.trainer.run_training \
  --task-id task_20250721_113010 \
  --backup-vector-tables \
  --skip-training

# 清空Vector表(自动包含备份)
python -m data_pipeline.trainer.run_training \
  --task-id task_20250721_113010 \
  --truncate-vector-tables \
  --skip-training

# ❌ 错误示例:不要同时指定两个参数
# python -m data_pipeline.trainer.run_training \
#   --task-id task_20250721_113010 \
#   --backup-vector-tables \
#   --truncate-vector-tables \
#   --skip-training

API模式测试

# 完整模式 + 清空Vector表(包含自动备份)
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "complete",
    "truncate_vector_tables": true
  }'

# 单步模式 + 仅备份Vector表
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "step",
    "step_name": "training_load",
    "backup_vector_tables": true
  }'

# 仅备份Vector表,跳过训练
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "step",
    "step_name": "training_load",
    "backup_vector_tables": true,
    "skip_training": true
  }'

# 清空Vector表,跳过训练(自动包含备份)
curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250721_113010/execute \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "step",
    "step_name": "training_load",
    "truncate_vector_tables": true,
    "skip_training": true
  }'

2. 回归测试

验证点

  • 现有功能不受影响
  • Vector表管理功能正常工作
  • 备份文件位置和格式正确
  • 日志记录完整
  • 错误处理正常

实施计划

阶段1:核心重构

  1. 修改 data_pipeline/schema_workflow.py
  2. 修改 data_pipeline/trainer/run_training.py
  3. 基础功能测试

阶段2:功能增强

  1. 添加 skip_training 参数支持
  2. 删除 --skip-training-load 参数
  3. 完善错误处理和日志记录
  4. 更新相关文档

阶段3:全面测试

  1. 脚本模式测试
  2. API模式测试
  3. 边界条件测试
  4. 回归测试

阶段4:文档更新

  1. 更新用户文档
  2. 更新API文档
  3. 更新开发文档

预期效果

1. 代码质量提升

  • 消除重复实现
  • 简化调用逻辑
  • 提高代码可维护性

2. 用户体验改善

  • 脚本模式和API模式行为一致
  • 功能逻辑更加清晰
  • 支持更灵活的使用场景

3. 系统架构优化

  • 职责分离更清晰
  • 模块间耦合度降低
  • 扩展性更好

风险评估

1. 低风险

  • 主要是内部实现调整
  • 对外接口保持兼容
  • 核心功能逻辑不变

2. 风险控制

  • 分阶段实施
  • 充分测试验证
  • 保留回滚机制

3. 应急预案

  • 保留原有代码备份
  • 准备快速回滚方案
  • 监控重要指标

总结

本次重构将实现Vector表管理功能的统一化,解决当前脚本模式和API模式行为不一致的问题,同时简化代码结构,提高系统的可维护性和扩展性。重构后的系统将具有更清晰的职责分离和更一致的用户体验。