"""HTTP routes of the data-flow blueprint.

Every view delegates to ``DataFlowService`` and returns the ``success`` /
``failed`` envelope serialized to a JSON string with ``MyEncoder``.
"""

import json
import logging

from flask import request

from app.api.data_flow import bp
from app.core.data_flow.dataflows import DataFlowService
from app.core.graph.graph_operations import MyEncoder
from app.models.result import failed, success

logger = logging.getLogger(__name__)

# NOTE(review): the routes below that take ``dataflow_id`` previously declared
# no URL variable, so Flask could not supply the view's required argument.
# The ``int`` converter is an assumption about the ID type -- confirm against
# DataFlowService and switch to ``<dataflow_id>`` if IDs are strings.


def _json_response(payload):
    """Serialize a result envelope to a JSON string.

    Non-ASCII characters are kept verbatim (``ensure_ascii=False``) and
    ``MyEncoder`` handles values the default encoder cannot serialize.
    """
    return json.dumps(payload, ensure_ascii=False, cls=MyEncoder)


@bp.route("/get-dataflows-list", methods=["GET"])
def get_dataflows():
    """Return a page of data flows.

    Query parameters: ``page`` (default 1), ``page_size`` (default 10) and
    ``search`` (default empty string).
    """
    try:
        page = request.args.get("page", 1, type=int)
        page_size = request.args.get("page_size", 10, type=int)
        search = request.args.get("search", "")

        result = DataFlowService.get_dataflows(
            page=page,
            page_size=page_size,
            search=search,
        )
        return _json_response(success(result, "success"))
    except Exception as e:
        logger.exception("获取数据流列表失败: %s", e)
        return _json_response(failed(f"获取数据流列表失败: {str(e)}"))


@bp.route("/get-dataflow/<int:dataflow_id>", methods=["GET"])
def get_dataflow(dataflow_id):
    """Return the details of one data flow, or a 404 envelope if the service
    returns a falsy result."""
    try:
        result = DataFlowService.get_dataflow_by_id(dataflow_id)
        if not result:
            return _json_response(failed("数据流不存在", code=404))
        return _json_response(success(result, "success"))
    except Exception as e:
        logger.exception("获取数据流详情失败: %s", e)
        return _json_response(failed(f"获取数据流详情失败: {str(e)}"))


@bp.route("/add-dataflow", methods=["POST"])
def create_dataflow():
    """Create a data flow from the JSON request body.

    An empty, missing or unparsable body yields a 400 envelope; a
    ``ValueError`` raised by the service is reported as a 400 parameter error.
    """
    try:
        # silent=True: a missing/invalid JSON body becomes None and is
        # reported by the explicit empty-body check below.
        data = request.get_json(silent=True)
        if not data:
            return _json_response(failed("请求数据不能为空", code=400))

        result = DataFlowService.create_dataflow(data)
        return _json_response(success(result, "数据流创建成功"))
    except ValueError as ve:
        logger.error(f"创建数据流参数错误: {str(ve)}")
        return _json_response(failed(f"参数错误: {str(ve)}", code=400))
    except Exception as e:
        logger.exception("创建数据流失败: %s", e)
        return _json_response(failed(f"创建数据流失败: {str(e)}"))


@bp.route("/update-dataflow/<int:dataflow_id>", methods=["PUT"])
def update_dataflow(dataflow_id):
    """Update a data flow from the JSON request body.

    An empty, missing or unparsable body yields a 400 envelope; a falsy
    service result yields a 404 envelope.
    """
    try:
        # silent=True: see create_dataflow.
        data = request.get_json(silent=True)
        if not data:
            return _json_response(failed("请求数据不能为空", code=400))

        result = DataFlowService.update_dataflow(dataflow_id, data)
        if not result:
            return _json_response(failed("数据流不存在", code=404))
        return _json_response(success(result, "数据流更新成功"))
    except Exception as e:
        logger.exception("更新数据流失败: %s", e)
        return _json_response(failed(f"更新数据流失败: {str(e)}"))


