| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- #!/usr/bin/env python
- """
- 修复 n8n 工作流中的脚本路径
- 工作流 ID: KxIyrja1o16rNUlc
- 工作流名称: DF_DO202601160001_工作流
- 问题: 脚本路径错误,应该是 task_38_DF_DO202601160001.py 而不是 DF_DO202601160001.py
- """
- import sys
- from pathlib import Path
- import requests
- # 添加项目根目录到路径
- PROJECT_ROOT = Path(__file__).parent.parent
- sys.path.insert(0, str(PROJECT_ROOT))
- from app.config.config import BaseConfig
- # n8n API 配置
- N8N_API_URL = BaseConfig.N8N_API_URL
- N8N_API_KEY = BaseConfig.N8N_API_KEY
- N8N_API_TIMEOUT = BaseConfig.N8N_API_TIMEOUT
- WORKFLOW_ID = "KxIyrja1o16rNUlc"
- CORRECT_SCRIPT_NAME = "task_38_DF_DO202601160001.py"
- OLD_SCRIPT_NAME = "DF_DO202601160001.py"
- def get_headers():
- """获取请求头"""
- return {
- "X-N8N-API-KEY": N8N_API_KEY,
- "Content-Type": "application/json",
- "Accept": "application/json",
- }
- def get_workflow(workflow_id: str) -> dict:
- """获取工作流配置"""
- url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
- response = requests.get(url, headers=get_headers(), timeout=N8N_API_TIMEOUT)
- response.raise_for_status()
- return response.json()
- def update_workflow(workflow_id: str, workflow_data: dict) -> dict:
- """更新工作流"""
- url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}"
- # 只更新允许的字段
- update_data = {
- "name": workflow_data.get("name"),
- "nodes": workflow_data.get("nodes"),
- "connections": workflow_data.get("connections"),
- "settings": workflow_data.get("settings", {}),
- }
- response = requests.put(
- url, headers=get_headers(), json=update_data, timeout=N8N_API_TIMEOUT
- )
- response.raise_for_status()
- return response.json()
- def fix_workflow_script_path(workflow_id: str) -> bool:
- """
- 修复工作流中的脚本路径
- Args:
- workflow_id: 工作流 ID
- Returns:
- 是否修复成功
- """
- print(f"正在获取工作流 {workflow_id}...")
- workflow = get_workflow(workflow_id)
- print(f"工作流名称: {workflow.get('name')}")
- print(f"节点数量: {len(workflow.get('nodes', []))}")
- # 查找 Execute Script 节点
- updated = False
- for node in workflow.get("nodes", []):
- if node.get("type") == "n8n-nodes-base.ssh":
- node_name = node.get("name", "")
- if "Execute Script" in node_name or "execute" in node_name.lower():
- params = node.get("parameters", {})
- command = params.get("command", "")
- print(f"\n找到 Execute Script 节点: {node_name}")
- print(f"当前命令: {command}")
- # 检查并修复脚本路径
- # 使用正则表达式精确匹配脚本文件名(避免重复替换)
- import re
- # 匹配 datafactory/scripts/ 后面的脚本文件名
- pattern = r"(datafactory/scripts/)([^/\s]+\.py)"
- match = re.search(pattern, command)
- if match:
- current_script = match.group(2)
- if current_script == OLD_SCRIPT_NAME:
- new_command = re.sub(
- pattern,
- rf"\1{CORRECT_SCRIPT_NAME}",
- command,
- )
- params["command"] = new_command
- node["parameters"] = params
- updated = True
- print(f"已修复命令: {new_command}")
- elif current_script == CORRECT_SCRIPT_NAME:
- print("脚本路径已正确,无需修复")
- else:
- print(
- f"当前脚本: {current_script}, 期望: {CORRECT_SCRIPT_NAME}"
- )
- # 如果当前脚本不是期望的,也进行修复
- new_command = re.sub(
- pattern,
- rf"\1{CORRECT_SCRIPT_NAME}",
- command,
- )
- params["command"] = new_command
- node["parameters"] = params
- updated = True
- print(f"已修复命令: {new_command}")
- else:
- print(f"警告: 未找到脚本路径模式,命令为: {command}")
- if updated:
- print("\n正在更新工作流...")
- updated_workflow = update_workflow(workflow_id, workflow)
- print("[成功] 工作流更新成功!")
- print(f"工作流 ID: {updated_workflow.get('id')}")
- print(f"工作流名称: {updated_workflow.get('name')}")
- return True
- else:
- print("\n未找到需要修复的节点或脚本路径已正确")
- return False
- if __name__ == "__main__":
- try:
- success = fix_workflow_script_path(WORKFLOW_ID)
- sys.exit(0 if success else 1)
- except Exception as e:
- print(f"[错误] 修复失败: {e}")
- import traceback
- traceback.print_exc()
- sys.exit(1)
|