本文档描述了Data Pipeline任务删除功能的详细设计,包括目录删除、数据库记录管理、批量操作等功能。该API主要用于删除任务对应的文件目录以节省存储空间,同时提供灵活的数据库记录管理选项。
data_pipeline/training_data/{task_id}/
整个目录及所有文件
在 data_pipeline_tasks 表中添加以下字段:
ALTER TABLE data_pipeline_tasks ADD COLUMN directory_exists BOOLEAN DEFAULT TRUE;
ALTER TABLE data_pipeline_tasks ADD COLUMN updated_at TIMESTAMP NULL;
- directory_exists: 标记任务目录是否存在
  - TRUE: 目录存在(默认值)
  - FALSE: 目录已被删除
- updated_at: 记录最后更新时间
  - NULL: 从未更新过(默认值)
  - TIMESTAMP: 最后更新的时间戳
- data_pipeline_tasks - 任务主记录
- data_pipeline_task_steps - 任务步骤记录
DELETE /api/v0/data_pipeline/tasks/{task_id}
- task_id (string, required): 要删除的任务ID
{
"confirm": true, // 必需,确认删除操作
"delete_database_records": false, // 可选,是否删除数据库记录,默认false
"force": false // 可选,强制删除(跳过状态检查),默认false
}
成功响应 (200):
{
"success": true,
"code": 200,
"message": "任务目录删除成功",
"data": {
"task_id": "task_20250702_173932",
"directory_deleted": true,
"database_records_deleted": false,
"deleted_files_count": 15,
"deleted_size": "2.5 MB",
"deleted_at": "2025-07-02T21:45:30"
}
}
DELETE /api/v0/data_pipeline/tasks
{
"task_ids": ["task_20250702_173932", "task_20250702_174521"], // 必需,任务ID列表
"confirm": true, // 必需,确认删除操作
"delete_database_records": false, // 可选,是否删除数据库记录,默认false
"force": false, // 可选,强制删除(跳过状态检查),默认false
"continue_on_error": true // 可选,遇到错误是否继续删除其他任务,默认true
}
成功响应 (200):
{
"success": true,
"code": 200,
"message": "批量删除完成",
"data": {
"deleted_tasks": [
{
"task_id": "task_20250702_173932",
"directory_deleted": true,
"database_records_deleted": false,
"deleted_files_count": 15,
"deleted_size": "2.5 MB"
},
{
"task_id": "task_20250702_174521",
"directory_deleted": true,
"database_records_deleted": false,
"deleted_files_count": 8,
"deleted_size": "1.2 MB"
}
],
"failed_tasks": [],
"summary": {
"total_requested": 2,
"successfully_deleted": 2,
"failed": 0,
"total_size_freed": "3.7 MB"
},
"deleted_at": "2025-07-02T21:45:30"
}
}
部分失败响应 (200):
{
"success": true,
"code": 200,
"message": "批量删除部分完成",
"data": {
"deleted_tasks": [
{
"task_id": "task_20250702_173932",
"directory_deleted": true,
"database_records_deleted": false,
"deleted_files_count": 15,
"deleted_size": "2.5 MB"
}
],
"failed_tasks": [
{
"task_id": "task_20250702_174521",
"error": "任务正在运行中,无法删除",
"error_code": "TASK_RUNNING"
}
],
"summary": {
"total_requested": 2,
"successfully_deleted": 1,
"failed": 1,
"total_size_freed": "2.5 MB"
},
"deleted_at": "2025-07-02T21:45:30"
}
}
状态码 | 错误类型 | 说明 |
---|---|---|
400 | Bad Request | 参数错误、缺少必需参数 |
404 | Not Found | 任务不存在 |
409 | Conflict | 任务正在运行,无法删除 |
403 | Forbidden | 权限不足 |
500 | Internal Server Error | 服务器内部错误、删除操作失败 |
任务不存在 (404):
{
"success": false,
"code": 404,
"message": "任务不存在: task_invalid_id"
}
任务正在运行 (409):
{
"success": false,
"code": 409,
"message": "任务正在运行中,无法删除",
"data": {
"task_id": "task_20250702_173932",
"current_status": "running"
}
}
缺少确认参数 (400):
{
"success": false,
"code": 400,
"message": "缺少必需参数: confirm",
"missing_params": ["confirm"]
}
使用 shutil.rmtree 删除整个目录
UPDATE data_pipeline_tasks
SET directory_exists = FALSE,
updated_at = CURRENT_TIMESTAMP
WHERE task_id = %s
-- 删除任务步骤记录
DELETE FROM data_pipeline_task_steps WHERE task_id = %s;
-- 删除任务主记录
DELETE FROM data_pipeline_tasks WHERE task_id = %s;
使用 pathlib.Path 处理路径
使用 shutil.rmtree 递归删除目录
import shutil
from pathlib import Path
def delete_task_directory(task_id):
    """Delete the training-data directory that belongs to one task.

    The target is ``data_pipeline/training_data/<task_id>``. Statistics are
    gathered with ``get_directory_stats`` before the tree is removed so the
    caller can report what was deleted.

    Args:
        task_id: Task identifier. It must be a non-empty string that forms
            exactly one path component (no separators, not ``.`` or ``..``).

    Returns:
        dict: ``{"deleted": True, "file_count": ..., "size": ...}`` when the
        directory was removed, otherwise ``{"deleted": False, "error": ...}``
        with a human-readable reason. Errors are reported in the returned
        dict rather than raised.
    """
    # Reject anything that is not exactly one path component. An empty ID
    # would resolve to the training-data root itself (removing every task),
    # and separators, "..", or an absolute path would let the ID point
    # outside that root, so such values must never reach rmtree.
    if (
        not isinstance(task_id, str)
        or task_id in ("", ".", "..")
        or Path(task_id).name != task_id
    ):
        return {
            "deleted": False,
            "error": "无效的任务ID"
        }

    try:
        task_dir = Path("data_pipeline/training_data") / task_id
        if not task_dir.exists():
            return {
                "deleted": False,
                "error": "目录不存在"
            }

        # Collect the statistics first; they are unavailable once the
        # directory is gone.
        file_count, total_size = get_directory_stats(task_dir)

        # Remove the directory and everything below it.
        shutil.rmtree(task_dir)
        return {
            "deleted": True,
            "file_count": file_count,
            "size": total_size
        }
    except PermissionError:
        return {
            "deleted": False,
            "error": "权限不足"
        }
    except Exception as e:
        # Boundary of the delete operation: report any other failure to the
        # caller as a message instead of propagating it.
        return {
            "deleted": False,
            "error": str(e)
        }
