deploy_n8n_workflow.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. """
  2. n8n 工作流部署脚本
  3. 用于将本地工作流 JSON 文件部署到 n8n 服务器
  4. 使用方法:
  5. python scripts/deploy_n8n_workflow.py <workflow_json_file> [--activate]
  6. 示例:
  7. python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_nursing_project_income.json
  8. python scripts/deploy_n8n_workflow.py app/core/data_flow/n8n_workflow_nursing_project_income.json --activate
  9. """
  10. import argparse
  11. import json
  12. import logging
  13. import os
  14. import sys
  15. # 添加项目根目录到路径
  16. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  17. import requests
  18. # 配置日志
  19. logging.basicConfig(
  20. level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  21. )
  22. logger = logging.getLogger(__name__)
  23. def load_config():
  24. """加载 n8n API 配置"""
  25. # 尝试从 Flask 配置加载
  26. try:
  27. from app.config.config import BaseConfig
  28. return {
  29. "api_url": BaseConfig.N8N_API_URL,
  30. "api_key": BaseConfig.N8N_API_KEY,
  31. "timeout": BaseConfig.N8N_API_TIMEOUT,
  32. }
  33. except (ImportError, AttributeError):
  34. # 使用环境变量
  35. return {
  36. "api_url": os.environ.get("N8N_API_URL", "https://n8n.citupro.com"),
  37. "api_key": os.environ.get("N8N_API_KEY", ""),
  38. "timeout": int(os.environ.get("N8N_API_TIMEOUT", "30")),
  39. }
  40. def load_workflow_json(file_path: str) -> dict:
  41. """加载工作流 JSON 文件"""
  42. if not os.path.exists(file_path):
  43. raise FileNotFoundError(f"工作流文件不存在: {file_path}")
  44. with open(file_path, encoding="utf-8") as f:
  45. return json.load(f)
  46. def deploy_workflow(
  47. workflow_data: dict,
  48. api_url: str,
  49. api_key: str,
  50. timeout: int = 30,
  51. activate: bool = False,
  52. ) -> dict:
  53. """
  54. 部署工作流到 n8n 服务器
  55. Args:
  56. workflow_data: 工作流 JSON 数据
  57. api_url: n8n API 地址
  58. api_key: n8n API Key
  59. timeout: 请求超时时间
  60. activate: 是否激活工作流
  61. Returns:
  62. 部署结果
  63. """
  64. headers = {
  65. "X-N8N-API-KEY": api_key,
  66. "Content-Type": "application/json",
  67. "Accept": "application/json",
  68. }
  69. # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags)
  70. workflow_payload = {
  71. "name": workflow_data.get("name", "Untitled Workflow"),
  72. "nodes": workflow_data.get("nodes", []),
  73. "connections": workflow_data.get("connections", {}),
  74. "settings": workflow_data.get("settings", {}),
  75. }
  76. # 创建工作流
  77. create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
  78. logger.info(f"正在创建工作流: {workflow_payload['name']}")
  79. logger.info(f"API URL: {create_url}")
  80. try:
  81. response = requests.post(
  82. create_url,
  83. headers=headers,
  84. json=workflow_payload,
  85. timeout=timeout,
  86. )
  87. if response.status_code == 401:
  88. raise Exception("API 认证失败,请检查 N8N_API_KEY 配置")
  89. elif response.status_code == 403:
  90. raise Exception("API 权限不足")
  91. response.raise_for_status()
  92. created_workflow = response.json()
  93. workflow_id = created_workflow.get("id")
  94. logger.info(f"工作流创建成功! ID: {workflow_id}")
  95. # 如果需要激活
  96. if activate and workflow_id:
  97. activate_url = (
  98. f"{api_url.rstrip('/')}/api/v1/workflows/{workflow_id}/activate"
  99. )
  100. logger.info("正在激活工作流...")
  101. activate_response = requests.post(
  102. activate_url,
  103. headers=headers,
  104. timeout=timeout,
  105. )
  106. activate_response.raise_for_status()
  107. logger.info("工作流激活成功!")
  108. created_workflow["active"] = True
  109. return {
  110. "success": True,
  111. "workflow_id": workflow_id,
  112. "workflow_name": created_workflow.get("name"),
  113. "active": created_workflow.get("active", False),
  114. "message": "工作流部署成功",
  115. }
  116. except requests.exceptions.Timeout as e:
  117. raise Exception("请求超时,请检查网络连接") from e
  118. except requests.exceptions.ConnectionError as e:
  119. raise Exception(f"无法连接到 n8n 服务器: {api_url}") from e
  120. except requests.exceptions.HTTPError as e:
  121. error_detail = ""
  122. try:
  123. error_detail = e.response.json()
  124. except Exception:
  125. error_detail = e.response.text
  126. raise Exception(
  127. f"HTTP 错误: {e.response.status_code}, 详情: {error_detail}"
  128. ) from e
  129. def main():
  130. """主函数"""
  131. parser = argparse.ArgumentParser(description="n8n 工作流部署工具")
  132. parser.add_argument(
  133. "workflow_file",
  134. type=str,
  135. help="工作流 JSON 文件路径",
  136. )
  137. parser.add_argument(
  138. "--activate",
  139. action="store_true",
  140. help="部署后自动激活工作流",
  141. )
  142. parser.add_argument(
  143. "--api-url",
  144. type=str,
  145. default=None,
  146. help="n8n API URL(覆盖配置)",
  147. )
  148. parser.add_argument(
  149. "--api-key",
  150. type=str,
  151. default=None,
  152. help="n8n API Key(覆盖配置)",
  153. )
  154. args = parser.parse_args()
  155. # 加载配置
  156. config = load_config()
  157. # 命令行参数覆盖配置
  158. api_url = args.api_url or config["api_url"]
  159. api_key = args.api_key or config["api_key"]
  160. timeout = config["timeout"]
  161. if not api_key:
  162. logger.error("错误: 未配置 N8N_API_KEY")
  163. logger.error("请设置环境变量 N8N_API_KEY 或使用 --api-key 参数")
  164. sys.exit(1)
  165. try:
  166. # 加载工作流文件
  167. logger.info(f"加载工作流文件: {args.workflow_file}")
  168. workflow_data = load_workflow_json(args.workflow_file)
  169. logger.info(f"工作流名称: {workflow_data.get('name', 'Unknown')}")
  170. logger.info(f"节点数量: {len(workflow_data.get('nodes', []))}")
  171. # 部署工作流
  172. result = deploy_workflow(
  173. workflow_data=workflow_data,
  174. api_url=api_url,
  175. api_key=api_key,
  176. timeout=timeout,
  177. activate=args.activate,
  178. )
  179. # 输出结果
  180. print("\n" + "=" * 60)
  181. print("部署结果")
  182. print("=" * 60)
  183. print(f"状态: {'成功' if result['success'] else '失败'}")
  184. print(f"工作流 ID: {result['workflow_id']}")
  185. print(f"工作流名称: {result['workflow_name']}")
  186. print(f"激活状态: {'已激活' if result['active'] else '未激活'}")
  187. print(f"消息: {result['message']}")
  188. print("=" * 60)
  189. # 提示 n8n 访问地址
  190. workflow_url = f"{api_url}/workflow/{result['workflow_id']}"
  191. print(f"\n在 n8n 中查看工作流: {workflow_url}")
  192. sys.exit(0)
  193. except FileNotFoundError as e:
  194. logger.error(f"错误: {str(e)}")
  195. sys.exit(1)
  196. except json.JSONDecodeError as e:
  197. logger.error(f"JSON 解析错误: {str(e)}")
  198. sys.exit(1)
  199. except Exception as e:
  200. logger.error(f"部署失败: {str(e)}")
  201. sys.exit(1)
  202. if __name__ == "__main__":
  203. main()