| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- import json
- import logging
- from flask import request
- from app.api.data_flow import bp
- from app.core.data_flow.dataflows import DataFlowService
- from app.core.graph.graph_operations import MyEncoder
- from app.models.result import failed, success
- logger = logging.getLogger(__name__)
@bp.route("/get-dataflows-list", methods=["GET"])
def get_dataflows():
    """Return a page of data flows, optionally filtered by a search string.

    Query parameters:
        page: Page number, parsed as int, default 1.
        page_size: Items per page, parsed as int, default 10.
        search: Search string, default "".

    Returns:
        A JSON string. On success it is the ``success`` envelope wrapping the
        result of ``DataFlowService.get_dataflows``; if anything raises, the
        error is logged and a ``failed`` envelope carrying the error text is
        returned instead.
    """
    try:
        query = request.args
        listing = DataFlowService.get_dataflows(
            page=query.get("page", 1, type=int),
            page_size=query.get("page_size", 10, type=int),
            search=query.get("search", ""),
        )
        envelope = success(listing, "success")
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # The same text goes to the log and to the client.
        message = f"获取数据流列表失败: {str(e)}"
        logger.error(message)
        return json.dumps(failed(message), ensure_ascii=False, cls=MyEncoder)
@bp.route("/get-dataflow/<int:dataflow_id>", methods=["GET"])
def get_dataflow(dataflow_id):
    """Return the details of one data flow.

    Args:
        dataflow_id: Integer ID taken from the URL path.

    Returns:
        A JSON string: the ``success`` envelope wrapping the service result,
        a ``failed`` envelope with code 404 when the service returns a falsy
        value, or a ``failed`` envelope carrying the error text when any
        exception is raised.
    """
    try:
        detail = DataFlowService.get_dataflow_by_id(dataflow_id)
        if not detail:
            # A falsy lookup result is reported as "not found".
            envelope = failed("数据流不存在", code=404)
        else:
            envelope = success(detail, "success")
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        message = f"获取数据流详情失败: {str(e)}"
        logger.error(message)
        return json.dumps(failed(message), ensure_ascii=False, cls=MyEncoder)
