#!/usr/bin/env python
"""
Fix a missing-trigger problem in an n8n workflow.

Problem:
Workflow "产品库存表原始数据导入_工作流" (ID: 5oIys8sZqxqQuZ5l) has no node to start the workflow
- at least one trigger, poller or webhook node is required

Solution:
Replace the Manual Trigger node with a Schedule Trigger node so that the
workflow can be activated.
"""

from __future__ import annotations

import os
import sys
import uuid
from typing import Any

import requests
from loguru import logger

# Add the project root to the Python path so that ``app`` can be imported.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, PROJECT_ROOT)

from app.config.config import config, current_env  # noqa: E402

# Configuration object for the current environment.
app_config = config[current_env]

# n8n API settings.
N8N_API_URL = getattr(app_config, "N8N_API_URL", "https://n8n.citupro.com")
N8N_API_KEY = getattr(app_config, "N8N_API_KEY", "")

# n8n node type identifiers this script looks for / creates.
_MANUAL_TRIGGER_TYPE = "n8n-nodes-base.manualTrigger"
_SCHEDULE_TRIGGER_TYPE = "n8n-nodes-base.scheduleTrigger"


def get_headers() -> dict[str, str]:
    """Return the HTTP headers sent with every n8n API request."""
    return {
        "X-N8N-API-KEY": N8N_API_KEY,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }


def _workflow_url(workflow_id: str) -> str:
    """Build the workflow endpoint URL for ``workflow_id``."""
    return f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"


def get_workflow(workflow_id: str) -> dict[str, Any]:
    """Fetch a workflow definition.

    Args:
        workflow_id: ID of the workflow to fetch.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status.
    """
    response = requests.get(
        _workflow_url(workflow_id), headers=get_headers(), timeout=30
    )
    response.raise_for_status()
    return response.json()