在 GET /api/v0/data_pipeline/tasks 响应中添加目录状态信息:
{
"tasks": [
{
"task_id": "task_20250702_173932",
"task_name": "测试任务",
"status": "completed",
"directory_exists": true, // 新增字段
"updated_at": null, // 新增字段
"created_at": "2025-07-02T17:39:32"
},
{
"task_id": "task_20250702_174521",
"task_name": "已删除目录的任务",
"status": "completed",
"directory_exists": false, // 目录已删除
"updated_at": "2025-07-02T21:45:30", // 删除时间
"created_at": "2025-07-02T17:45:21"
}
]
}
# 删除任务目录,保留数据库记录
curl -X DELETE \
http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_173932 \
-H "Content-Type: application/json" \
-d '{
"confirm": true,
"delete_database_records": false
}'
# 删除任务目录和数据库记录
curl -X DELETE \
http://localhost:8084/api/v0/data_pipeline/tasks/task_20250702_173932 \
-H "Content-Type: application/json" \
-d '{
"confirm": true,
"delete_database_records": true
}'
# 批量删除多个任务目录
curl -X DELETE \
http://localhost:8084/api/v0/data_pipeline/tasks \
-H "Content-Type: application/json" \
-d '{
"task_ids": ["task_20250702_173932", "task_20250702_174521"],
"confirm": true,
"delete_database_records": false,
"continue_on_error": true
}'
import requests
def delete_task(task_id, delete_db_records=False, *,
                base_url="http://localhost:8084", timeout=30):
    """Delete a single task through the Data Pipeline HTTP API.

    Sends ``DELETE {base_url}/api/v0/data_pipeline/tasks/{task_id}`` with
    ``confirm`` set to true.

    Args:
        task_id: ID of the task to delete; must not be empty.
        delete_db_records: Value sent as ``delete_database_records``.
        base_url: Scheme, host and port of the API server.
        timeout: Seconds to wait for the server before ``requests`` gives up;
            pass ``None`` to wait indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: If ``task_id`` is empty.

    Connection errors and timeouts raised by ``requests`` propagate to the
    caller.
    """
    # Local import keeps this snippet self-contained.
    from urllib.parse import quote

    if not task_id:
        # An empty ID would address the task collection URL instead of a
        # single task, so refuse it before any request is sent.
        raise ValueError("task_id must not be empty")

    # Percent-encode the ID so characters such as "/" or "?" cannot alter
    # the request path.
    encoded_id = quote(str(task_id), safe="")
    root = base_url.rstrip("/")
    url = f"{root}/api/v0/data_pipeline/tasks/{encoded_id}"
    payload = {
        "confirm": True,
        "delete_database_records": delete_db_records
    }
    # Without a timeout, requests waits forever on an unresponsive server.
    response = requests.delete(url, json=payload, timeout=timeout)
    return response.json()
def delete_tasks_batch(task_ids, delete_db_records=False, *,
                       continue_on_error=True,
                       base_url="http://localhost:8084", timeout=30):
    """Delete several tasks with one call to the Data Pipeline HTTP API.

    Sends ``DELETE {base_url}/api/v0/data_pipeline/tasks`` with ``confirm``
    set to true.

    Args:
        task_ids: Iterable of task IDs. A single string is treated as one ID
            rather than as a sequence of characters.
        delete_db_records: Value sent as ``delete_database_records``.
        continue_on_error: Value sent as ``continue_on_error``.
        base_url: Scheme, host and port of the API server.
        timeout: Seconds to wait for the server before ``requests`` gives up;
            pass ``None`` to wait indefinitely.

    Returns:
        The decoded JSON body of the response.

    Connection errors and timeouts raised by ``requests`` propagate to the
    caller.
    """
    if isinstance(task_ids, str):
        # A bare string would otherwise be serialized as a JSON string
        # instead of the list the endpoint expects.
        task_ids = [task_ids]

    root = base_url.rstrip("/")
    url = f"{root}/api/v0/data_pipeline/tasks"
    payload = {
        # list() lets tuples, sets and generators be JSON-encoded too.
        "task_ids": list(task_ids),
        "confirm": True,
        "delete_database_records": delete_db_records,
        "continue_on_error": continue_on_error
    }
    # Without a timeout, requests waits forever on an unresponsive server.
    response = requests.delete(url, json=payload, timeout=timeout)
    return response.json()
# Usage example: delete one task, then the same pair of tasks as a batch.
result = delete_task("task_20250702_173932")
print(result)
batch_result = delete_tasks_batch(["task_20250702_173932", "task_20250702_174521"])
print(batch_result)
记录以下信息:
Data Pipeline任务删除API提供了灵活、安全的任务目录管理功能。通过数据库状态跟踪和可选的记录删除,平衡了存储空间节省和历史数据保留的需求。批量操作支持提高了管理效率,而完善的安全机制确保了操作的可靠性。