routes.py 12 KB


  1. """
  2. Data Factory API 路由
  3. 提供 n8n 工作流管理相关接口
  4. """
  5. import json
  6. import logging
  7. from flask import request
  8. from app.api.data_factory import bp
  9. from app.core.data_factory.n8n_client import N8nClientError
  10. from app.core.data_factory.n8n_service import N8nService
  11. from app.core.graph.graph_operations import MyEncoder
  12. from app.models.result import failed, success
  13. logger = logging.getLogger(__name__)
  14. # ==================== 工作流相关接口 ====================
  15. @bp.route("/workflows", methods=["GET"])
  16. def get_workflows():
  17. """
  18. 获取工作流列表
  19. Query Parameters:
  20. page: 页码,默认 1
  21. page_size: 每页数量,默认 20
  22. active: 过滤活跃状态 (true/false)
  23. search: 搜索关键词
  24. tags: 标签过滤,逗号分隔
  25. """
  26. try:
  27. page = request.args.get("page", 1, type=int)
  28. page_size = request.args.get("page_size", 20, type=int)
  29. search = request.args.get("search", "")
  30. # 处理 active 参数
  31. active_param = request.args.get("active")
  32. active = None
  33. if active_param is not None:
  34. active = active_param.lower() == "true"
  35. # 处理 tags 参数
  36. tags_param = request.args.get("tags", "")
  37. tags = (
  38. [t.strip() for t in tags_param.split(",") if t.strip()]
  39. if tags_param
  40. else None
  41. )
  42. result = N8nService.get_workflows(
  43. page=page,
  44. page_size=page_size,
  45. active=active if active is not None else False,
  46. tags=tags if tags is not None else [],
  47. search=search,
  48. )
  49. res = success(result, "获取工作流列表成功")
  50. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  51. except N8nClientError as e:
  52. logger.error(f"获取工作流列表失败: {e.message}")
  53. res = failed(f"获取工作流列表失败: {e.message}", code=e.status_code or 500)
  54. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  55. except Exception as e:
  56. logger.error(f"获取工作流列表失败: {str(e)}")
  57. res = failed(f"获取工作流列表失败: {str(e)}")
  58. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  59. @bp.route("/workflows/<workflow_id>", methods=["GET"])
  60. def get_workflow(workflow_id):
  61. """
  62. 获取工作流详情
  63. Path Parameters:
  64. workflow_id: 工作流 ID
  65. """
  66. try:
  67. result = N8nService.get_workflow_by_id(workflow_id)
  68. res = success(result, "获取工作流详情成功")
  69. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  70. except N8nClientError as e:
  71. logger.error(f"获取工作流详情失败: {e.message}")
  72. code = e.status_code or 500
  73. if e.status_code == 404:
  74. res = failed("工作流不存在", code=404)
  75. else:
  76. res = failed(f"获取工作流详情失败: {e.message}", code=code)
  77. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  78. except Exception as e:
  79. logger.error(f"获取工作流详情失败: {str(e)}")
  80. res = failed(f"获取工作流详情失败: {str(e)}")
  81. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  82. @bp.route("/workflows/<workflow_id>/status", methods=["GET"])
  83. def get_workflow_status(workflow_id):
  84. """
  85. 获取工作流状态
  86. Path Parameters:
  87. workflow_id: 工作流 ID
  88. """
  89. try:
  90. result = N8nService.get_workflow_status(workflow_id)
  91. res = success(result, "获取工作流状态成功")
  92. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  93. except N8nClientError as e:
  94. logger.error(f"获取工作流状态失败: {e.message}")
  95. code = e.status_code or 500
  96. if e.status_code == 404:
  97. res = failed("工作流不存在", code=404)
  98. else:
  99. res = failed(f"获取工作流状态失败: {e.message}", code=code)
  100. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  101. except Exception as e:
  102. logger.error(f"获取工作流状态失败: {str(e)}")
  103. res = failed(f"获取工作流状态失败: {str(e)}")
  104. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  105. @bp.route("/workflows/<workflow_id>/activate", methods=["POST"])
  106. def activate_workflow(workflow_id):
  107. """
  108. 激活工作流
  109. Path Parameters:
  110. workflow_id: 工作流 ID
  111. """
  112. try:
  113. result = N8nService.activate_workflow(workflow_id)
  114. res = success(result, "工作流激活成功")
  115. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  116. except N8nClientError as e:
  117. logger.error(f"激活工作流失败: {e.message}")
  118. code = e.status_code or 500
  119. res = failed(f"激活工作流失败: {e.message}", code=code)
  120. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  121. except Exception as e:
  122. logger.error(f"激活工作流失败: {str(e)}")
  123. res = failed(f"激活工作流失败: {str(e)}")
  124. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  125. @bp.route("/workflows/<workflow_id>/deactivate", methods=["POST"])
  126. def deactivate_workflow(workflow_id):
  127. """
  128. 停用工作流
  129. Path Parameters:
  130. workflow_id: 工作流 ID
  131. """
  132. try:
  133. result = N8nService.deactivate_workflow(workflow_id)
  134. res = success(result, "工作流停用成功")
  135. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  136. except N8nClientError as e:
  137. logger.error(f"停用工作流失败: {e.message}")
  138. code = e.status_code or 500
  139. res = failed(f"停用工作流失败: {e.message}", code=code)
  140. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  141. except Exception as e:
  142. logger.error(f"停用工作流失败: {str(e)}")
  143. res = failed(f"停用工作流失败: {str(e)}")
  144. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  145. # ==================== 执行记录相关接口 ====================
  146. @bp.route("/workflows/<workflow_id>/executions", methods=["GET"])
  147. def get_workflow_executions(workflow_id):
  148. """
  149. 获取工作流的执行记录列表
  150. Path Parameters:
  151. workflow_id: 工作流 ID
  152. Query Parameters:
  153. page: 页码,默认 1
  154. page_size: 每页数量,默认 20
  155. status: 状态过滤 (success/error/waiting)
  156. """
  157. try:
  158. page = request.args.get("page", 1, type=int)
  159. page_size = request.args.get("page_size", 20, type=int)
  160. status = request.args.get("status")
  161. result = N8nService.get_executions(
  162. workflow_id=workflow_id,
  163. status=status if status is not None else "",
  164. page=page,
  165. page_size=page_size,
  166. )
  167. res = success(result, "获取执行记录列表成功")
  168. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  169. except N8nClientError as e:
  170. logger.error(f"获取执行记录列表失败: {e.message}")
  171. res = failed(f"获取执行记录列表失败: {e.message}", code=e.status_code or 500)
  172. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  173. except Exception as e:
  174. logger.error(f"获取执行记录列表失败: {str(e)}")
  175. res = failed(f"获取执行记录列表失败: {str(e)}")
  176. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  177. @bp.route("/executions", methods=["GET"])
  178. def get_all_executions():
  179. """
  180. 获取所有执行记录列表
  181. Query Parameters:
  182. page: 页码,默认 1
  183. page_size: 每页数量,默认 20
  184. workflow_id: 工作流 ID 过滤(可选)
  185. status: 状态过滤 (success/error/waiting)
  186. """
  187. try:
  188. page = request.args.get("page", 1, type=int)
  189. page_size = request.args.get("page_size", 20, type=int)
  190. workflow_id = request.args.get("workflow_id")
  191. status = request.args.get("status")
  192. result = N8nService.get_executions(
  193. workflow_id=workflow_id if workflow_id is not None else "",
  194. status=status if status is not None else "",
  195. page=page,
  196. page_size=page_size,
  197. )
  198. res = success(result, "获取执行记录列表成功")
  199. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  200. except N8nClientError as e:
  201. logger.error(f"获取执行记录列表失败: {e.message}")
  202. res = failed(f"获取执行记录列表失败: {e.message}", code=e.status_code or 500)
  203. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  204. except Exception as e:
  205. logger.error(f"获取执行记录列表失败: {str(e)}")
  206. res = failed(f"获取执行记录列表失败: {str(e)}")
  207. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  208. @bp.route("/executions/<execution_id>", methods=["GET"])
  209. def get_execution(execution_id):
  210. """
  211. 获取执行详情
  212. Path Parameters:
  213. execution_id: 执行 ID
  214. """
  215. try:
  216. result = N8nService.get_execution_by_id(execution_id)
  217. res = success(result, "获取执行详情成功")
  218. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  219. except N8nClientError as e:
  220. logger.error(f"获取执行详情失败: {e.message}")
  221. code = e.status_code or 500
  222. if e.status_code == 404:
  223. res = failed("执行记录不存在", code=404)
  224. else:
  225. res = failed(f"获取执行详情失败: {e.message}", code=code)
  226. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  227. except Exception as e:
  228. logger.error(f"获取执行详情失败: {str(e)}")
  229. res = failed(f"获取执行详情失败: {str(e)}")
  230. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  231. # ==================== 工作流触发接口 ====================
  232. @bp.route("/workflows/<workflow_id>/execute", methods=["POST"])
  233. def execute_workflow(workflow_id):
  234. """
  235. 触发工作流执行
  236. Path Parameters:
  237. workflow_id: 工作流 ID
  238. Request Body:
  239. webhook_path: Webhook 路径(必填,如果工作流使用 Webhook 触发器)
  240. data: 触发数据(可选)
  241. """
  242. try:
  243. json_data = request.get_json() or {}
  244. webhook_path = json_data.get("webhook_path")
  245. data = json_data.get("data", {})
  246. result = N8nService.trigger_workflow(
  247. workflow_id=workflow_id,
  248. webhook_path=webhook_path if webhook_path is not None else "",
  249. data=data,
  250. )
  251. if result.get("success"):
  252. res = success(result, "工作流触发成功")
  253. else:
  254. res = failed(result.get("message", "工作流触发失败"), code=400, data=result)
  255. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  256. except N8nClientError as e:
  257. logger.error(f"触发工作流失败: {e.message}")
  258. res = failed(f"触发工作流失败: {e.message}", code=e.status_code or 500)
  259. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  260. except Exception as e:
  261. logger.error(f"触发工作流失败: {str(e)}")
  262. res = failed(f"触发工作流失败: {str(e)}")
  263. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  264. # ==================== 健康检查接口 ====================
  265. @bp.route("/health", methods=["GET"])
  266. def health_check():
  267. """
  268. 检查 n8n 服务连接状态
  269. """
  270. try:
  271. result = N8nService.health_check()
  272. if result.get("connected"):
  273. res = success(result, "n8n 服务连接正常")
  274. else:
  275. res = failed(
  276. f"n8n 服务连接失败: {result.get('error', '未知错误')}",
  277. code=503,
  278. data=result,
  279. )
  280. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  281. except Exception as e:
  282. logger.error(f"健康检查失败: {str(e)}")
  283. res = failed(f"健康检查失败: {str(e)}")
  284. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)