def update_workflow(workflow_id: str, workflow_data: dict[str, Any]) -> dict[str, Any]:
    """Send an updated workflow definition via HTTP PUT.

    Args:
        workflow_id: ID of the workflow to update.
        workflow_data: JSON-serialisable request body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status.
    """
    response = requests.put(
        _workflow_url(workflow_id),
        headers=get_headers(),
        json=workflow_data,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


def _daily_schedule_parameters() -> dict[str, Any]:
    """Return Schedule Trigger parameters for a run every day at 01:00."""
    return {
        "rule": {
            "interval": [
                {
                    "field": "days",
                    "daysInterval": 1,
                    "triggerAtHour": 1,
                    "triggerAtMinute": 0,
                }
            ]
        }
    }


def _build_update_payload(
    workflow: dict[str, Any],
    nodes: list[dict[str, Any]],
    connections: dict[str, Any],
) -> dict[str, Any]:
    """Assemble the request body passed to ``update_workflow``."""
    settings = workflow.get("settings")
    if settings is None:
        # Covers both a missing key and an explicit JSON null.
        settings = {"executionOrder": "v1"}
    return {
        "name": workflow.get("name"),
        "nodes": nodes,
        "connections": connections,
        "settings": settings,
    }


def _find_node_index(nodes: list[dict[str, Any]], node_type: str) -> int | None:
    """Return the index of the first node whose ``type`` is ``node_type``."""
    return next(
        (index for index, node in enumerate(nodes) if node.get("type") == node_type),
        None,
    )


def _unique_node_name(
    nodes: list[dict[str, Any]], base_name: str, skip_index: int | None = None
) -> str:
    """Return ``base_name``, suffixed with a number if another node uses it.

    The node at ``skip_index`` is ignored because it is about to be replaced.
    """
    taken = {
        node.get("name") for index, node in enumerate(nodes) if index != skip_index
    }
    if base_name not in taken:
        return base_name
    suffix = 1
    while f"{base_name}{suffix}" in taken:
        suffix += 1
    return f"{base_name}{suffix}"


def _reschedule_existing_trigger(
    workflow_id: str,
    workflow: dict[str, Any],
    nodes: list[dict[str, Any]],
    connections: dict[str, Any],
) -> bool:
    """Reset the first existing Schedule Trigger to the daily 01:00 rule.

    Returns:
        True if a Schedule Trigger was found and the workflow was updated,
        False if the workflow contains no Schedule Trigger.
    """
    schedule_index = _find_node_index(nodes, _SCHEDULE_TRIGGER_TYPE)
    if schedule_index is None:
        logger.error("工作流既没有 Manual Trigger 也没有 Schedule Trigger")
        return False

    logger.info("工作流已包含 Schedule Trigger,更新为每天凌晨1点执行...")
    nodes[schedule_index]["parameters"] = _daily_schedule_parameters()

    result = update_workflow(
        workflow_id, _build_update_payload(workflow, nodes, connections)
    )
    logger.info("Schedule Trigger 配置已更新为每天凌晨1点执行")
    logger.info(f"工作流 ID: {result.get('id')}")
    return True


def fix_workflow_trigger(workflow_id: str) -> bool:
    """
    Repair a workflow's trigger by replacing Manual Trigger with Schedule Trigger.

    If the workflow has no Manual Trigger but already has a Schedule Trigger,
    that trigger's parameters are reset to the daily 01:00 rule instead.

    Args:
        workflow_id: ID of the workflow to repair.

    Returns:
        True if the workflow was updated, False otherwise (no suitable trigger
        node, an API request failure, or any other error; failures are logged
        and not re-raised).
    """
    logger.info(f"正在获取工作流 {workflow_id}...")

    try:
        # Fetch the current workflow definition.
        workflow = get_workflow(workflow_id)
        logger.info(f"工作流名称: {workflow.get('name')}")

        nodes = workflow.get("nodes", [])
        connections = workflow.get("connections", {})

        manual_trigger_index = _find_node_index(nodes, _MANUAL_TRIGGER_TYPE)
        if manual_trigger_index is None:
            logger.warning("未找到 Manual Trigger 节点")
            return _reschedule_existing_trigger(
                workflow_id, workflow, nodes, connections
            )

        manual_trigger = nodes[manual_trigger_index]
        manual_trigger_name = manual_trigger.get("name", "Manual Trigger")
        logger.info(
            f"找到 Manual Trigger 节点: {manual_trigger_name} (index: {manual_trigger_index})"
        )

        # Connections are keyed by node name, so the new node's name must not
        # collide with any node that stays in the workflow.
        new_trigger_name = _unique_node_name(
            nodes, "Schedule Trigger", skip_index=manual_trigger_index
        )
        schedule_trigger = {
            "parameters": _daily_schedule_parameters(),
            "id": str(uuid.uuid4()),
            "name": new_trigger_name,
            "type": _SCHEDULE_TRIGGER_TYPE,
            "typeVersion": 1.2,
            # Keep the new node where the old one was on the canvas.
            "position": manual_trigger.get("position", [250, 300]),
        }

        # Swap the node in place.
        nodes[manual_trigger_index] = schedule_trigger
        logger.info(f"将 {manual_trigger_name} 替换为 {new_trigger_name}")

        # Re-key the old trigger's outgoing connections under the new name.
        if manual_trigger_name in connections:
            connections[new_trigger_name] = connections.pop(manual_trigger_name)
            logger.info(f"更新连接配置: {manual_trigger_name} -> {new_trigger_name}")

        logger.info("正在更新工作流...")
        result = update_workflow(
            workflow_id, _build_update_payload(workflow, nodes, connections)
        )

        logger.info(f"工作流更新成功: {result.get('name')}")
        logger.info(f"工作流 ID: {result.get('id')}")
        logger.info(f"活跃状态: {result.get('active')}")

        return True

    except requests.exceptions.RequestException as e:
        logger.error(f"API 请求失败: {e}")
        return False
    except Exception as e:
        # Top-level boundary of the script: log with traceback and report failure.
        logger.exception(f"修复工作流失败: {e}")
        return False


def main() -> None:
    """Run the repair for the hard-coded workflow and exit with 0 or 1."""
    # Workflow ID taken from the n8n error message.
    workflow_id = "5oIys8sZqxqQuZ5l"

    logger.info("=" * 60)
    logger.info("n8n 工作流触发器修复脚本")
    logger.info("=" * 60)
    logger.info(f"目标工作流 ID: {workflow_id}")
    logger.info(f"n8n API URL: {N8N_API_URL}")

    if fix_workflow_trigger(workflow_id):
        logger.info("=" * 60)
        logger.info("工作流修复成功!")
        logger.info("现在可以尝试激活工作流了")
        logger.info("=" * 60)
        sys.exit(0)
    else:
        logger.error("工作流修复失败")
        sys.exit(1)


if __name__ == "__main__":
    # Configure logging: replace loguru's default sink with an INFO-level stderr sink.
    logger.remove()
    logger.add(
        sys.stderr,
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    )
    main()