@bp.route("/add-dataflow", methods=["POST"])
def create_dataflow():
    """Create a new data flow from the JSON request body.

    Returns:
        A JSON string, one of:
        - a ``failed`` envelope with code 400 when the body is missing, is
          not JSON, cannot be parsed, or parses to an empty value;
        - a ``failed`` envelope with code 400 when the service raises
          ``ValueError``;
        - the ``success`` envelope wrapping the result of
          ``DataFlowService.create_dataflow``;
        - a ``failed`` envelope carrying the error text for any other
          exception.
    """
    try:
        # silent=True makes a missing, non-JSON or malformed body come back as
        # None instead of raising, so it is reported by the explicit 400
        # branch below rather than by the generic failure handler.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        result = DataFlowService.create_dataflow(data)
        res = success(result, "数据流创建成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        # ValueError from the service is treated as a client-side error.
        logger.error(f"创建数据流参数错误: {str(ve)}")
        res = failed(f"参数错误: {str(ve)}", code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception("创建数据流失败: %s", e)
        res = failed(f"创建数据流失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
@bp.route("/update-dataflow/<int:dataflow_id>", methods=["PUT"])
def update_dataflow(dataflow_id):
    """Update an existing data flow from the JSON request body.

    Args:
        dataflow_id: Integer ID taken from the URL path.

    Returns:
        A JSON string, one of:
        - a ``failed`` envelope with code 400 when the body is missing, is
          not JSON, cannot be parsed, or parses to an empty value;
        - a ``failed`` envelope with code 400 when the service raises
          ``ValueError``;
        - the ``success`` envelope wrapping the service result;
        - a ``failed`` envelope with code 404 when the service returns a
          falsy value;
        - a ``failed`` envelope carrying the error text for any other
          exception.
    """
    try:
        # silent=True makes a missing, non-JSON or malformed body come back as
        # None instead of raising, so it is reported by the explicit 400
        # branch below rather than by the generic failure handler.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        result = DataFlowService.update_dataflow(dataflow_id, data)
        if result:
            res = success(result, "数据流更新成功")
        else:
            res = failed("数据流不存在", code=404)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        # Same mapping as the create endpoint: ValueError from the service is
        # reported as a client-side error.
        logger.error(f"更新数据流参数错误: {str(ve)}")
        res = failed(f"参数错误: {str(ve)}", code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception("更新数据流失败: %s", e)
        res = failed(f"更新数据流失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
@bp.route("/delete-dataflow/<int:dataflow_id>", methods=["DELETE"])
def delete_dataflow(dataflow_id):
    """Delete a data flow.

    Args:
        dataflow_id: Integer ID taken from the URL path.

    Returns:
        A JSON string: the ``success`` envelope with an empty dict payload
        when the service returns a truthy value, a ``failed`` envelope with
        code 404 when it returns a falsy one, or a ``failed`` envelope
        carrying the error text when any exception is raised.
    """
    try:
        deleted = DataFlowService.delete_dataflow(dataflow_id)
        envelope = (
            success({}, "数据流删除成功")
            if deleted
            else failed("数据流不存在", code=404)
        )
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        message = f"删除数据流失败: {str(e)}"
        logger.error(message)
        return json.dumps(failed(message), ensure_ascii=False, cls=MyEncoder)
@bp.route("/execute-dataflow/<int:dataflow_id>", methods=["POST"])
def execute_dataflow(dataflow_id):
    """Execute a data flow, passing along optional JSON parameters.

    Args:
        dataflow_id: Integer ID taken from the URL path.

    Returns:
        A JSON string: the ``success`` envelope wrapping the result of
        ``DataFlowService.execute_dataflow``, or a ``failed`` envelope
        carrying the error text when any exception is raised.
    """
    try:
        # The body is optional. silent=True makes a missing, non-JSON or
        # malformed body come back as None instead of raising, so the
        # ``or {}`` fallback can take effect.
        data = request.get_json(silent=True) or {}
        result = DataFlowService.execute_dataflow(dataflow_id, data)
        res = success(result, "数据流执行成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception("执行数据流失败: %s", e)
        res = failed(f"执行数据流失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
@bp.route("/get-dataflow-status/<int:dataflow_id>", methods=["GET"])
def get_dataflow_status(dataflow_id):
    """Return the execution status of a data flow.

    Args:
        dataflow_id: Integer ID taken from the URL path.

    Returns:
        A JSON string: the ``success`` envelope wrapping the result of
        ``DataFlowService.get_dataflow_status``, or a ``failed`` envelope
        carrying the error text when any exception is raised.
    """
    try:
        status = DataFlowService.get_dataflow_status(dataflow_id)
        envelope = success(status, "success")
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        message = f"获取数据流状态失败: {str(e)}"
        logger.error(message)
        return json.dumps(failed(message), ensure_ascii=False, cls=MyEncoder)
@bp.route("/get-dataflow-logs/<int:dataflow_id>", methods=["GET"])
def get_dataflow_logs(dataflow_id):
    """Return a page of execution logs for a data flow.

    Args:
        dataflow_id: Integer ID taken from the URL path.

    Query parameters:
        page: Page number, parsed as int, default 1.
        page_size: Items per page, parsed as int, default 50.

    Returns:
        A JSON string: the ``success`` envelope wrapping the result of
        ``DataFlowService.get_dataflow_logs``, or a ``failed`` envelope
        carrying the error text when any exception is raised.
    """
    try:
        query = request.args
        page = query.get("page", 1, type=int)
        page_size = query.get("page_size", 50, type=int)
        log_page = DataFlowService.get_dataflow_logs(
            dataflow_id, page=page, page_size=page_size
        )
        envelope = success(log_page, "success")
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        message = f"获取数据流日志失败: {str(e)}"
        logger.error(message)
        return json.dumps(failed(message), ensure_ascii=False, cls=MyEncoder)
@bp.route("/get-BD-list", methods=["GET"])
def get_business_domain_list():
    """Return the list of BusinessDomain nodes.

    Returns:
        A JSON string: the ``success`` envelope wrapping the result of
        ``DataFlowService.get_business_domain_list``, or a ``failed``
        envelope built with code 500 and an empty dict payload when any
        exception is raised.
    """
    try:
        logger.info("接收到获取BusinessDomain列表请求")
        # Delegate to the service layer for the BusinessDomain list.
        domains = DataFlowService.get_business_domain_list()
        envelope = success(domains, "操作成功")
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        message = f"获取BusinessDomain列表失败: {str(e)}"
        logger.error(message)
        envelope = failed(message, 500, {})
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
@bp.route("/get-script/<int:dataflow_id>", methods=["GET"])
def get_script(dataflow_id):
    """
    Return the script content associated with a DataFlow.

    Args:
        dataflow_id: ID of the DataFlow node, taken from the URL path.

    Returns:
        A JSON string. On success it is the ``success`` envelope wrapping the
        result of ``DataFlowService.get_script_content``, which is described
        as containing the script content and metadata:
            - script_path: path of the script
            - script_content: content of the script
            - script_type: type of the script (python/javascript/sql, etc.)
            - dataflow_id: DataFlow ID
            - dataflow_name: Chinese name of the DataFlow
            - dataflow_name_en: English name of the DataFlow
        A ``ValueError`` yields a ``failed`` envelope with code 400, a
        ``FileNotFoundError`` one with code 404, and any other exception a
        ``failed`` envelope carrying the error text.
    """

    def _dump(envelope):
        # Shared serialisation settings for every response of this endpoint.
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)

    try:
        logger.info(f"接收到获取脚本请求, DataFlow ID: {dataflow_id}")
        script_info = DataFlowService.get_script_content(dataflow_id)
        return _dump(success(script_info, "获取脚本成功"))
    except ValueError as ve:
        reason = str(ve)
        logger.warning(f"获取脚本参数错误: {reason}")
        return _dump(failed(reason, code=400))
    except FileNotFoundError as fe:
        message = f"脚本文件不存在: {str(fe)}"
        logger.warning(message)
        return _dump(failed(message, code=404))
    except Exception as e:
        message = f"获取脚本失败: {str(e)}"
        logger.error(message)
        return _dump(failed(message))
|