"""n8n workflow deployment script.

Deploys a local workflow JSON file to an n8n server through its REST API.

Usage:
    python scripts/deploy_n8n_workflow.py WORKFLOW_FILE [--activate]
        [--api-url URL] [--api-key KEY]

Examples:
    python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_nursing_project_income.json
    python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_nursing_project_income.json --activate
"""

import argparse
import json
import logging
import os
import sys

# Put the project root on the import path so `app.config` can be imported
# when this file is run directly as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import requests

# Configure logging.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Timeout (seconds) used when N8N_API_TIMEOUT is unset or not an integer.
_DEFAULT_TIMEOUT = 30


def load_config():
    """Load the n8n API configuration.

    The Flask ``BaseConfig`` is tried first. If it cannot be imported or lacks
    one of the N8N_* attributes, the configuration is read from the
    ``N8N_API_URL``, ``N8N_API_KEY`` and ``N8N_API_TIMEOUT`` environment
    variables instead.

    Returns:
        A dict with the keys ``api_url``, ``api_key`` and ``timeout``.
    """
    try:
        from app.config.config import BaseConfig

        return {
            "api_url": BaseConfig.N8N_API_URL,
            "api_key": BaseConfig.N8N_API_KEY,
            "timeout": BaseConfig.N8N_API_TIMEOUT,
        }
    except (ImportError, AttributeError):
        # Fall back to environment variables.
        raw_timeout = os.environ.get("N8N_API_TIMEOUT", str(_DEFAULT_TIMEOUT))
        try:
            timeout = int(raw_timeout)
        except ValueError:
            # A malformed value should not crash the tool with a traceback
            # before argument handling has even started.
            logger.warning(
                "N8N_API_TIMEOUT 无效 (%r),使用默认值 %d 秒",
                raw_timeout,
                _DEFAULT_TIMEOUT,
            )
            timeout = _DEFAULT_TIMEOUT
        return {
            "api_url": os.environ.get("N8N_API_URL", "https://n8n.citupro.com"),
            "api_key": os.environ.get("N8N_API_KEY", ""),
            "timeout": timeout,
        }


def load_workflow_json(file_path: str) -> dict:
    """Load a workflow definition from a JSON file.

    Args:
        file_path: Path of the workflow JSON file.

    Returns:
        The parsed workflow as a dict.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
        ValueError: If the top-level JSON value is not an object.
    """
    try:
        with open(file_path, encoding="utf-8") as f:
            workflow = json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(f"工作流文件不存在: {file_path}") from None

    # Callers use dict methods on the result; reject other JSON types here
    # with a clear message instead of failing later with an AttributeError.
    if not isinstance(workflow, dict):
        raise ValueError(f"工作流文件的顶层必须是 JSON 对象: {file_path}")
    return workflow


def _describe_http_error(error: requests.exceptions.HTTPError) -> str:
    """Build the error text for an HTTP error, including the response body."""
    response = error.response
    try:
        detail = response.json()
    except ValueError:
        # Body is not JSON; fall back to the raw text.
        detail = response.text
    return f"HTTP 错误: {response.status_code}, 详情: {detail}"


def _post(url: str, api_url: str, headers: dict, timeout: int, payload=None):
    """POST to the n8n API and translate failures into descriptive exceptions.

    Args:
        url: Full endpoint URL.
        api_url: Base API URL, used only in the connection error message.
        headers: Request headers.
        timeout: Request timeout passed to ``requests.post``.
        payload: Optional JSON body.

    Returns:
        The ``requests`` response for a successful (non-error) status.

    Raises:
        Exception: On timeout, connection failure, 401/403 or any other
            HTTP error status.
    """
    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            timeout=timeout,
        )
    except requests.exceptions.Timeout as e:
        raise Exception("请求超时,请检查网络连接") from e
    except requests.exceptions.ConnectionError as e:
        raise Exception(f"无法连接到 n8n 服务器: {api_url}") from e

    if response.status_code == 401:
        raise Exception("API 认证失败,请检查 N8N_API_KEY 配置")
    if response.status_code == 403:
        raise Exception("API 权限不足")

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        raise Exception(_describe_http_error(e)) from e
    return response


