import json import logging from flask import request, jsonify from app.api.production_line import bp from app.models.result import success, failed from app.api.graph.routes import MyEncoder, connect_graph from app.core.production_line import production_draw_graph from app.core.production_line.production_line import execute_production_line logger = logging.getLogger(__name__) # 生产线列表 @bp.route('/production/line/list', methods=['POST']) def production_line_list(): """ 获取生产线列表,支持分页和名称过滤 Args (通过JSON请求体): current (int): 当前页码,默认为1 size (int): 每页大小,默认为10 name (str, optional): 名称过滤条件 Returns: JSON: 包含生产线列表和分页信息的响应 """ try: receiver = request.get_json() page = int(receiver.get('current', 1)) page_size = int(receiver.get('size', 10)) name_filter = receiver.get('name', None) # 计算跳过的记录的数量 skip_count = (page - 1) * page_size if name_filter: where_clause = f"n.name CONTAINS'{name_filter}'" else: where_clause = "TRUE" cql = f""" MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause} WITH id(n) AS id, n.name AS name, labels(n)[0] AS type,n RETURN {{ id: id, name: name, type: type }} AS result,n.time as time ORDER BY time desc SKIP {skip_count} LIMIT {page_size} """ data = connect_graph.run(cql).data() records = [] for item in data: records.append(item['result']) # 获取总量 total_query = f"MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}" \ f" RETURN COUNT(n) AS total" total_result = connect_graph.run(total_query).evaluate() response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page} res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 根据生产线列表,传入id,绘制图谱 @bp.route('/production/line/graph', methods=['POST']) def production_line_graph(): """ 根据生产线ID绘制关系图谱 Args (通过JSON请求体): id (int): 节点ID Returns: JSON: 包含图谱数据的响应 """ # 传入请求参数 receiver = request.get_json() id = receiver['id'] try: cql = """ MATCH (n) where id(n) = $nodeId return labels(n)[0] as type """ type = connect_graph.run(cql, nodeId=id).evaluate() data = production_draw_graph(id, type) res = success(data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) """ Manual execution API endpoint Author: paul Date: 2024-03-20 """ @bp.route('/production/line/execute', methods=['POST']) def production_line_execute(): """ 手动执行数据资源加载 Args (通过JSON请求体): id (int): 数据资源ID Returns: JSON: 执行结果 """ try: # 获取资源ID resource_id = request.json.get('id') if resource_id is None: # 修改检查逻辑,只有当ID为None时才报错 return jsonify(failed("资源ID不能为空")) # 执行加载 result = execute_production_line(resource_id) if result['status'] == 'success': return jsonify(success(result)) else: return jsonify(failed(result['message'])) except Exception as e: logger.error(f"执行数据资源加载失败: {str(e)}") return jsonify(failed(str(e)))