routes.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import json
  2. import logging
  3. from flask import request, jsonify
  4. from app.api.production_line import bp
  5. from app.models.result import success, failed
  6. from app.api.graph.routes import MyEncoder, connect_graph
  7. from app.core.production_line import production_draw_graph
  8. from app.core.production_line.production_line import execute_production_line
  9. logger = logging.getLogger(__name__)
  10. # 生产线列表
  11. @bp.route('/production/line/list', methods=['POST'])
  12. def production_line_list():
  13. """
  14. 获取生产线列表,支持分页和名称过滤
  15. Args (通过JSON请求体):
  16. current (int): 当前页码,默认为1
  17. size (int): 每页大小,默认为10
  18. name (str, optional): 名称过滤条件
  19. Returns:
  20. JSON: 包含生产线列表和分页信息的响应
  21. """
  22. try:
  23. receiver = request.get_json()
  24. page = int(receiver.get('current', 1))
  25. page_size = int(receiver.get('size', 10))
  26. name_filter = receiver.get('name', None)
  27. # 计算跳过的记录的数量
  28. skip_count = (page - 1) * page_size
  29. if name_filter:
  30. where_clause = f"n.name CONTAINS'{name_filter}'"
  31. else:
  32. where_clause = "TRUE"
  33. cql = f"""
  34. MATCH (n)
  35. WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}
  36. WITH id(n) AS id, n.name AS name, labels(n)[0] AS type,n
  37. RETURN {{
  38. id: id,
  39. name: name,
  40. type: type
  41. }} AS result,n.time as time
  42. ORDER BY time desc
  43. SKIP {skip_count}
  44. LIMIT {page_size}
  45. """
  46. # 修复:使用正确的session方式执行查询
  47. driver = connect_graph()
  48. if not driver:
  49. return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
  50. with driver.session() as session:
  51. result = session.run(cql)
  52. data = result.data()
  53. records = []
  54. for item in data:
  55. records.append(item['result'])
  56. # 获取总量
  57. total_query = f"MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}" \
  58. f" RETURN COUNT(n) AS total"
  59. total_result = session.run(total_query).single()["total"]
  60. response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page}
  61. res = success(response_data, "success")
  62. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  63. except Exception as e:
  64. res = failed({}, {"error": f"{e}"})
  65. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  66. # 根据生产线列表,传入id,绘制图谱
  67. @bp.route('/production/line/graph', methods=['POST'])
  68. def production_line_graph():
  69. """
  70. 根据生产线ID绘制关系图谱
  71. Args (通过JSON请求体):
  72. id (int): 节点ID
  73. Returns:
  74. JSON: 包含图谱数据的响应
  75. """
  76. try:
  77. # 获取请求参数
  78. receiver = request.get_json()
  79. if not receiver or 'id' not in receiver:
  80. return json.dumps(failed("缺少必要参数: id"), ensure_ascii=False, cls=MyEncoder)
  81. id = receiver['id']
  82. # 修改: 专门处理ID为0的情况,将ID类型视为整数
  83. if id is None:
  84. return json.dumps(failed("节点ID不能为空"), ensure_ascii=False, cls=MyEncoder)
  85. # 确保ID是整数类型
  86. try:
  87. id = int(id)
  88. except (ValueError, TypeError):
  89. return json.dumps(failed("节点ID必须是整数"), ensure_ascii=False, cls=MyEncoder)
  90. # 修复:使用正确的session方式执行查询
  91. driver = connect_graph()
  92. if not driver:
  93. return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
  94. with driver.session() as session:
  95. # 检查节点是否存在
  96. check_query = """
  97. MATCH (n) WHERE id(n) = $nodeId
  98. RETURN labels(n)[0] as type, n.name as name
  99. """
  100. result = session.run(check_query, nodeId=id)
  101. record = result.single()
  102. if not record:
  103. return json.dumps(failed(f"节点不存在: ID={id}"), ensure_ascii=False, cls=MyEncoder)
  104. type = record["type"]
  105. # 生成图谱
  106. data = production_draw_graph(id, type)
  107. # 检查返回结果是否包含错误
  108. if "error" in data:
  109. error_msg = data.pop("error")
  110. return json.dumps(failed(error_msg, data), ensure_ascii=False, cls=MyEncoder)
  111. return json.dumps(success(data, "success"), ensure_ascii=False, cls=MyEncoder)
  112. except Exception as e:
  113. logger.error(f"生成图谱失败: {str(e)}")
  114. return json.dumps(failed(str(e)), ensure_ascii=False, cls=MyEncoder)
  115. """
  116. Manual execution API endpoint
  117. Author: paul
  118. Date: 2024-03-20
  119. """
  120. @bp.route('/production/line/execute', methods=['POST'])
  121. def production_line_execute():
  122. """
  123. 手动执行数据资源加载
  124. Args (通过JSON请求体):
  125. id (int): 数据资源ID
  126. Returns:
  127. JSON: 执行结果
  128. """
  129. try:
  130. # 获取资源ID
  131. resource_id = request.json.get('id')
  132. if resource_id is None: # 修改检查逻辑,只有当ID为None时才报错
  133. return jsonify(failed("资源ID不能为空"))
  134. # 执行加载
  135. result = execute_production_line(resource_id)
  136. if result['status'] == 'success':
  137. return jsonify(success(result))
  138. else:
  139. return jsonify(failed(result['message']))
  140. except Exception as e:
  141. logger.error(f"执行数据资源加载失败: {str(e)}")
  142. return jsonify(failed(str(e)))