#!/usr/bin/env python
"""
修复 n8n 工作流触发器问题
问题描述:
Workflow "产品库存表原始数据导入_工作流" (ID: 5oIys8sZqxqQuZ5l) has no node
to start the workflow - at least one trigger, poller or webhook node is required
解决方案:
将 Manual Trigger 替换为 Schedule Trigger,使工作流可以被激活
"""
from __future__ import annotations
import os
import sys
import uuid
from typing import Any
import requests
from loguru import logger
# 添加项目根目录到Python路径
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, PROJECT_ROOT)
from app.config.config import config, current_env
# 获取配置
app_config = config[current_env]
# n8n API 配置
N8N_API_URL = getattr(app_config, "N8N_API_URL", "https://n8n.citupro.com")
N8N_API_KEY = getattr(app_config, "N8N_API_KEY", "")
def get_headers() -> dict[str, str]:
"""获取请求头"""
return {
"X-N8N-API-KEY": N8N_API_KEY,
"Content-Type": "application/json",
"Accept": "application/json",
}
def get_workflow(workflow_id: str) -> dict[str, Any]:
"""获取工作流详情"""
url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
response = requests.get(url, headers=get_headers(), timeout=30)
response.raise_for_status()
return response.json()
def update_workflow(workflow_id: str, workflow_data: dict[str, Any]) -> dict[str, Any]:
"""更新工作流"""
url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
response = requests.put(url, headers=get_headers(), json=workflow_data, timeout=30)
response.raise_for_status()
return response.json()
def fix_workflow_trigger(workflow_id: str) -> bool:
"""
修复工作流触发器
将 Manual Trigger 替换为 Schedule Trigger
Args:
workflow_id: 工作流 ID
Returns:
是否修复成功
"""
logger.info(f"正在获取工作流 {workflow_id}...")
try:
# 获取当前工作流配置
workflow = get_workflow(workflow_id)
logger.info(f"工作流名称: {workflow.get('name')}")
nodes = workflow.get("nodes", [])
connections = workflow.get("connections", {})
# 查找 Manual Trigger 节点
manual_trigger_index = None
manual_trigger_name = None
for i, node in enumerate(nodes):
if node.get("type") == "n8n-nodes-base.manualTrigger":
manual_trigger_index = i
manual_trigger_name = node.get("name", "Manual Trigger")
logger.info(
f"找到 Manual Trigger 节点: {manual_trigger_name} (index: {i})"
)
break
if manual_trigger_index is None:
logger.warning("未找到 Manual Trigger 节点")
# 检查是否已经有 Schedule Trigger,如果有则更新其配置
for i, node in enumerate(nodes):
if node.get("type") == "n8n-nodes-base.scheduleTrigger":
logger.info(
"工作流已包含 Schedule Trigger,更新为每天凌晨1点执行..."
)
# 更新 Schedule Trigger 配置
nodes[i]["parameters"] = {
"rule": {
"interval": [
{
"field": "days",
"daysInterval": 1,
"triggerAtHour": 1,
"triggerAtMinute": 0,
}
]
}
}
# 更新工作流
update_data = {
"name": workflow.get("name"),
"nodes": nodes,
"connections": connections,
"settings": workflow.get("settings", {"executionOrder": "v1"}),
}
result = update_workflow(workflow_id, update_data)
logger.info("Schedule Trigger 配置已更新为每天凌晨1点执行")
logger.info(f"工作流 ID: {result.get('id')}")
return True
logger.error("工作流既没有 Manual Trigger 也没有 Schedule Trigger")
return False
# 创建新的 Schedule Trigger 节点(每天凌晨1点执行)
new_trigger_name = "Schedule Trigger"
schedule_trigger = {
"parameters": {
"rule": {
"interval": [
{
"field": "days",
"daysInterval": 1,
"triggerAtHour": 1,
"triggerAtMinute": 0,
}
]
}
},
"id": str(uuid.uuid4()),
"name": new_trigger_name,
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1.2,
"position": nodes[manual_trigger_index].get("position", [250, 300]),
}
# 替换节点
nodes[manual_trigger_index] = schedule_trigger
logger.info(f"将 {manual_trigger_name} 替换为 {new_trigger_name}")
# 更新连接配置
if manual_trigger_name in connections:
connections[new_trigger_name] = connections.pop(manual_trigger_name)
logger.info(f"更新连接配置: {manual_trigger_name} -> {new_trigger_name}")
# 构建更新数据
update_data = {
"name": workflow.get("name"),
"nodes": nodes,
"connections": connections,
"settings": workflow.get("settings", {"executionOrder": "v1"}),
}
# 更新工作流
logger.info("正在更新工作流...")
result = update_workflow(workflow_id, update_data)
logger.info(f"工作流更新成功: {result.get('name')}")
logger.info(f"工作流 ID: {result.get('id')}")
logger.info(f"活跃状态: {result.get('active')}")
return True
except requests.exceptions.RequestException as e:
logger.error(f"API 请求失败: {e}")
return False
except Exception as e:
logger.exception(f"修复工作流失败: {e}")
return False
def main() -> None:
"""主函数"""
# 工作流 ID(从错误信息中获取)
workflow_id = "5oIys8sZqxqQuZ5l"
logger.info("=" * 60)
logger.info("n8n 工作流触发器修复脚本")
logger.info("=" * 60)
logger.info(f"目标工作流 ID: {workflow_id}")
logger.info(f"n8n API URL: {N8N_API_URL}")
if fix_workflow_trigger(workflow_id):
logger.info("=" * 60)
logger.info("工作流修复成功!")
logger.info("现在可以尝试激活工作流了")
logger.info("=" * 60)
sys.exit(0)
else:
logger.error("工作流修复失败")
sys.exit(1)
if __name__ == "__main__":
# 配置日志
logger.remove()
logger.add(
sys.stderr,
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
)
main()