""" 数据指标API接口模块 该模块提供了数据指标的各种API接口,包括: - 指标新增、更新和查询 - 指标列表展示 - 指标图谱生成 - 指标代码生成 - 指标关系管理 """ from flask import request, jsonify import json from app.api.data_metric import bp from app.core.data_metric.metric_interface import ( metric_list, handle_metric_relation, handle_data_metric, handle_meta_data_metric, handle_id_metric, metric_kinship_graph, metric_impact_graph, metric_all_graph, data_metric_edit ) from app.core.llm import code_generate_metric from app.core.meta_data import translate_and_parse from app.models.result import success, failed from app.core.graph.graph_operations import MyEncoder, connect_graph @bp.route('/data/metric/relation', methods=['POST']) def data_metric_relation(): """ 处理数据指标血缘关系 请求参数: - id: 数据模型ID 返回: - 处理结果,包含血缘关系数据 """ try: # 传入请求参数 receiver = request.get_json() model_ids = receiver['id'] # 给定一个数据模型的id response_data = handle_metric_relation(model_ids) res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/data/metric/add', methods=['POST']) def data_metric_add(): """ 新增数据指标 请求参数: - name: 指标名称 - 其他指标相关属性 返回: - 处理结果,成功或失败 """ # 传入请求参数 receiver = request.get_json() metric_name = receiver['name'] # 数据指标的name try: result_list = translate_and_parse(metric_name) id, id_list = handle_data_metric(metric_name, result_list, receiver) handle_meta_data_metric(id, id_list) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/data/metric/code', methods=['POST']) def data_metric_code(): """ 生成指标计算的代码 请求参数: - content: 指标规则描述 - relation: 映射关系 返回: - 生成的代码 """ # 传入请求参数 receiver = request.get_json() content = receiver['content'] # 指标规则描述 relation = receiver['relation'] # 映射关系 try: id_list = [item["id"] for item in relation] cql = """ MATCH (n:meta_node) WHERE id(n) IN $Id_list WITH n.en_name AS en_name, n.name AS name WITH collect({en_name: en_name, name: name}) AS res WITH reduce(acc = {}, item IN res | apoc.map.setKey(acc, item.en_name, item.name)) AS result RETURN result """ id_relation = connect_graph.run(cql, Id_list=id_list).evaluate() result = code_generate_metric(content, id_relation) res = success(result, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/data/metric/detail', methods=['POST']) def data_metric_detail(): """ 获取数据指标详情 请求参数: - id: 指标ID 返回: - 指标详情数据 """ try: # 传入请求参数 receiver = request.get_json() id = int(receiver.get('id')) response_data = handle_id_metric(id) res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/data/metric/list', methods=['POST']) def data_metric_list(): """ 获取数据指标列表 请求参数: - current: 当前页码,默认1 - size: 每页大小,默认10 - en_name: 英文名称过滤 - name: 名称过滤 - category: 类别过滤 - time: 时间过滤 - tag: 标签过滤 返回: - 指标列表数据和分页信息 """ try: # 传入请求参数 receiver = request.get_json() page = int(receiver.get('current', 1)) page_size = int(receiver.get('size', 10)) en_name_filter = receiver.get('en_name', None) name_filter = receiver.get('name', None) category = receiver.get('category', None) time = receiver.get('time', None) tag = receiver.get('tag', None) # 计算跳过的记录的数量 skip_count = (page - 1) * page_size data, total = metric_list(skip_count, page_size, en_name_filter, name_filter, category, time, tag) response_data = {'records': data, 'total': total, 'size': page_size, 'current': page} res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/data/metric/graph/all', methods=['POST']) def data_metric_graph_all(): """ 获取数据指标图谱 请求参数: - id: 指标ID - type: 图谱类型,可选kinship/impact/all - meta: 是否返回元数据,true/false 返回: - 图谱数据 """ try: # 传入请求参数 receiver = request.get_json() nodeid = receiver['id'] type = receiver['type'] # kinship/impact/all meta = receiver['meta'] # true/false 是否返回元数据 if type == 'kinship': result = metric_kinship_graph(nodeid, meta) elif type == 'impact': result = metric_impact_graph(nodeid, meta) else: result = metric_all_graph(nodeid, meta) return json.dumps(success(result, "success"), ensure_ascii=False, cls=MyEncoder) except Exception as e: return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder) @bp.route('/data/metric/list/graph', methods=['POST']) def data_metric_list_graph(): """ 获取数据指标列表图谱 请求参数: - tag: 标签ID 返回: - 图谱数据,包含节点和连线 """ try: # 传入请求参数 receiver = request.get_json() if not receiver or 'tag' not in receiver: raise ValueError("Missing 'tag' parameter in request body") nodeid = receiver['tag'] # 构建查询条件 params = {} where_clause = "" if nodeid is not None: where_clause = "MATCH (n)-[:label]->(la) WHERE id(la) = $nodeId" params['nodeId'] = nodeid query = f""" MATCH (n:data_metric) OPTIONAL MATCH (n)-[:child]->(child) {where_clause} WITH collect(DISTINCT {{id: toString(id(n)), text: n.name, type: split(labels(n)[0], '_')[1]}}) AS nodes, collect(DISTINCT {{id: toString(id(child)), text: child.name, type: split(labels(child)[0], '_')[1]}}) AS nodes2, collect(DISTINCT {{from: toString(id(n)), to: toString(id(child)), text: '下级'}}) AS lines RETURN nodes + nodes2 AS nodes, lines AS lines """ # 修复:使用正确的session方式执行查询 driver = connect_graph() if not driver: return json.dumps(failed({}, "无法连接到数据库"), ensure_ascii=False, cls=MyEncoder) with driver.session() as session: result = session.run(query, **params) res = {} for item in result: res = { "nodes": [record for record in item['nodes'] if record['id']], "lines": [record for record in item['lines'] if record['from'] and record['to']], } return json.dumps(success(res, "success"), ensure_ascii=False, cls=MyEncoder) except Exception as e: return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder) @bp.route('/data/metric/update', methods=['POST']) def data_metric_update(): """ 更新数据指标 请求参数: - id: 指标ID - 其他需要更新的属性 返回: - 处理结果,成功或失败 """ try: # 传入请求参数 receiver = request.get_json() data_metric_edit(receiver) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder)