123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import json
- import logging
- from flask import request, jsonify
- from app.api.production_line import bp
- from app.models.result import success, failed
- from app.api.graph.routes import MyEncoder, connect_graph
- from app.core.production_line import production_draw_graph
- from app.core.production_line.production_line import execute_production_line
- logger = logging.getLogger(__name__)
- # 生产线列表
- @bp.route('/production/line/list', methods=['POST'])
- def production_line_list():
- """
- 获取生产线列表,支持分页和名称过滤
-
- Args (通过JSON请求体):
- current (int): 当前页码,默认为1
- size (int): 每页大小,默认为10
- name (str, optional): 名称过滤条件
-
- Returns:
- JSON: 包含生产线列表和分页信息的响应
- """
- try:
- receiver = request.get_json()
- page = int(receiver.get('current', 1))
- page_size = int(receiver.get('size', 10))
- name_filter = receiver.get('name', None)
- # 计算跳过的记录的数量
- skip_count = (page - 1) * page_size
- if name_filter:
- where_clause = f"n.name CONTAINS'{name_filter}'"
- else:
- where_clause = "TRUE"
- cql = f"""
- MATCH (n)
- WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}
- WITH id(n) AS id, n.name AS name, labels(n)[0] AS type,n
- RETURN {{
- id: id,
- name: name,
- type: type
- }} AS result,n.time as time
- ORDER BY time desc
- SKIP {skip_count}
- LIMIT {page_size}
- """
- data = connect_graph.run(cql).data()
- records = []
- for item in data:
- records.append(item['result'])
- # 获取总量
- total_query = f"MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}" \
- f" RETURN COUNT(n) AS total"
- total_result = connect_graph.run(total_query).evaluate()
- response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page}
- res = success(response_data, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # 根据生产线列表,传入id,绘制图谱
- @bp.route('/production/line/graph', methods=['POST'])
- def production_line_graph():
- """
- 根据生产线ID绘制关系图谱
-
- Args (通过JSON请求体):
- id (int): 节点ID
-
- Returns:
- JSON: 包含图谱数据的响应
- """
- # 传入请求参数
- receiver = request.get_json()
- id = receiver['id']
- try:
- cql = """
- MATCH (n) where id(n) = $nodeId return labels(n)[0] as type
- """
- type = connect_graph.run(cql, nodeId=id).evaluate()
- data = production_draw_graph(id, type)
- res = success(data, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- """
- Manual execution API endpoint
- Author: paul
- Date: 2024-03-20
- """
- @bp.route('/production/line/execute', methods=['POST'])
- def production_line_execute():
- """
- 手动执行数据资源加载
-
- Args (通过JSON请求体):
- id (int): 数据资源ID
-
- Returns:
- JSON: 执行结果
- """
- try:
- # 获取资源ID
- resource_id = request.json.get('id')
- if resource_id is None: # 修改检查逻辑,只有当ID为None时才报错
- return jsonify(failed("资源ID不能为空"))
-
- # 执行加载
- result = execute_production_line(resource_id)
-
- if result['status'] == 'success':
- return jsonify(success(result))
- else:
- return jsonify(failed(result['message']))
-
- except Exception as e:
- logger.error(f"执行数据资源加载失败: {str(e)}")
- return jsonify(failed(str(e)))
|