| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- """
- n8n 工作流部署脚本
- 用于将本地工作流 JSON 文件部署到 n8n 服务器
- 使用方法:
- python scripts/deploy_n8n_workflow.py <workflow_json_file> [--activate]
- 示例:
- python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_nursing_project_income.json
- python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_nursing_project_income.json --activate
- """
- import argparse
- import json
- import logging
- import os
- import sys
- # 添加项目根目录到路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import requests
- # 配置日志
- logging.basicConfig(
- level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
- )
- logger = logging.getLogger(__name__)
- def load_config():
- """加载 n8n API 配置"""
- # 尝试从 Flask 配置加载
- try:
- from app.config.config import BaseConfig
- return {
- "api_url": BaseConfig.N8N_API_URL,
- "api_key": BaseConfig.N8N_API_KEY,
- "timeout": BaseConfig.N8N_API_TIMEOUT,
- }
- except (ImportError, AttributeError):
- # 使用环境变量
- return {
- "api_url": os.environ.get("N8N_API_URL", "https://n8n.citupro.com"),
- "api_key": os.environ.get("N8N_API_KEY", ""),
- "timeout": int(os.environ.get("N8N_API_TIMEOUT", "30")),
- }
- def load_workflow_json(file_path: str) -> dict:
- """加载工作流 JSON 文件"""
- if not os.path.exists(file_path):
- raise FileNotFoundError(f"工作流文件不存在: {file_path}")
- with open(file_path, encoding="utf-8") as f:
- return json.load(f)
- def deploy_workflow(
- workflow_data: dict,
- api_url: str,
- api_key: str,
- timeout: int = 30,
- activate: bool = False,
- ) -> dict:
- """
- 部署工作流到 n8n 服务器
- Args:
- workflow_data: 工作流 JSON 数据
- api_url: n8n API 地址
- api_key: n8n API Key
- timeout: 请求超时时间
- activate: 是否激活工作流
- Returns:
- 部署结果
- """
- headers = {
- "X-N8N-API-KEY": api_key,
- "Content-Type": "application/json",
- "Accept": "application/json",
- }
- # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags)
- workflow_payload = {
- "name": workflow_data.get("name", "Untitled Workflow"),
- "nodes": workflow_data.get("nodes", []),
- "connections": workflow_data.get("connections", {}),
- "settings": workflow_data.get("settings", {}),
- }
- # 创建工作流
- create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
- logger.info(f"正在创建工作流: {workflow_payload['name']}")
- logger.info(f"API URL: {create_url}")
- try:
- response = requests.post(
- create_url,
- headers=headers,
- json=workflow_payload,
- timeout=timeout,
- )
- if response.status_code == 401:
- raise Exception("API 认证失败,请检查 N8N_API_KEY 配置")
- elif response.status_code == 403:
- raise Exception("API 权限不足")
- response.raise_for_status()
- created_workflow = response.json()
- workflow_id = created_workflow.get("id")
- logger.info(f"工作流创建成功! ID: {workflow_id}")
- # 如果需要激活
- if activate and workflow_id:
- activate_url = (
- f"{api_url.rstrip('/')}/api/v1/workflows/{workflow_id}/activate"
- )
- logger.info("正在激活工作流...")
- activate_response = requests.post(
- activate_url,
- headers=headers,
- timeout=timeout,
- )
- activate_response.raise_for_status()
- logger.info("工作流激活成功!")
- created_workflow["active"] = True
- return {
- "success": True,
- "workflow_id": workflow_id,
- "workflow_name": created_workflow.get("name"),
- "active": created_workflow.get("active", False),
- "message": "工作流部署成功",
- }
- except requests.exceptions.Timeout as e:
- raise Exception("请求超时,请检查网络连接") from e
- except requests.exceptions.ConnectionError as e:
- raise Exception(f"无法连接到 n8n 服务器: {api_url}") from e
- except requests.exceptions.HTTPError as e:
- error_detail = ""
- try:
- error_detail = e.response.json()
- except Exception:
- error_detail = e.response.text
- raise Exception(
- f"HTTP 错误: {e.response.status_code}, 详情: {error_detail}"
- ) from e
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(description="n8n 工作流部署工具")
- parser.add_argument(
- "workflow_file",
- type=str,
- help="工作流 JSON 文件路径",
- )
- parser.add_argument(
- "--activate",
- action="store_true",
- help="部署后自动激活工作流",
- )
- parser.add_argument(
- "--api-url",
- type=str,
- default=None,
- help="n8n API URL(覆盖配置)",
- )
- parser.add_argument(
- "--api-key",
- type=str,
- default=None,
- help="n8n API Key(覆盖配置)",
- )
- args = parser.parse_args()
- # 加载配置
- config = load_config()
- # 命令行参数覆盖配置
- api_url = args.api_url or config["api_url"]
- api_key = args.api_key or config["api_key"]
- timeout = config["timeout"]
- if not api_key:
- logger.error("错误: 未配置 N8N_API_KEY")
- logger.error("请设置环境变量 N8N_API_KEY 或使用 --api-key 参数")
- sys.exit(1)
- try:
- # 加载工作流文件
- logger.info(f"加载工作流文件: {args.workflow_file}")
- workflow_data = load_workflow_json(args.workflow_file)
- logger.info(f"工作流名称: {workflow_data.get('name', 'Unknown')}")
- logger.info(f"节点数量: {len(workflow_data.get('nodes', []))}")
- # 部署工作流
- result = deploy_workflow(
- workflow_data=workflow_data,
- api_url=api_url,
- api_key=api_key,
- timeout=timeout,
- activate=args.activate,
- )
- # 输出结果
- print("\n" + "=" * 60)
- print("部署结果")
- print("=" * 60)
- print(f"状态: {'成功' if result['success'] else '失败'}")
- print(f"工作流 ID: {result['workflow_id']}")
- print(f"工作流名称: {result['workflow_name']}")
- print(f"激活状态: {'已激活' if result['active'] else '未激活'}")
- print(f"消息: {result['message']}")
- print("=" * 60)
- # 提示 n8n 访问地址
- workflow_url = f"{api_url}/workflow/{result['workflow_id']}"
- print(f"\n在 n8n 中查看工作流: {workflow_url}")
- sys.exit(0)
- except FileNotFoundError as e:
- logger.error(f"错误: {str(e)}")
- sys.exit(1)
- except json.JSONDecodeError as e:
- logger.error(f"JSON 解析错误: {str(e)}")
- sys.exit(1)
- except Exception as e:
- logger.error(f"部署失败: {str(e)}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|