fix_n8n_workflow_trigger.py 7.3 KB


  1. #!/usr/bin/env python
  2. """
  3. 修复 n8n 工作流触发器问题
  4. 问题描述:
  5. Workflow "产品库存表原始数据导入_工作流" (ID: 5oIys8sZqxqQuZ5l) has no node
  6. to start the workflow - at least one trigger, poller or webhook node is required
  7. 解决方案:
  8. 将 Manual Trigger 替换为 Schedule Trigger,使工作流可以被激活
  9. """
  10. from __future__ import annotations
  11. import os
  12. import sys
  13. import uuid
  14. from typing import Any
  15. import requests
  16. from loguru import logger
  17. # 添加项目根目录到Python路径
  18. PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
  19. sys.path.insert(0, PROJECT_ROOT)
  20. from app.config.config import config, current_env
  21. # 获取配置
  22. app_config = config[current_env]
  23. # n8n API 配置
  24. N8N_API_URL = getattr(app_config, "N8N_API_URL", "https://n8n.citupro.com")
  25. N8N_API_KEY = getattr(app_config, "N8N_API_KEY", "")
  26. def get_headers() -> dict[str, str]:
  27. """获取请求头"""
  28. return {
  29. "X-N8N-API-KEY": N8N_API_KEY,
  30. "Content-Type": "application/json",
  31. "Accept": "application/json",
  32. }
  33. def get_workflow(workflow_id: str) -> dict[str, Any]:
  34. """获取工作流详情"""
  35. url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
  36. response = requests.get(url, headers=get_headers(), timeout=30)
  37. response.raise_for_status()
  38. return response.json()
  39. def update_workflow(workflow_id: str, workflow_data: dict[str, Any]) -> dict[str, Any]:
  40. """更新工作流"""
  41. url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
  42. response = requests.put(url, headers=get_headers(), json=workflow_data, timeout=30)
  43. response.raise_for_status()
  44. return response.json()
  45. def fix_workflow_trigger(workflow_id: str) -> bool:
  46. """
  47. 修复工作流触发器
  48. 将 Manual Trigger 替换为 Schedule Trigger
  49. Args:
  50. workflow_id: 工作流 ID
  51. Returns:
  52. 是否修复成功
  53. """
  54. logger.info(f"正在获取工作流 {workflow_id}...")
  55. try:
  56. # 获取当前工作流配置
  57. workflow = get_workflow(workflow_id)
  58. logger.info(f"工作流名称: {workflow.get('name')}")
  59. nodes = workflow.get("nodes", [])
  60. connections = workflow.get("connections", {})
  61. # 查找 Manual Trigger 节点
  62. manual_trigger_index = None
  63. manual_trigger_name = None
  64. for i, node in enumerate(nodes):
  65. if node.get("type") == "n8n-nodes-base.manualTrigger":
  66. manual_trigger_index = i
  67. manual_trigger_name = node.get("name", "Manual Trigger")
  68. logger.info(
  69. f"找到 Manual Trigger 节点: {manual_trigger_name} (index: {i})"
  70. )
  71. break
  72. if manual_trigger_index is None:
  73. logger.warning("未找到 Manual Trigger 节点")
  74. # 检查是否已经有 Schedule Trigger,如果有则更新其配置
  75. for i, node in enumerate(nodes):
  76. if node.get("type") == "n8n-nodes-base.scheduleTrigger":
  77. logger.info(
  78. "工作流已包含 Schedule Trigger,更新为每天凌晨1点执行..."
  79. )
  80. # 更新 Schedule Trigger 配置
  81. nodes[i]["parameters"] = {
  82. "rule": {
  83. "interval": [
  84. {
  85. "field": "days",
  86. "daysInterval": 1,
  87. "triggerAtHour": 1,
  88. "triggerAtMinute": 0,
  89. }
  90. ]
  91. }
  92. }
  93. # 更新工作流
  94. update_data = {
  95. "name": workflow.get("name"),
  96. "nodes": nodes,
  97. "connections": connections,
  98. "settings": workflow.get("settings", {"executionOrder": "v1"}),
  99. }
  100. result = update_workflow(workflow_id, update_data)
  101. logger.info("Schedule Trigger 配置已更新为每天凌晨1点执行")
  102. logger.info(f"工作流 ID: {result.get('id')}")
  103. return True
  104. logger.error("工作流既没有 Manual Trigger 也没有 Schedule Trigger")
  105. return False
  106. # 创建新的 Schedule Trigger 节点(每天凌晨1点执行)
  107. new_trigger_name = "Schedule Trigger"
  108. schedule_trigger = {
  109. "parameters": {
  110. "rule": {
  111. "interval": [
  112. {
  113. "field": "days",
  114. "daysInterval": 1,
  115. "triggerAtHour": 1,
  116. "triggerAtMinute": 0,
  117. }
  118. ]
  119. }
  120. },
  121. "id": str(uuid.uuid4()),
  122. "name": new_trigger_name,
  123. "type": "n8n-nodes-base.scheduleTrigger",
  124. "typeVersion": 1.2,
  125. "position": nodes[manual_trigger_index].get("position", [250, 300]),
  126. }
  127. # 替换节点
  128. nodes[manual_trigger_index] = schedule_trigger
  129. logger.info(f"将 {manual_trigger_name} 替换为 {new_trigger_name}")
  130. # 更新连接配置
  131. if manual_trigger_name in connections:
  132. connections[new_trigger_name] = connections.pop(manual_trigger_name)
  133. logger.info(f"更新连接配置: {manual_trigger_name} -> {new_trigger_name}")
  134. # 构建更新数据
  135. update_data = {
  136. "name": workflow.get("name"),
  137. "nodes": nodes,
  138. "connections": connections,
  139. "settings": workflow.get("settings", {"executionOrder": "v1"}),
  140. }
  141. # 更新工作流
  142. logger.info("正在更新工作流...")
  143. result = update_workflow(workflow_id, update_data)
  144. logger.info(f"工作流更新成功: {result.get('name')}")
  145. logger.info(f"工作流 ID: {result.get('id')}")
  146. logger.info(f"活跃状态: {result.get('active')}")
  147. return True
  148. except requests.exceptions.RequestException as e:
  149. logger.error(f"API 请求失败: {e}")
  150. return False
  151. except Exception as e:
  152. logger.exception(f"修复工作流失败: {e}")
  153. return False
  154. def main() -> None:
  155. """主函数"""
  156. # 工作流 ID(从错误信息中获取)
  157. workflow_id = "5oIys8sZqxqQuZ5l"
  158. logger.info("=" * 60)
  159. logger.info("n8n 工作流触发器修复脚本")
  160. logger.info("=" * 60)
  161. logger.info(f"目标工作流 ID: {workflow_id}")
  162. logger.info(f"n8n API URL: {N8N_API_URL}")
  163. if fix_workflow_trigger(workflow_id):
  164. logger.info("=" * 60)
  165. logger.info("工作流修复成功!")
  166. logger.info("现在可以尝试激活工作流了")
  167. logger.info("=" * 60)
  168. sys.exit(0)
  169. else:
  170. logger.error("工作流修复失败")
  171. sys.exit(1)
  172. if __name__ == "__main__":
  173. # 配置日志
  174. logger.remove()
  175. logger.add(
  176. sys.stderr,
  177. level="INFO",
  178. format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
  179. )
  180. main()