@bp.route("/delete-dataflow/<int:dataflow_id>", methods=["DELETE"])
def delete_dataflow(dataflow_id):
    """Delete a data flow; a falsy service result yields a 404 envelope."""
    try:
        deleted = DataFlowService.delete_dataflow(dataflow_id)
        if not deleted:
            return _json_response(failed("数据流不存在", code=404))
        return _json_response(success({}, "数据流删除成功"))
    except Exception as e:
        logger.exception("删除数据流失败: %s", e)
        return _json_response(failed(f"删除数据流失败: {str(e)}"))


@bp.route("/execute-dataflow/<int:dataflow_id>", methods=["POST"])
def execute_dataflow(dataflow_id):
    """Execute a data flow, passing the optional JSON body to the service."""
    try:
        # The body is optional here, so a missing or non-JSON body must fall
        # back to an empty dict instead of raising.
        data = request.get_json(silent=True) or {}
        result = DataFlowService.execute_dataflow(dataflow_id, data)
        return _json_response(success(result, "数据流执行成功"))
    except Exception as e:
        logger.exception("执行数据流失败: %s", e)
        return _json_response(failed(f"执行数据流失败: {str(e)}"))


@bp.route("/get-dataflow-status/<int:dataflow_id>", methods=["GET"])
def get_dataflow_status(dataflow_id):
    """Return the execution status of a data flow."""
    try:
        result = DataFlowService.get_dataflow_status(dataflow_id)
        return _json_response(success(result, "success"))
    except Exception as e:
        logger.exception("获取数据流状态失败: %s", e)
        return _json_response(failed(f"获取数据流状态失败: {str(e)}"))


@bp.route("/get-dataflow-logs/<int:dataflow_id>", methods=["GET"])
def get_dataflow_logs(dataflow_id):
    """Return a page of execution logs for a data flow.

    Query parameters: ``page`` (default 1) and ``page_size`` (default 50).
    """
    try:
        page = request.args.get("page", 1, type=int)
        page_size = request.args.get("page_size", 50, type=int)

        result = DataFlowService.get_dataflow_logs(
            dataflow_id,
            page=page,
            page_size=page_size,
        )
        return _json_response(success(result, "success"))
    except Exception as e:
        logger.exception("获取数据流日志失败: %s", e)
        return _json_response(failed(f"获取数据流日志失败: {str(e)}"))


@bp.route("/get-BD-list", methods=["GET"])
def get_business_domain_list():
    """Return the list of BusinessDomain nodes."""
    try:
        logger.info("接收到获取BusinessDomain列表请求")

        # Delegate to the service layer to fetch the BusinessDomain list.
        bd_list = DataFlowService.get_business_domain_list()
        return _json_response(success(bd_list, "操作成功"))
    except Exception as e:
        logger.exception("获取BusinessDomain列表失败: %s", e)
        return _json_response(
            failed(f"获取BusinessDomain列表失败: {str(e)}", 500, {})
        )


@bp.route("/get-script/<int:dataflow_id>", methods=["GET"])
def get_script(dataflow_id):
    """
    Return the script content associated with a DataFlow.

    Args:
        dataflow_id: ID of the DataFlow node.

    Returns:
        JSON response carrying the script content and metadata:
        - script_path: script path
        - script_content: script content
        - script_type: script type (python/javascript/sql, etc.)
        - dataflow_id: DataFlow ID
        - dataflow_name: DataFlow Chinese name
        - dataflow_name_en: DataFlow English name

    A ``ValueError`` from the service is reported as a 400 envelope and a
    ``FileNotFoundError`` as a 404 envelope.
    """
    try:
        logger.info(f"接收到获取脚本请求, DataFlow ID: {dataflow_id}")
        result = DataFlowService.get_script_content(dataflow_id)
        return _json_response(success(result, "获取脚本成功"))
    except ValueError as ve:
        logger.warning(f"获取脚本参数错误: {str(ve)}")
        return _json_response(failed(f"{str(ve)}", code=400))
    except FileNotFoundError as fe:
        logger.warning(f"脚本文件不存在: {str(fe)}")
        return _json_response(failed(f"脚本文件不存在: {str(fe)}", code=404))
    except Exception as e:
        logger.exception("获取脚本失败: %s", e)
        return _json_response(failed(f"获取脚本失败: {str(e)}"))