Task Manager MCP 能够读取到任务,但是没有自动生成对应的代码。
Task Manager MCP 的工作流程分为以下步骤:
MCP 只负责任务管理,不负责代码生成。代码生成需要 AI 来完成。
execute_task)mcp_task-manager_execute_task({
task_id: 7,
auto_complete: true
})
返回内容:
注意: 这个函数不会自动生成代码,只是返回指令。
AI 需要:
update_task_status)mcp_task-manager_update_task_status({
task_id: 7,
status: "completed",
code_name: "import_dept_data.py",
code_path: "app/core/data_flow"
})
这一步是必须的,否则任务会一直停留在 "processing" 状态。
// 获取任务详情
mcp_task-manager_get_task_by_id({ task_id: 7 })
// 返回
{
"task_id": 7,
"task_name": "导入数据资源的科室对照表",
"status": "pending",
"task_description": "# Task: 导入数据资源的科室对照表\n..."
}
// 执行任务
mcp_task-manager_execute_task({
task_id: 7,
auto_complete: true
})
// 返回执行指令
{
"message": "请生成代码并更新任务状态...",
"task_description": "...",
"steps": [
"分析需求",
"生成代码",
"更新任务状态"
]
}
注意: 此时任务状态变为 "processing"。
AI 执行以下操作:
# 1. 创建代码文件
write("app/core/data_flow/import_dept_data.py", code_content)
# 2. 验证代码
python -m py_compile app/core/data_flow/import_dept_data.py
// AI 调用 MCP 工具更新状态
mcp_task-manager_update_task_status({
task_id: 7,
status: "completed",
code_name: "import_dept_data.py",
code_path: "app/core/data_flow"
})
// 返回
{
"task_id": 7,
"status": "completed",
"update_time": "2025-11-28T10:33:37.833Z"
}
pending (待处理)
↓
execute_task 调用
↓
processing (处理中)
↓
AI 生成代码
↓
update_task_status 调用
↓
completed (已完成)
原因: AI 没有调用 update_task_status 更新状态。
解决方案: 在生成代码后,必须调用 update_task_status。
不能。只有 "pending" 状态的任务才能执行。
解决方案:
// 先更新为 pending
mcp_task-manager_update_task_status({
task_id: 7,
status: "pending"
})
// 再执行
mcp_task-manager_execute_task({ task_id: 7 })
不会。MCP 只负责:
代码生成由 AI 负责。
// 查看 pending 状态的任务
mcp_task-manager_get_pending_tasks()
// 查看所有任务
mcp_task-manager_get_all_tasks({ limit: 20 })
| 工具 | 功能 | 参数 |
|---|---|---|
get_pending_tasks |
获取待处理任务列表 | 无 |
get_task_by_id |
根据ID获取任务详情 | task_id |
execute_task |
执行任务(返回指令) | task_id, auto_complete |
update_task_status |
更新任务状态 | task_id, status, code_name, code_path |
process_all_tasks |
批量处理所有待处理任务 | auto_poll |
create_task |
创建新任务 | task_name, task_description, create_by |
get_all_tasks |
获取所有任务(调试用) | limit |
// 1. 获取所有待处理任务
const pendingTasks = mcp_task-manager_get_pending_tasks()
// 2. 逐个执行
for (const task of pendingTasks) {
// 执行任务(获取指令)
const instructions = mcp_task-manager_execute_task({
task_id: task.task_id,
auto_complete: true
})
// AI 生成代码
// ... (AI 操作)
// 更新状态
mcp_task-manager_update_task_status({
task_id: task.task_id,
status: "completed",
code_name: "generated_file.py",
code_path: "app/core/data_flow"
})
}
try {
// 执行任务
const result = mcp_task-manager_execute_task({ task_id: 7 })
// AI 生成代码
// ...
// 更新为完成
mcp_task-manager_update_task_status({
task_id: 7,
status: "completed"
})
} catch (error) {
// 更新为失败
mcp_task-manager_update_task_status({
task_id: 7,
status: "failed"
})
}
// 启用自动轮询(每5分钟检查一次)
mcp_task-manager_process_all_tasks({
auto_poll: true
})
当创建 DataFlow 时,系统会自动生成任务描述:
# Task: {任务名称}
## Data Source
- Type: postgresql
- Host: 10.52.31.104
- Port: 5432
- Database: hospital_db
## Source Tables (DDL)
```sql
CREATE TABLE source_table (...);
CREATE TABLE target_table (
...,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
{rule 字段的内容}
Generate n8n workflow
## 生成的代码结构
### 重要要求:目标表检测和自动创建
**所有数据流脚本必须包含以下功能:**
1. **目标表检测功能**:在数据加载之前,必须检查目标表是否存在
2. **自动创建表**:如果目标表不存在,脚本必须根据任务描述中的 DDL 自动创建目标表
3. **表结构匹配**:创建的表结构必须与任务描述中的 DDL 完全一致
4. **Schema 支持**:必须正确处理 schema(如 `dags`、`public` 等)
### 标准模板(包含表检测功能)
```python
"""
{任务名称}
功能:{任务描述}
模式:{update_mode}
作者:cursor
创建时间:{当前日期}
"""
import os
import sys
from datetime import datetime
from typing import Any
import pandas as pd
import psycopg2
from loguru import logger
# 添加项目根目录到Python路径
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)
from app.config.config import config, current_env
# 获取配置
app_config = config[current_env]
def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
"""
确保目标表存在,如果不存在则创建
重要:此函数必须根据任务描述中的目标表 DDL 来实现
Args:
conn: 目标数据库连接
"""
cursor = conn.cursor()
target_table = "target_table_name" # 根据任务描述设置
target_schema = "public" # 根据任务描述设置(如 dags, public 等)
try:
# 检查表是否存在
cursor.execute("""
SELECT EXISTS(
SELECT 1 FROM information_schema.tables
WHERE table_schema = %s
AND table_name = %s
)
""", (target_schema, target_table))
result = cursor.fetchone()
exists = result[0] if result else False
if not exists:
logger.info(f"目标表不存在,正在创建 {target_schema}.{target_table}...")
# 根据任务描述中的 DDL 创建表
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
-- 根据任务描述中的 DDL 添加所有列
id SERIAL PRIMARY KEY,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
cursor.execute(create_table_sql)
# 添加表注释和列注释
# 根据任务描述中的 COMMENT 添加
conn.commit()
logger.info(f"目标表 {target_schema}.{target_table} 创建成功")
else:
logger.info(f"目标表 {target_schema}.{target_table} 已存在")
except Exception as e:
conn.rollback()
logger.error(f"创建目标表失败: {e}")
raise
finally:
cursor.close()
def main() -> dict[str, Any]:
"""
主函数:执行ETL流程
重要:必须按以下顺序执行:
1. 建立数据库连接
2. 确保目标表存在(调用 ensure_target_table_exists)
3. 提取源数据
4. 数据转换
5. 加载到目标表
"""
# ... 实现代码 ...
# 步骤2: 确保目标表存在(必须在数据加载前执行)
logger.info("[Step 2/5] 检查/创建目标表...")
ensure_target_table_exists(target_conn)
# ... 其他步骤 ...
完整的脚本模板请参考:docs/script_template_with_table_check.py
在 app/core/data_flow/dataflows.py 的 create_dataflow 函数中:
# 保存到 task_list 表
task_insert_sql = text("""
INSERT INTO public.task_list
(task_name, task_description, status, code_name, code_path, create_by, create_time)
VALUES
(:task_name, :task_description, :status, :code_name, :code_path, :create_by, :create_time)
""")
task_params = {
'task_name': script_name,
'task_description': task_description_md, # Markdown 格式的任务描述
'status': 'pending',
'code_name': script_name,
'code_path': 'app/core/data_flow',
'create_by': 'cursor',
'create_time': current_time
}
db.session.execute(task_insert_sql, task_params)
execute_task 不会自动生成代码,只返回指令update_task_status 更新状态// 1. 查看任务
mcp_task-manager_get_task_by_id({ task_id: 7 })
// 状态: pending
// 2. 执行任务
mcp_task-manager_execute_task({ task_id: 7, auto_complete: true })
// 状态: processing
// 返回: 任务描述 + 执行指令
// 3. AI 生成代码
write("app/core/data_flow/import_dept_data.py", code)
// 文件已创建
// 4. 更新状态
mcp_task-manager_update_task_status({
task_id: 7,
status: "completed",
code_name: "import_dept_data.py",
code_path: "app/core/data_flow"
})
// 状态: completed ✅
app/core/data_flow/dataflows.py - DataFlow 创建时写入 task_listdocs/DataFlow_task_list优化说明.md - 任务描述生成逻辑app/core/data_flow/import_dept_data.py - 任务 7 生成的代码