123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- """
- 数据指标API接口模块
- 该模块提供了数据指标的各种API接口,包括:
- - 指标新增、更新和查询
- - 指标列表展示
- - 指标图谱生成
- - 指标代码生成
- - 指标关系管理
- """
- from flask import request, jsonify
- import json
- from app.api.data_metric import bp
- from app.core.data_metric.metric_interface import (
- metric_list, handle_metric_relation, handle_data_metric,
- handle_meta_data_metric, handle_id_metric, metric_kinship_graph,
- metric_impact_graph, metric_all_graph, data_metric_edit
- )
- from app.core.llm import code_generate_metric
- from app.core.meta_data import translate_and_parse
- from app.models.result import success, failed
- from app.core.graph.graph_operations import MyEncoder, connect_graph
- @bp.route('/data/metric/relation', methods=['POST'])
- def data_metric_relation():
- """
- 处理数据指标血缘关系
-
- 请求参数:
- - id: 数据模型ID
-
- 返回:
- - 处理结果,包含血缘关系数据
- """
- try:
- # 传入请求参数
- receiver = request.get_json()
- model_ids = receiver['id'] # 给定一个数据模型的id
- response_data = handle_metric_relation(model_ids)
- res = success(response_data, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route('/data/metric/add', methods=['POST'])
- def data_metric_add():
- """
- 新增数据指标
-
- 请求参数:
- - name: 指标名称
- - 其他指标相关属性
-
- 返回:
- - 处理结果,成功或失败
- """
- # 传入请求参数
- receiver = request.get_json()
- metric_name = receiver['name'] # 数据指标的name
- try:
- result_list = translate_and_parse(metric_name)
- id, id_list = handle_data_metric(metric_name, result_list, receiver)
- handle_meta_data_metric(id, id_list)
- res = success({}, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route('/data/metric/code', methods=['POST'])
- def data_metric_code():
- """
- 生成指标计算的代码
-
- 请求参数:
- - content: 指标规则描述
- - relation: 映射关系
-
- 返回:
- - 生成的代码
- """
- # 传入请求参数
- receiver = request.get_json()
- content = receiver['content'] # 指标规则描述
- relation = receiver['relation'] # 映射关系
- try:
- id_list = [item["id"] for item in relation]
- cql = """
- MATCH (n:meta_node) WHERE id(n) IN $Id_list
- WITH n.en_name AS en_name, n.name AS name
- WITH collect({en_name: en_name, name: name}) AS res
- WITH reduce(acc = {}, item IN res | apoc.map.setKey(acc, item.en_name, item.name)) AS result
- RETURN result
- """
- id_relation = connect_graph.run(cql, Id_list=id_list).evaluate()
- result = code_generate_metric(content, id_relation)
- res = success(result, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route('/data/metric/detail', methods=['POST'])
- def data_metric_detail():
- """
- 获取数据指标详情
-
- 请求参数:
- - id: 指标ID
-
- 返回:
- - 指标详情数据
- """
- try:
- # 传入请求参数
- receiver = request.get_json()
- id = int(receiver.get('id'))
- response_data = handle_id_metric(id)
- res = success(response_data, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route('/data/metric/list', methods=['POST'])
- def data_metric_list():
- """
- 获取数据指标列表
-
- 请求参数:
- - current: 当前页码,默认1
- - size: 每页大小,默认10
- - en_name: 英文名称过滤
- - name: 名称过滤
- - category: 类别过滤
- - time: 时间过滤
- - tag: 标签过滤
-
- 返回:
- - 指标列表数据和分页信息
- """
- try:
- # 传入请求参数
- receiver = request.get_json()
- page = int(receiver.get('current', 1))
- page_size = int(receiver.get('size', 10))
- en_name_filter = receiver.get('en_name', None)
- name_filter = receiver.get('name', None)
- category = receiver.get('category', None)
- time = receiver.get('time', None)
- tag = receiver.get('tag', None)
- # 计算跳过的记录的数量
- skip_count = (page - 1) * page_size
- data, total = metric_list(skip_count, page_size, en_name_filter,
- name_filter, category, time, tag)
- response_data = {'records': data, 'total': total, 'size': page_size, 'current': page}
- res = success(response_data, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route('/data/metric/graph/all', methods=['POST'])
- def data_metric_graph_all():
- """
- 获取数据指标图谱
-
- 请求参数:
- - id: 指标ID
- - type: 图谱类型,可选kinship/impact/all
- - meta: 是否返回元数据,true/false
-
- 返回:
- - 图谱数据
- """
- try:
- # 传入请求参数
- receiver = request.get_json()
- nodeid = receiver['id']
- type = receiver['type'] # kinship/impact/all
- meta = receiver['meta'] # true/false 是否返回元数据
- if type == 'kinship':
- result = metric_kinship_graph(nodeid, meta)
- elif type == 'impact':
- result = metric_impact_graph(nodeid, meta)
- else:
- result = metric_all_graph(nodeid, meta)
- return json.dumps(success(result, "success"), ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder)
- @bp.route('/data/metric/list/graph', methods=['POST'])
- def data_metric_list_graph():
- """
- 获取数据指标列表图谱
-
- 请求参数:
- - tag: 标签ID
-
- 返回:
- - 图谱数据,包含节点和连线
- """
- try:
- # 传入请求参数
- receiver = request.get_json()
- if not receiver or 'tag' not in receiver:
- raise ValueError("Missing 'tag' parameter in request body")
- nodeid = receiver['tag']
- # 构建查询条件
- params = {}
- where_clause = ""
- if nodeid is not None:
- where_clause = "MATCH (n)-[:label]->(la) WHERE id(la) = $nodeId"
- params['nodeId'] = nodeid
- query = f"""
- MATCH (n:data_metric)
- OPTIONAL MATCH (n)-[:child]->(child)
- {where_clause}
- WITH
- collect(DISTINCT {{id: toString(id(n)), text: n.name, type: split(labels(n)[0], '_')[1]}}) AS nodes,
- collect(DISTINCT {{id: toString(id(child)), text: child.name, type: split(labels(child)[0], '_')[1]}}) AS nodes2,
- collect(DISTINCT {{from: toString(id(n)), to: toString(id(child)), text: '下级'}}) AS lines
- RETURN nodes + nodes2 AS nodes, lines AS lines
- """
- # 修复:使用正确的session方式执行查询
- driver = connect_graph()
- if not driver:
- return json.dumps(failed({}, "无法连接到数据库"), ensure_ascii=False, cls=MyEncoder)
-
- with driver.session() as session:
- result = session.run(query, **params)
- res = {}
- for item in result:
- res = {
- "nodes": [record for record in item['nodes'] if record['id']],
- "lines": [record for record in item['lines'] if record['from'] and record['to']],
- }
- return json.dumps(success(res, "success"), ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder)
- @bp.route('/data/metric/update', methods=['POST'])
- def data_metric_update():
- """
- 更新数据指标
-
- 请求参数:
- - id: 指标ID
- - 其他需要更新的属性
-
- 返回:
- - 处理结果,成功或失败
- """
- try:
- # 传入请求参数
- receiver = request.get_json()
- data_metric_edit(receiver)
- res = success({}, "success")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- res = failed({}, {"error": f"{e}"})
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
|