| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- #!/usr/bin/env python
- """
- 修复 n8n 工作流触发器问题
- 问题描述:
- Workflow "产品库存表原始数据导入_工作流" (ID: 5oIys8sZqxqQuZ5l) has no node
- to start the workflow - at least one trigger, poller or webhook node is required
- 解决方案:
- 将 Manual Trigger 替换为 Schedule Trigger,使工作流可以被激活
- """
- from __future__ import annotations
- import os
- import sys
- import uuid
- from typing import Any
- import requests
- from loguru import logger
- # 添加项目根目录到Python路径
- PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
- sys.path.insert(0, PROJECT_ROOT)
- from app.config.config import config, current_env
- # 获取配置
- app_config = config[current_env]
- # n8n API 配置
- N8N_API_URL = getattr(app_config, "N8N_API_URL", "https://n8n.citupro.com")
- N8N_API_KEY = getattr(app_config, "N8N_API_KEY", "")
- def get_headers() -> dict[str, str]:
- """获取请求头"""
- return {
- "X-N8N-API-KEY": N8N_API_KEY,
- "Content-Type": "application/json",
- "Accept": "application/json",
- }
- def get_workflow(workflow_id: str) -> dict[str, Any]:
- """获取工作流详情"""
- url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
- response = requests.get(url, headers=get_headers(), timeout=30)
- response.raise_for_status()
- return response.json()
- def update_workflow(workflow_id: str, workflow_data: dict[str, Any]) -> dict[str, Any]:
- """更新工作流"""
- url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
- response = requests.put(url, headers=get_headers(), json=workflow_data, timeout=30)
- response.raise_for_status()
- return response.json()
- def fix_workflow_trigger(workflow_id: str) -> bool:
- """
- 修复工作流触发器
- 将 Manual Trigger 替换为 Schedule Trigger
- Args:
- workflow_id: 工作流 ID
- Returns:
- 是否修复成功
- """
- logger.info(f"正在获取工作流 {workflow_id}...")
- try:
- # 获取当前工作流配置
- workflow = get_workflow(workflow_id)
- logger.info(f"工作流名称: {workflow.get('name')}")
- nodes = workflow.get("nodes", [])
- connections = workflow.get("connections", {})
- # 查找 Manual Trigger 节点
- manual_trigger_index = None
- manual_trigger_name = None
- for i, node in enumerate(nodes):
- if node.get("type") == "n8n-nodes-base.manualTrigger":
- manual_trigger_index = i
- manual_trigger_name = node.get("name", "Manual Trigger")
- logger.info(
- f"找到 Manual Trigger 节点: {manual_trigger_name} (index: {i})"
- )
- break
- if manual_trigger_index is None:
- logger.warning("未找到 Manual Trigger 节点")
- # 检查是否已经有 Schedule Trigger,如果有则更新其配置
- for i, node in enumerate(nodes):
- if node.get("type") == "n8n-nodes-base.scheduleTrigger":
- logger.info(
- "工作流已包含 Schedule Trigger,更新为每天凌晨1点执行..."
- )
- # 更新 Schedule Trigger 配置
- nodes[i]["parameters"] = {
- "rule": {
- "interval": [
- {
- "field": "days",
- "daysInterval": 1,
- "triggerAtHour": 1,
- "triggerAtMinute": 0,
- }
- ]
- }
- }
- # 更新工作流
- update_data = {
- "name": workflow.get("name"),
- "nodes": nodes,
- "connections": connections,
- "settings": workflow.get("settings", {"executionOrder": "v1"}),
- }
- result = update_workflow(workflow_id, update_data)
- logger.info("Schedule Trigger 配置已更新为每天凌晨1点执行")
- logger.info(f"工作流 ID: {result.get('id')}")
- return True
- logger.error("工作流既没有 Manual Trigger 也没有 Schedule Trigger")
- return False
- # 创建新的 Schedule Trigger 节点(每天凌晨1点执行)
- new_trigger_name = "Schedule Trigger"
- schedule_trigger = {
- "parameters": {
- "rule": {
- "interval": [
- {
- "field": "days",
- "daysInterval": 1,
- "triggerAtHour": 1,
- "triggerAtMinute": 0,
- }
- ]
- }
- },
- "id": str(uuid.uuid4()),
- "name": new_trigger_name,
- "type": "n8n-nodes-base.scheduleTrigger",
- "typeVersion": 1.2,
- "position": nodes[manual_trigger_index].get("position", [250, 300]),
- }
- # 替换节点
- nodes[manual_trigger_index] = schedule_trigger
- logger.info(f"将 {manual_trigger_name} 替换为 {new_trigger_name}")
- # 更新连接配置
- if manual_trigger_name in connections:
- connections[new_trigger_name] = connections.pop(manual_trigger_name)
- logger.info(f"更新连接配置: {manual_trigger_name} -> {new_trigger_name}")
- # 构建更新数据
- update_data = {
- "name": workflow.get("name"),
- "nodes": nodes,
- "connections": connections,
- "settings": workflow.get("settings", {"executionOrder": "v1"}),
- }
- # 更新工作流
- logger.info("正在更新工作流...")
- result = update_workflow(workflow_id, update_data)
- logger.info(f"工作流更新成功: {result.get('name')}")
- logger.info(f"工作流 ID: {result.get('id')}")
- logger.info(f"活跃状态: {result.get('active')}")
- return True
- except requests.exceptions.RequestException as e:
- logger.error(f"API 请求失败: {e}")
- return False
- except Exception as e:
- logger.exception(f"修复工作流失败: {e}")
- return False
- def main() -> None:
- """主函数"""
- # 工作流 ID(从错误信息中获取)
- workflow_id = "5oIys8sZqxqQuZ5l"
- logger.info("=" * 60)
- logger.info("n8n 工作流触发器修复脚本")
- logger.info("=" * 60)
- logger.info(f"目标工作流 ID: {workflow_id}")
- logger.info(f"n8n API URL: {N8N_API_URL}")
- if fix_workflow_trigger(workflow_id):
- logger.info("=" * 60)
- logger.info("工作流修复成功!")
- logger.info("现在可以尝试激活工作流了")
- logger.info("=" * 60)
- sys.exit(0)
- else:
- logger.error("工作流修复失败")
- sys.exit(1)
- if __name__ == "__main__":
- # 配置日志
- logger.remove()
- logger.add(
- sys.stderr,
- level="INFO",
- format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
- )
- main()
|