routes.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import json
  2. import logging
  3. from flask import request
  4. from app.api.data_flow import bp
  5. from app.core.data_flow.dataflows import DataFlowService
  6. from app.core.graph.graph_operations import MyEncoder
  7. from app.models.result import failed, success
  8. logger = logging.getLogger(__name__)
  9. @bp.route("/get-dataflows-list", methods=["GET"])
  10. def get_dataflows():
  11. """获取数据流列表"""
  12. try:
  13. page = request.args.get("page", 1, type=int)
  14. page_size = request.args.get("page_size", 10, type=int)
  15. search = request.args.get("search", "")
  16. result = DataFlowService.get_dataflows(
  17. page=page,
  18. page_size=page_size,
  19. search=search,
  20. )
  21. res = success(result, "success")
  22. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  23. except Exception as e:
  24. logger.error(f"获取数据流列表失败: {str(e)}")
  25. res = failed(f"获取数据流列表失败: {str(e)}")
  26. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  27. @bp.route("/get-dataflow/<int:dataflow_id>", methods=["GET"])
  28. def get_dataflow(dataflow_id):
  29. """根据ID获取数据流详情"""
  30. try:
  31. result = DataFlowService.get_dataflow_by_id(dataflow_id)
  32. if result:
  33. res = success(result, "success")
  34. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  35. else:
  36. res = failed("数据流不存在", code=404)
  37. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  38. except Exception as e:
  39. logger.error(f"获取数据流详情失败: {str(e)}")
  40. res = failed(f"获取数据流详情失败: {str(e)}")
  41. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  42. @bp.route("/add-dataflow", methods=["POST"])
  43. def create_dataflow():
  44. """创建新的数据流"""
  45. try:
  46. data = request.get_json()
  47. if not data:
  48. res = failed("请求数据不能为空", code=400)
  49. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  50. result = DataFlowService.create_dataflow(data)
  51. res = success(result, "数据流创建成功")
  52. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  53. except ValueError as ve:
  54. logger.error(f"创建数据流参数错误: {str(ve)}")
  55. res = failed(f"参数错误: {str(ve)}", code=400)
  56. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  57. except Exception as e:
  58. logger.error(f"创建数据流失败: {str(e)}")
  59. res = failed(f"创建数据流失败: {str(e)}")
  60. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  61. @bp.route("/update-dataflow/<int:dataflow_id>", methods=["PUT"])
  62. def update_dataflow(dataflow_id):
  63. """更新数据流"""
  64. try:
  65. data = request.get_json()
  66. if not data:
  67. res = failed("请求数据不能为空", code=400)
  68. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  69. result = DataFlowService.update_dataflow(dataflow_id, data)
  70. if result:
  71. res = success(result, "数据流更新成功")
  72. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  73. else:
  74. res = failed("数据流不存在", code=404)
  75. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  76. except Exception as e:
  77. logger.error(f"更新数据流失败: {str(e)}")
  78. res = failed(f"更新数据流失败: {str(e)}")
  79. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  80. @bp.route("/delete-dataflow/<int:dataflow_id>", methods=["DELETE"])
  81. def delete_dataflow(dataflow_id):
  82. """删除数据流"""
  83. try:
  84. result = DataFlowService.delete_dataflow(dataflow_id)
  85. if result:
  86. res = success({}, "数据流删除成功")
  87. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  88. else:
  89. res = failed("数据流不存在", code=404)
  90. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  91. except Exception as e:
  92. logger.error(f"删除数据流失败: {str(e)}")
  93. res = failed(f"删除数据流失败: {str(e)}")
  94. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  95. @bp.route("/execute-dataflow/<int:dataflow_id>", methods=["POST"])
  96. def execute_dataflow(dataflow_id):
  97. """执行数据流"""
  98. try:
  99. data = request.get_json() or {}
  100. result = DataFlowService.execute_dataflow(dataflow_id, data)
  101. res = success(result, "数据流执行成功")
  102. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  103. except Exception as e:
  104. logger.error(f"执行数据流失败: {str(e)}")
  105. res = failed(f"执行数据流失败: {str(e)}")
  106. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  107. @bp.route("/get-dataflow-status/<int:dataflow_id>", methods=["GET"])
  108. def get_dataflow_status(dataflow_id):
  109. """获取数据流执行状态"""
  110. try:
  111. result = DataFlowService.get_dataflow_status(dataflow_id)
  112. res = success(result, "success")
  113. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  114. except Exception as e:
  115. logger.error(f"获取数据流状态失败: {str(e)}")
  116. res = failed(f"获取数据流状态失败: {str(e)}")
  117. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  118. @bp.route("/get-dataflow-logs/<int:dataflow_id>", methods=["GET"])
  119. def get_dataflow_logs(dataflow_id):
  120. """获取数据流执行日志"""
  121. try:
  122. page = request.args.get("page", 1, type=int)
  123. page_size = request.args.get("page_size", 50, type=int)
  124. result = DataFlowService.get_dataflow_logs(
  125. dataflow_id,
  126. page=page,
  127. page_size=page_size,
  128. )
  129. res = success(result, "success")
  130. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  131. except Exception as e:
  132. logger.error(f"获取数据流日志失败: {str(e)}")
  133. res = failed(f"获取数据流日志失败: {str(e)}")
  134. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  135. @bp.route("/get-BD-list", methods=["GET"])
  136. def get_business_domain_list():
  137. """获取BusinessDomain节点列表"""
  138. try:
  139. logger.info("接收到获取BusinessDomain列表请求")
  140. # 调用服务层函数获取BusinessDomain列表
  141. bd_list = DataFlowService.get_business_domain_list()
  142. res = success(bd_list, "操作成功")
  143. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  144. except Exception as e:
  145. logger.error(f"获取BusinessDomain列表失败: {str(e)}")
  146. res = failed(f"获取BusinessDomain列表失败: {str(e)}", 500, {})
  147. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  148. @bp.route("/get-script/<int:dataflow_id>", methods=["GET"])
  149. def get_script(dataflow_id):
  150. """
  151. 获取 DataFlow 关联的脚本内容
  152. Args:
  153. dataflow_id: DataFlow 节点的 ID
  154. Returns:
  155. 包含脚本内容和元信息的 JSON 响应:
  156. - script_path: 脚本路径
  157. - script_content: 脚本内容
  158. - script_type: 脚本类型(python/javascript/sql等)
  159. - dataflow_id: DataFlow ID
  160. - dataflow_name: DataFlow 中文名称
  161. - dataflow_name_en: DataFlow 英文名称
  162. """
  163. try:
  164. logger.info(f"接收到获取脚本请求, DataFlow ID: {dataflow_id}")
  165. result = DataFlowService.get_script_content(dataflow_id)
  166. res = success(result, "获取脚本成功")
  167. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  168. except ValueError as ve:
  169. logger.warning(f"获取脚本参数错误: {str(ve)}")
  170. res = failed(f"{str(ve)}", code=400)
  171. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  172. except FileNotFoundError as fe:
  173. logger.warning(f"脚本文件不存在: {str(fe)}")
  174. res = failed(f"脚本文件不存在: {str(fe)}", code=404)
  175. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  176. except Exception as e:
  177. logger.error(f"获取脚本失败: {str(e)}")
  178. res = failed(f"获取脚本失败: {str(e)}")
  179. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)