当调用 Data Pipeline API 的完整工作流执行接口时,如果任务目录下已存在文件,需要自动备份这些文件,避免被新生成的文件覆盖。
POST /api/v0/data_pipeline/tasks/{task_id}/execute
{"execution_mode": "complete"}
./data_pipeline/training_data/{task_id}/
下是否存在文件table_list.txt
文件(用户上传的输入文件,需要保留)file_bak_{YYYYMMDD_HHMMSS}
file_bak_20250121_094400
table_list.txt
之外的所有文件移动到备份目录*.ddl
、*.md
、*.json
、*.log
、*.txt
等包括的文件:
*.ddl
*_detail.md
qs_*.json
、qs_*.json.backup
metadata.txt
、filename_mapping.txt
sql_validation_*.log
、sql_validation_*.json
training_load_*.log
task_config.json
、task_result.json
data_pipeline.log
排除的文件:
table_list.txt
(严格按文件名匹配)排除的目录:
vector_bak/
等)主要文件:data_pipeline/api/simple_workflow.py
修改方法:SimpleWorkflowExecutor.execute_complete_workflow()
1. API调用 POST /api/v0/data_pipeline/tasks/{task_id}/execute
2. execution_mode == "complete"
3. 进入 SimpleWorkflowExecutor.execute_complete_workflow()
4. _ensure_task_directory() - 确保目录存在
5. _backup_existing_files_if_needed() - 🆕 检查并备份文件
├── 扫描 task_dir 下的所有文件(不包括子目录)
├── 排除 table_list.txt
├── 如果有文件需要备份:
│ ├── 创建 file_bak_YYYYMMDD_HHMMSS 目录
│ ├── 移动文件到备份目录(保持原文件名)
│ └── 写入 backup_info.json 记录
└── 记录备份日志
6. 继续正常的工作流执行...
async def execute_complete_workflow(self) -> Dict[str, Any]:
"""执行完整工作流"""
try:
# 确保任务目录存在
if not self._ensure_task_directory():
raise Exception("无法创建任务目录")
# 🆕 新增:备份现有文件(仅在complete模式下)
self._backup_existing_files_if_needed()
# 开始任务
self.task_manager.update_task_status(self.task_id, 'in_progress')
# ... 其余代码保持不变
def _backup_existing_files_if_needed(self):
"""如果需要,备份现有文件(仅备份文件,不包括子目录)"""
# 1. 扫描任务目录中的文件
# 2. 排除 table_list.txt 和子目录
# 3. 如果有文件需要备份,创建备份目录
# 4. 移动文件到备份目录
# 5. 生成备份记录文件
# 6. 记录日志
在备份目录中创建 backup_info.json
文件:
{
"backup_time": "2025-01-21T09:44:00.123456",
"backup_directory": "file_bak_20250121_094400",
"moved_files": ["old_file.ddl", "old_data.json"],
"failed_files": [],
"task_id": "task_20250721_083557"
}
如果备份目录名已存在,添加序号后缀:
file_bak_20250121_094400
file_bak_20250121_094400_1
file_bak_20250121_094400_2
task_20250721_083557/
├── table_list.txt # 用户上传的表清单
├── old_file.ddl # 之前生成的DDL文件
├── old_data.json # 之前生成的数据文件
├── some_log.log # 之前的日志文件
└── some_subdir/ # 子目录不处理
└── file.txt
task_20250721_083557/
├── table_list.txt # 保留在原位置
├── file_bak_20250121_094400/ # 新建备份目录
│ ├── old_file.ddl # 移动过来,文件名不变
│ ├── old_data.json # 移动过来,文件名不变
│ ├── some_log.log # 移动过来,文件名不变
│ └── backup_info.json # 备份记录
├── some_subdir/ # 子目录不动
│ └── file.txt
└── [新生成的文件...] # 工作流新生成的文件
POST /api/v0/data_pipeline/tasks/{task_id}/execute
+ {"execution_mode": "complete"}
{"execution_mode": "step"}
不会触发备份python -m data_pipeline.schema_workflow
不受影响table_list.txt
:不创建备份,直接继续table_list.txt
,不应创建备份table_list.txt
,只移动文件