""" 数据指标API接口模块 该模块提供了数据指标的各种API接口,包括: - 指标新增、更新和查询 - 指标列表展示 - 指标图谱生成 - 指标代码生成 - 指标关系管理 """ from flask import request, jsonify import json from app.api.data_metric import bp from app.core.data_metric.metric_interface import ( metric_list, handle_metric_relation, handle_data_metric, handle_meta_data_metric, handle_id_metric, metric_kinship_graph, metric_impact_graph, metric_all_graph, data_metric_edit, metric_check ) from app.core.llm import code_generate_metric from app.core.meta_data import translate_and_parse from app.models.result import success, failed from app.core.graph.graph_operations import MyEncoder, connect_graph @bp.route('/relation', methods=['POST']) def data_metric_relation(): """ 处理数据指标血缘关系 请求参数: - id: 数据模型ID 返回: - 处理结果,包含血缘关系数据 """ try: # 传入请求参数 receiver = request.get_json() model_ids = receiver['id'] # 给定一个数据模型的id response_data = handle_metric_relation(model_ids) res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/add', methods=['POST']) def data_metric_add(): """ 新增数据指标 请求参数: - name: 指标名称 - 其他指标相关属性 返回: - 处理结果,成功或失败 """ # 传入请求参数 receiver = request.get_json() metric_name = receiver['name'] # 数据指标的name try: result_list = translate_and_parse(metric_name) id, id_list = handle_data_metric(metric_name, result_list, receiver) handle_meta_data_metric(id, id_list) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/code', methods=['POST']) def data_metric_code(): """ 生成指标计算的代码 请求参数: - content: 指标规则描述 - relation: 映射关系 返回: - 生成的代码 """ # 传入请求参数 receiver = request.get_json() content = receiver['content'] # 指标规则描述 relation = receiver['relation'] # 映射关系 try: id_list = [item["id"] for item in relation] cql = """ MATCH (n:DataMetric) WHERE id(n) IN $Id_list WITH n.name_en AS name_en, n.name_zh AS name_zh WITH collect({name_en: name_en, name_zh: name_zh}) AS res WITH reduce(acc = {}, item IN res | apoc.map.setKey(acc, item.name_en, item.name_zh)) AS result RETURN result """ id_relation = connect_graph.run(cql, Id_list=id_list).evaluate() result = code_generate_metric(content, id_relation) res = success(result, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/detail', methods=['POST']) def data_metric_detail(): """ 获取数据指标详情 请求参数: - id: 指标ID 返回: - 指标详情数据 """ try: # 传入请求参数 receiver = request.get_json() id = int(receiver.get('id')) response_data = handle_id_metric(id) res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/list', methods=['POST']) def data_metric_list(): """ 获取数据指标列表 请求参数: - current: 当前页码,默认1 - size: 每页大小,默认10 - name_en: 英文名称过滤 - name_zh: 名称过滤 - category: 类别过滤 - time: 时间过滤 - tag: 标签过滤 返回: - 指标列表数据和分页信息 """ try: # 传入请求参数 receiver = request.get_json() page = int(receiver.get('current', 1)) page_size = int(receiver.get('size', 10)) name_en_filter = receiver.get('name_en', None) name_zh_filter = receiver.get('name_zh', None) category = receiver.get('category', None) time = receiver.get('time', None) tag = receiver.get('tag', None) # 计算跳过的记录的数量 skip_count = (page - 1) * page_size data, total = metric_list(skip_count, page_size, name_en_filter, name_zh_filter, category, time, tag) response_data = {'records': data, 'total': total, 'size': page_size, 'current': page} res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/graph/all', methods=['POST']) def data_metric_graph_all(): """ 获取数据指标图谱 请求参数: - id: 指标ID - type: 图谱类型,可选kinship/impact/all - meta: 是否返回元数据,true/false 返回: - 图谱数据 """ try: # 传入请求参数 receiver = request.get_json() nodeid = receiver['id'] type = receiver['type'] # kinship/impact/all meta = receiver['meta'] # true/false 是否返回元数据 if type == 'kinship': result = metric_kinship_graph(nodeid, meta) elif type == 'impact': result = metric_impact_graph(nodeid, meta) else: result = metric_all_graph(nodeid, meta) return json.dumps(success(result, "success"), ensure_ascii=False, cls=MyEncoder) except Exception as e: return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder) @bp.route('/list/graph', methods=['POST']) def data_metric_list_graph(): """ 获取数据指标列表图谱 请求参数: - tag: 标签ID 返回: - 图谱数据,包含节点和连线 """ try: # 传入请求参数 receiver = request.get_json() if not receiver or 'tag' not in receiver: raise ValueError("Missing 'tag' parameter in request body") nodeid = receiver['tag'] # 构建查询条件 params = {} where_clause = "" if nodeid is not None: where_clause = "MATCH (n)-[:LABEL]->(la) WHERE id(la) = $nodeId" params['nodeId'] = nodeid query = f""" MATCH (n:DataMetric) OPTIONAL MATCH (n)-[:child]->(child) {where_clause} WITH collect(DISTINCT {{id: toString(id(n)), text: n.name, type: split(labels(n)[0], '_')[1]}}) AS nodes, collect(DISTINCT {{id: toString(id(child)), text: child.name, type: split(labels(child)[0], '_')[1]}}) AS nodes2, collect(DISTINCT {{from: toString(id(n)), to: toString(id(child)), text: '下级'}}) AS lines RETURN nodes + nodes2 AS nodes, lines AS lines """ # 修复:使用正确的session方式执行查询 driver = connect_graph() if not driver: return json.dumps(failed({}, "无法连接到数据库"), ensure_ascii=False, cls=MyEncoder) with driver.session() as session: result = session.run(query, **params) res = {} for item in result: res = { "nodes": [record for record in item['nodes'] if record['id']], "lines": [record for record in item['lines'] if record['from'] and record['to']], } return json.dumps(success(res, "success"), ensure_ascii=False, cls=MyEncoder) except Exception as e: return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder) @bp.route('/update', methods=['POST']) def data_metric_update(): """ 更新数据指标 请求参数: - id: 指标ID - 其他需要更新的属性 返回: - 处理结果,成功或失败 """ try: # 传入请求参数 receiver = request.get_json() data_metric_edit(receiver) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route('/check', methods=['POST']) def data_metric_check(): """ 检查指标计算公式中的变量 请求参数: - formula: 指标计算公式文本,格式如:指标名称 = 变量1 + 变量2 * 数字 返回: - 变量检查结果列表,包含每个变量的匹配信息 """ try: # 获取请求参数 receiver = request.get_json() formula_text = receiver.get('formula', '') if not formula_text: res = failed({}, {"error": "公式文本不能为空"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 调用核心业务逻辑 result = metric_check(formula_text) res = success(result, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder)