fix_workflow_script_path.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #!/usr/bin/env python
  2. """
  3. 修复 n8n 工作流中的脚本路径
  4. 工作流 ID: KxIyrja1o16rNUlc
  5. 工作流名称: DF_DO202601160001_工作流
  6. 问题: 脚本路径错误,应该是 task_38_DF_DO202601160001.py 而不是 DF_DO202601160001.py
  7. """
  8. import sys
  9. from pathlib import Path
  10. import requests
  11. # 添加项目根目录到路径
  12. PROJECT_ROOT = Path(__file__).parent.parent
  13. sys.path.insert(0, str(PROJECT_ROOT))
  14. from app.config.config import BaseConfig
  15. # n8n API 配置
  16. N8N_API_URL = BaseConfig.N8N_API_URL
  17. N8N_API_KEY = BaseConfig.N8N_API_KEY
  18. N8N_API_TIMEOUT = BaseConfig.N8N_API_TIMEOUT
  19. WORKFLOW_ID = "KxIyrja1o16rNUlc"
  20. CORRECT_SCRIPT_NAME = "task_38_DF_DO202601160001.py"
  21. OLD_SCRIPT_NAME = "DF_DO202601160001.py"
  22. def get_headers():
  23. """获取请求头"""
  24. return {
  25. "X-N8N-API-KEY": N8N_API_KEY,
  26. "Content-Type": "application/json",
  27. "Accept": "application/json",
  28. }
  29. def get_workflow(workflow_id: str) -> dict:
  30. """获取工作流配置"""
  31. url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
  32. response = requests.get(url, headers=get_headers(), timeout=N8N_API_TIMEOUT)
  33. response.raise_for_status()
  34. return response.json()
  35. def update_workflow(workflow_id: str, workflow_data: dict) -> dict:
  36. """更新工作流"""
  37. url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
  38. # 只更新允许的字段
  39. update_data = {
  40. "name": workflow_data.get("name"),
  41. "nodes": workflow_data.get("nodes"),
  42. "connections": workflow_data.get("connections"),
  43. "settings": workflow_data.get("settings", {}),
  44. }
  45. response = requests.put(
  46. url, headers=get_headers(), json=update_data, timeout=N8N_API_TIMEOUT
  47. )
  48. response.raise_for_status()
  49. return response.json()
  50. def fix_workflow_script_path(workflow_id: str) -> bool:
  51. """
  52. 修复工作流中的脚本路径
  53. Args:
  54. workflow_id: 工作流 ID
  55. Returns:
  56. 是否修复成功
  57. """
  58. print(f"正在获取工作流 {workflow_id}...")
  59. workflow = get_workflow(workflow_id)
  60. print(f"工作流名称: {workflow.get('name')}")
  61. print(f"节点数量: {len(workflow.get('nodes', []))}")
  62. # 查找 Execute Script 节点
  63. updated = False
  64. for node in workflow.get("nodes", []):
  65. if node.get("type") == "n8n-nodes-base.ssh":
  66. node_name = node.get("name", "")
  67. if "Execute Script" in node_name or "execute" in node_name.lower():
  68. params = node.get("parameters", {})
  69. command = params.get("command", "")
  70. print(f"\n找到 Execute Script 节点: {node_name}")
  71. print(f"当前命令: {command}")
  72. # 检查并修复脚本路径
  73. # 使用正则表达式精确匹配脚本文件名(避免重复替换)
  74. import re
  75. # 匹配 datafactory/scripts/ 后面的脚本文件名
  76. pattern = r"(datafactory/scripts/)([^/\s]+\.py)"
  77. match = re.search(pattern, command)
  78. if match:
  79. current_script = match.group(2)
  80. if current_script == OLD_SCRIPT_NAME:
  81. new_command = re.sub(
  82. pattern,
  83. rf"\1{CORRECT_SCRIPT_NAME}",
  84. command,
  85. )
  86. params["command"] = new_command
  87. node["parameters"] = params
  88. updated = True
  89. print(f"已修复命令: {new_command}")
  90. elif current_script == CORRECT_SCRIPT_NAME:
  91. print("脚本路径已正确,无需修复")
  92. else:
  93. print(
  94. f"当前脚本: {current_script}, 期望: {CORRECT_SCRIPT_NAME}"
  95. )
  96. # 如果当前脚本不是期望的,也进行修复
  97. new_command = re.sub(
  98. pattern,
  99. rf"\1{CORRECT_SCRIPT_NAME}",
  100. command,
  101. )
  102. params["command"] = new_command
  103. node["parameters"] = params
  104. updated = True
  105. print(f"已修复命令: {new_command}")
  106. else:
  107. print(f"警告: 未找到脚本路径模式,命令为: {command}")
  108. if updated:
  109. print("\n正在更新工作流...")
  110. updated_workflow = update_workflow(workflow_id, workflow)
  111. print("[成功] 工作流更新成功!")
  112. print(f"工作流 ID: {updated_workflow.get('id')}")
  113. print(f"工作流名称: {updated_workflow.get('name')}")
  114. return True
  115. else:
  116. print("\n未找到需要修复的节点或脚本路径已正确")
  117. return False
  118. if __name__ == "__main__":
  119. try:
  120. success = fix_workflow_script_path(WORKFLOW_ID)
  121. sys.exit(0 if success else 1)
  122. except Exception as e:
  123. print(f"[错误] 修复失败: {e}")
  124. import traceback
  125. traceback.print_exc()
  126. sys.exit(1)