""" Data Factory API 路由 提供 n8n 工作流管理相关接口 """ import json import logging from flask import request from app.api.data_factory import bp from app.core.data_factory.n8n_client import N8nClientError from app.core.data_factory.n8n_service import N8nService from app.core.graph.graph_operations import MyEncoder from app.models.result import failed, success logger = logging.getLogger(__name__) # ==================== 工作流相关接口 ==================== @bp.route("/workflows", methods=["GET"]) def get_workflows(): """ 获取工作流列表 Query Parameters: page: 页码,默认 1 page_size: 每页数量,默认 20 active: 过滤活跃状态 (true/false) search: 搜索关键词 tags: 标签过滤,逗号分隔 """ try: page = request.args.get("page", 1, type=int) page_size = request.args.get("page_size", 20, type=int) search = request.args.get("search", "") # 处理 active 参数 active_param = request.args.get("active") active = None if active_param is not None: active = active_param.lower() == "true" # 处理 tags 参数 tags_param = request.args.get("tags", "") tags = ( [t.strip() for t in tags_param.split(",") if t.strip()] if tags_param else None ) result = N8nService.get_workflows( page=page, page_size=page_size, active=active if active is not None else False, tags=tags if tags is not None else [], search=search, ) res = success(result, "获取工作流列表成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"获取工作流列表失败: {e.message}") res = failed(f"获取工作流列表失败: {e.message}", code=e.status_code or 500) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取工作流列表失败: {str(e)}") res = failed(f"获取工作流列表失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/workflows/", methods=["GET"]) def get_workflow(workflow_id): """ 获取工作流详情 Path Parameters: workflow_id: 工作流 ID """ try: result = N8nService.get_workflow_by_id(workflow_id) res = success(result, "获取工作流详情成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"获取工作流详情失败: {e.message}") code = e.status_code or 500 if e.status_code == 404: res = failed("工作流不存在", code=404) else: res = failed(f"获取工作流详情失败: {e.message}", code=code) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取工作流详情失败: {str(e)}") res = failed(f"获取工作流详情失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/workflows//status", methods=["GET"]) def get_workflow_status(workflow_id): """ 获取工作流状态 Path Parameters: workflow_id: 工作流 ID """ try: result = N8nService.get_workflow_status(workflow_id) res = success(result, "获取工作流状态成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"获取工作流状态失败: {e.message}") code = e.status_code or 500 if e.status_code == 404: res = failed("工作流不存在", code=404) else: res = failed(f"获取工作流状态失败: {e.message}", code=code) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取工作流状态失败: {str(e)}") res = failed(f"获取工作流状态失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/workflows//activate", methods=["POST"]) def activate_workflow(workflow_id): """ 激活工作流 Path Parameters: workflow_id: 工作流 ID """ try: result = N8nService.activate_workflow(workflow_id) res = success(result, "工作流激活成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"激活工作流失败: {e.message}") code = e.status_code or 500 res = failed(f"激活工作流失败: {e.message}", code=code) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"激活工作流失败: {str(e)}") res = failed(f"激活工作流失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/workflows//deactivate", methods=["POST"]) def deactivate_workflow(workflow_id): """ 停用工作流 Path Parameters: workflow_id: 工作流 ID """ try: result = N8nService.deactivate_workflow(workflow_id) res = success(result, "工作流停用成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"停用工作流失败: {e.message}") code = e.status_code or 500 res = failed(f"停用工作流失败: {e.message}", code=code) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"停用工作流失败: {str(e)}") res = failed(f"停用工作流失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 执行记录相关接口 ==================== @bp.route("/workflows//executions", methods=["GET"]) def get_workflow_executions(workflow_id): """ 获取工作流的执行记录列表 Path Parameters: workflow_id: 工作流 ID Query Parameters: page: 页码,默认 1 page_size: 每页数量,默认 20 status: 状态过滤 (success/error/waiting) """ try: page = request.args.get("page", 1, type=int) page_size = request.args.get("page_size", 20, type=int) status = request.args.get("status") result = N8nService.get_executions( workflow_id=workflow_id, status=status if status is not None else "", page=page, page_size=page_size, ) res = success(result, "获取执行记录列表成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"获取执行记录列表失败: {e.message}") res = failed(f"获取执行记录列表失败: {e.message}", code=e.status_code or 500) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取执行记录列表失败: {str(e)}") res = failed(f"获取执行记录列表失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/executions", methods=["GET"]) def get_all_executions(): """ 获取所有执行记录列表 Query Parameters: page: 页码,默认 1 page_size: 每页数量,默认 20 workflow_id: 工作流 ID 过滤(可选) status: 状态过滤 (success/error/waiting) """ try: page = request.args.get("page", 1, type=int) page_size = request.args.get("page_size", 20, type=int) workflow_id = request.args.get("workflow_id") status = request.args.get("status") result = N8nService.get_executions( workflow_id=workflow_id if workflow_id is not None else "", status=status if status is not None else "", page=page, page_size=page_size, ) res = success(result, "获取执行记录列表成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"获取执行记录列表失败: {e.message}") res = failed(f"获取执行记录列表失败: {e.message}", code=e.status_code or 500) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取执行记录列表失败: {str(e)}") res = failed(f"获取执行记录列表失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/executions/", methods=["GET"]) def get_execution(execution_id): """ 获取执行详情 Path Parameters: execution_id: 执行 ID """ try: result = N8nService.get_execution_by_id(execution_id) res = success(result, "获取执行详情成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"获取执行详情失败: {e.message}") code = e.status_code or 500 if e.status_code == 404: res = failed("执行记录不存在", code=404) else: res = failed(f"获取执行详情失败: {e.message}", code=code) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取执行详情失败: {str(e)}") res = failed(f"获取执行详情失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 工作流触发接口 ==================== @bp.route("/workflows//execute", methods=["POST"]) def execute_workflow(workflow_id): """ 触发工作流执行 Path Parameters: workflow_id: 工作流 ID Request Body: webhook_path: Webhook 路径(必填,如果工作流使用 Webhook 触发器) data: 触发数据(可选) """ try: json_data = request.get_json() or {} webhook_path = json_data.get("webhook_path") data = json_data.get("data", {}) result = N8nService.trigger_workflow( workflow_id=workflow_id, webhook_path=webhook_path if webhook_path is not None else "", data=data, ) if result.get("success"): res = success(result, "工作流触发成功") else: res = failed(result.get("message", "工作流触发失败"), code=400, data=result) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except N8nClientError as e: logger.error(f"触发工作流失败: {e.message}") res = failed(f"触发工作流失败: {e.message}", code=e.status_code or 500) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"触发工作流失败: {str(e)}") res = failed(f"触发工作流失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 健康检查接口 ==================== @bp.route("/health", methods=["GET"]) def health_check(): """ 检查 n8n 服务连接状态 """ try: result = N8nService.health_check() if result.get("connected"): res = success(result, "n8n 服务连接正常") else: res = failed( f"n8n 服务连接失败: {result.get('error', '未知错误')}", code=503, data=result, ) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"健康检查失败: {str(e)}") res = failed(f"健康检查失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder)