| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- """
- Data Factory API 路由
- 提供 n8n 工作流管理相关接口
- """
- import json
- import logging
- from flask import request
- from app.api.data_factory import bp
- from app.core.data_factory.n8n_client import N8nClientError
- from app.core.data_factory.n8n_service import N8nService
- from app.core.graph.graph_operations import MyEncoder
- from app.models.result import failed, success
- logger = logging.getLogger(__name__)
- # ==================== 工作流相关接口 ====================
- @bp.route("/workflows", methods=["GET"])
- def get_workflows():
- """
- 获取工作流列表
- Query Parameters:
- page: 页码,默认 1
- page_size: 每页数量,默认 20
- active: 过滤活跃状态 (true/false)
- search: 搜索关键词
- tags: 标签过滤,逗号分隔
- """
- try:
- page = request.args.get("page", 1, type=int)
- page_size = request.args.get("page_size", 20, type=int)
- search = request.args.get("search", "")
- # 处理 active 参数
- active_param = request.args.get("active")
- active = None
- if active_param is not None:
- active = active_param.lower() == "true"
- # 处理 tags 参数
- tags_param = request.args.get("tags", "")
- tags = (
- [t.strip() for t in tags_param.split(",") if t.strip()]
- if tags_param
- else None
- )
- result = N8nService.get_workflows(
- page=page,
- page_size=page_size,
- active=active if active is not None else False,
- tags=tags if tags is not None else [],
- search=search,
- )
- res = success(result, "获取工作流列表成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"获取工作流列表失败: {e.message}")
- res = failed(f"获取工作流列表失败: {e.message}", code=e.status_code or 500)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取工作流列表失败: {str(e)}")
- res = failed(f"获取工作流列表失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/workflows/<workflow_id>", methods=["GET"])
- def get_workflow(workflow_id):
- """
- 获取工作流详情
- Path Parameters:
- workflow_id: 工作流 ID
- """
- try:
- result = N8nService.get_workflow_by_id(workflow_id)
- res = success(result, "获取工作流详情成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"获取工作流详情失败: {e.message}")
- code = e.status_code or 500
- if e.status_code == 404:
- res = failed("工作流不存在", code=404)
- else:
- res = failed(f"获取工作流详情失败: {e.message}", code=code)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取工作流详情失败: {str(e)}")
- res = failed(f"获取工作流详情失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/workflows/<workflow_id>/status", methods=["GET"])
- def get_workflow_status(workflow_id):
- """
- 获取工作流状态
- Path Parameters:
- workflow_id: 工作流 ID
- """
- try:
- result = N8nService.get_workflow_status(workflow_id)
- res = success(result, "获取工作流状态成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"获取工作流状态失败: {e.message}")
- code = e.status_code or 500
- if e.status_code == 404:
- res = failed("工作流不存在", code=404)
- else:
- res = failed(f"获取工作流状态失败: {e.message}", code=code)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取工作流状态失败: {str(e)}")
- res = failed(f"获取工作流状态失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/workflows/<workflow_id>/activate", methods=["POST"])
- def activate_workflow(workflow_id):
- """
- 激活工作流
- Path Parameters:
- workflow_id: 工作流 ID
- """
- try:
- result = N8nService.activate_workflow(workflow_id)
- res = success(result, "工作流激活成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"激活工作流失败: {e.message}")
- code = e.status_code or 500
- res = failed(f"激活工作流失败: {e.message}", code=code)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"激活工作流失败: {str(e)}")
- res = failed(f"激活工作流失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/workflows/<workflow_id>/deactivate", methods=["POST"])
- def deactivate_workflow(workflow_id):
- """
- 停用工作流
- Path Parameters:
- workflow_id: 工作流 ID
- """
- try:
- result = N8nService.deactivate_workflow(workflow_id)
- res = success(result, "工作流停用成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"停用工作流失败: {e.message}")
- code = e.status_code or 500
- res = failed(f"停用工作流失败: {e.message}", code=code)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"停用工作流失败: {str(e)}")
- res = failed(f"停用工作流失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 执行记录相关接口 ====================
- @bp.route("/workflows/<workflow_id>/executions", methods=["GET"])
- def get_workflow_executions(workflow_id):
- """
- 获取工作流的执行记录列表
- Path Parameters:
- workflow_id: 工作流 ID
- Query Parameters:
- page: 页码,默认 1
- page_size: 每页数量,默认 20
- status: 状态过滤 (success/error/waiting)
- """
- try:
- page = request.args.get("page", 1, type=int)
- page_size = request.args.get("page_size", 20, type=int)
- status = request.args.get("status")
- result = N8nService.get_executions(
- workflow_id=workflow_id,
- status=status if status is not None else "",
- page=page,
- page_size=page_size,
- )
- res = success(result, "获取执行记录列表成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"获取执行记录列表失败: {e.message}")
- res = failed(f"获取执行记录列表失败: {e.message}", code=e.status_code or 500)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取执行记录列表失败: {str(e)}")
- res = failed(f"获取执行记录列表失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/executions", methods=["GET"])
- def get_all_executions():
- """
- 获取所有执行记录列表
- Query Parameters:
- page: 页码,默认 1
- page_size: 每页数量,默认 20
- workflow_id: 工作流 ID 过滤(可选)
- status: 状态过滤 (success/error/waiting)
- """
- try:
- page = request.args.get("page", 1, type=int)
- page_size = request.args.get("page_size", 20, type=int)
- workflow_id = request.args.get("workflow_id")
- status = request.args.get("status")
- result = N8nService.get_executions(
- workflow_id=workflow_id if workflow_id is not None else "",
- status=status if status is not None else "",
- page=page,
- page_size=page_size,
- )
- res = success(result, "获取执行记录列表成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"获取执行记录列表失败: {e.message}")
- res = failed(f"获取执行记录列表失败: {e.message}", code=e.status_code or 500)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取执行记录列表失败: {str(e)}")
- res = failed(f"获取执行记录列表失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/executions/<execution_id>", methods=["GET"])
- def get_execution(execution_id):
- """
- 获取执行详情
- Path Parameters:
- execution_id: 执行 ID
- """
- try:
- result = N8nService.get_execution_by_id(execution_id)
- res = success(result, "获取执行详情成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"获取执行详情失败: {e.message}")
- code = e.status_code or 500
- if e.status_code == 404:
- res = failed("执行记录不存在", code=404)
- else:
- res = failed(f"获取执行详情失败: {e.message}", code=code)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取执行详情失败: {str(e)}")
- res = failed(f"获取执行详情失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 工作流触发接口 ====================
- @bp.route("/workflows/<workflow_id>/execute", methods=["POST"])
- def execute_workflow(workflow_id):
- """
- 触发工作流执行
- Path Parameters:
- workflow_id: 工作流 ID
- Request Body:
- webhook_path: Webhook 路径(必填,如果工作流使用 Webhook 触发器)
- data: 触发数据(可选)
- """
- try:
- json_data = request.get_json() or {}
- webhook_path = json_data.get("webhook_path")
- data = json_data.get("data", {})
- result = N8nService.trigger_workflow(
- workflow_id=workflow_id,
- webhook_path=webhook_path if webhook_path is not None else "",
- data=data,
- )
- if result.get("success"):
- res = success(result, "工作流触发成功")
- else:
- res = failed(result.get("message", "工作流触发失败"), code=400, data=result)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except N8nClientError as e:
- logger.error(f"触发工作流失败: {e.message}")
- res = failed(f"触发工作流失败: {e.message}", code=e.status_code or 500)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"触发工作流失败: {str(e)}")
- res = failed(f"触发工作流失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 健康检查接口 ====================
- @bp.route("/health", methods=["GET"])
- def health_check():
- """
- 检查 n8n 服务连接状态
- """
- try:
- result = N8nService.health_check()
- if result.get("connected"):
- res = success(result, "n8n 服务连接正常")
- else:
- res = failed(
- f"n8n 服务连接失败: {result.get('error', '未知错误')}",
- code=503,
- data=result,
- )
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"健康检查失败: {str(e)}")
- res = failed(f"健康检查失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
|