def deploy_workflow(
    workflow_data: dict,
    api_url: str,
    api_key: str,
    timeout: int = 30,
    activate: bool = False,
) -> dict:
    """Deploy a workflow to an n8n server.

    Creates the workflow via ``POST /api/v1/workflows`` and, when requested,
    activates it via ``POST /api/v1/workflows/{id}/activate``.

    Args:
        workflow_data: Workflow JSON data.
        api_url: Base URL of the n8n API.
        api_key: n8n API key, sent in the ``X-N8N-API-KEY`` header.
        timeout: Request timeout.
        activate: Whether to activate the workflow after creating it.

    Returns:
        A dict with the keys ``success``, ``workflow_id``, ``workflow_name``,
        ``active`` and ``message``.

    Raises:
        Exception: If a request fails or the server response cannot be
            interpreted. When creation succeeded but activation failed, the
            message contains the ID of the created workflow.
    """
    headers = {
        "X-N8N-API-KEY": api_key,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    # Only these fields are sent; tags are deliberately left out (per the
    # original author's note, the n8n API does not accept tags on creation).
    workflow_payload = {
        "name": workflow_data.get("name", "Untitled Workflow"),
        "nodes": workflow_data.get("nodes", []),
        "connections": workflow_data.get("connections", {}),
        "settings": workflow_data.get("settings", {}),
    }

    base_url = api_url.rstrip("/")

    # Create the workflow.
    create_url = f"{base_url}/api/v1/workflows"
    logger.info("正在创建工作流: %s", workflow_payload["name"])
    logger.info("API URL: %s", create_url)

    response = _post(create_url, api_url, headers, timeout, payload=workflow_payload)

    # A success status with a non-JSON or non-object body is a server-side
    # problem; report it as such rather than as a local JSON parse error.
    try:
        created_workflow = response.json()
    except ValueError as e:
        raise Exception("n8n 服务器返回了无法解析的响应") from e
    if not isinstance(created_workflow, dict):
        raise Exception("n8n 服务器返回了无法解析的响应")

    workflow_id = created_workflow.get("id")
    logger.info("工作流创建成功! ID: %s", workflow_id)

    # Activate if requested.
    if activate and workflow_id:
        activate_url = f"{base_url}/api/v1/workflows/{workflow_id}/activate"
        logger.info("正在激活工作流...")
        try:
            _post(activate_url, api_url, headers, timeout)
        except Exception as e:
            # The workflow already exists on the server at this point, so
            # surface its ID; otherwise a re-run would create a duplicate.
            raise Exception(
                f"工作流已创建 (ID: {workflow_id}),但激活失败: {e}"
            ) from e
        logger.info("工作流激活成功!")
        created_workflow["active"] = True

    return {
        "success": True,
        "workflow_id": workflow_id,
        "workflow_name": created_workflow.get("name"),
        "active": created_workflow.get("active", False),
        "message": "工作流部署成功",
    }


def main():
    """Command-line entry point.

    Parses arguments, loads the configuration and the workflow file, deploys
    the workflow and prints a summary. Exits with status 0 on success and 1
    on any failure.
    """
    parser = argparse.ArgumentParser(description="n8n 工作流部署工具")
    parser.add_argument(
        "workflow_file",
        type=str,
        help="工作流 JSON 文件路径",
    )
    parser.add_argument(
        "--activate",
        action="store_true",
        help="部署后自动激活工作流",
    )
    parser.add_argument(
        "--api-url",
        type=str,
        default=None,
        help="n8n API URL(覆盖配置)",
    )
    parser.add_argument(
        "--api-key",
        type=str,
        default=None,
        help="n8n API Key(覆盖配置)",
    )

    args = parser.parse_args()

    # Load the configuration.
    config = load_config()

    # Command-line arguments override the configuration.
    api_url = args.api_url or config["api_url"]
    api_key = args.api_key or config["api_key"]
    timeout = config["timeout"]

    if not api_key:
        logger.error("错误: 未配置 N8N_API_KEY")
        logger.error("请设置环境变量 N8N_API_KEY 或使用 --api-key 参数")
        sys.exit(1)

    try:
        # Load the workflow file.
        logger.info("加载工作流文件: %s", args.workflow_file)
        workflow_data = load_workflow_json(args.workflow_file)
        logger.info("工作流名称: %s", workflow_data.get("name", "Unknown"))
        logger.info("节点数量: %s", len(workflow_data.get("nodes", [])))

        # Deploy the workflow.
        result = deploy_workflow(
            workflow_data=workflow_data,
            api_url=api_url,
            api_key=api_key,
            timeout=timeout,
            activate=args.activate,
        )

        # Print the result.
        print("\n" + "=" * 60)
        print("部署结果")
        print("=" * 60)
        print(f"状态: {'成功' if result['success'] else '失败'}")
        print(f"工作流 ID: {result['workflow_id']}")
        print(f"工作流名称: {result['workflow_name']}")
        print(f"激活状态: {'已激活' if result['active'] else '未激活'}")
        print(f"消息: {result['message']}")
        print("=" * 60)

        # Show where the workflow can be opened in n8n. Strip a trailing
        # slash so the link never contains "//workflow".
        workflow_url = f"{api_url.rstrip('/')}/workflow/{result['workflow_id']}"
        print(f"\n在 n8n 中查看工作流: {workflow_url}")

        sys.exit(0)

    except FileNotFoundError as e:
        logger.error("错误: %s", e)
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error("JSON 解析错误: %s", e)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report any other failure and exit non-zero.
        logger.error("部署失败: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()