#!/usr/bin/env python """ 修复 n8n 工作流中的脚本路径 工作流 ID: KxIyrja1o16rNUlc 工作流名称: DF_DO202601160001_工作流 问题: 脚本路径错误,应该是 task_38_DF_DO202601160001.py 而不是 DF_DO202601160001.py """ import sys from pathlib import Path import requests # 添加项目根目录到路径 PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT)) from app.config.config import BaseConfig # n8n API 配置 N8N_API_URL = BaseConfig.N8N_API_URL N8N_API_KEY = BaseConfig.N8N_API_KEY N8N_API_TIMEOUT = BaseConfig.N8N_API_TIMEOUT WORKFLOW_ID = "KxIyrja1o16rNUlc" CORRECT_SCRIPT_NAME = "task_38_DF_DO202601160001.py" OLD_SCRIPT_NAME = "DF_DO202601160001.py" def get_headers(): """获取请求头""" return { "X-N8N-API-KEY": N8N_API_KEY, "Content-Type": "application/json", "Accept": "application/json", } def get_workflow(workflow_id: str) -> dict: """获取工作流配置""" url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}" response = requests.get(url, headers=get_headers(), timeout=N8N_API_TIMEOUT) response.raise_for_status() return response.json() def update_workflow(workflow_id: str, workflow_data: dict) -> dict: """更新工作流""" url = f"{N8N_API_URL.rstrip('/')}/api/v1/workflows/{workflow_id}" # 只更新允许的字段 update_data = { "name": workflow_data.get("name"), "nodes": workflow_data.get("nodes"), "connections": workflow_data.get("connections"), "settings": workflow_data.get("settings", {}), } response = requests.put( url, headers=get_headers(), json=update_data, timeout=N8N_API_TIMEOUT ) response.raise_for_status() return response.json() def fix_workflow_script_path(workflow_id: str) -> bool: """ 修复工作流中的脚本路径 Args: workflow_id: 工作流 ID Returns: 是否修复成功 """ print(f"正在获取工作流 {workflow_id}...") workflow = get_workflow(workflow_id) print(f"工作流名称: {workflow.get('name')}") print(f"节点数量: {len(workflow.get('nodes', []))}") # 查找 Execute Script 节点 updated = False for node in workflow.get("nodes", []): if node.get("type") == "n8n-nodes-base.ssh": node_name = node.get("name", "") if "Execute Script" in node_name or "execute" in node_name.lower(): params = node.get("parameters", {}) command = params.get("command", "") print(f"\n找到 Execute Script 节点: {node_name}") print(f"当前命令: {command}") # 检查并修复脚本路径 # 使用正则表达式精确匹配脚本文件名(避免重复替换) import re # 匹配 datafactory/scripts/ 后面的脚本文件名 pattern = r"(datafactory/scripts/)([^/\s]+\.py)" match = re.search(pattern, command) if match: current_script = match.group(2) if current_script == OLD_SCRIPT_NAME: new_command = re.sub( pattern, rf"\1{CORRECT_SCRIPT_NAME}", command, ) params["command"] = new_command node["parameters"] = params updated = True print(f"已修复命令: {new_command}") elif current_script == CORRECT_SCRIPT_NAME: print("脚本路径已正确,无需修复") else: print( f"当前脚本: {current_script}, 期望: {CORRECT_SCRIPT_NAME}" ) # 如果当前脚本不是期望的,也进行修复 new_command = re.sub( pattern, rf"\1{CORRECT_SCRIPT_NAME}", command, ) params["command"] = new_command node["parameters"] = params updated = True print(f"已修复命令: {new_command}") else: print(f"警告: 未找到脚本路径模式,命令为: {command}") if updated: print("\n正在更新工作流...") updated_workflow = update_workflow(workflow_id, workflow) print("[成功] 工作流更新成功!") print(f"工作流 ID: {updated_workflow.get('id')}") print(f"工作流名称: {updated_workflow.get('name')}") return True else: print("\n未找到需要修复的节点或脚本路径已正确") return False if __name__ == "__main__": try: success = fix_workflow_script_path(WORKFLOW_ID) sys.exit(0 if success else 1) except Exception as e: print(f"[错误] 修复失败: {e}") import traceback traceback.print_exc() sys.exit(1)