|
@@ -0,0 +1,591 @@
|
|
|
+"""
|
|
|
+数据指标核心业务逻辑模块
|
|
|
+
|
|
|
+该模块提供了数据指标的各种核心功能,包括:
|
|
|
+- 指标列表查询和过滤
|
|
|
+- 指标血缘关系处理
|
|
|
+- 指标图谱生成(血缘关系、影响关系、所有关系)
|
|
|
+- 指标数据的创建、更新和查询
|
|
|
+"""
|
|
|
+
|
|
|
+import json
|
|
|
+import logging
|
|
|
+from py2neo import Relationship
|
|
|
+from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
|
|
|
+from app.core.meta_data import get_formatted_time
|
|
|
+from app.core.graph.graph_operations import connect_graph
|
|
|
+from app.core.data_resource.resource import get_node_by_id
|
|
|
+from app.services.package_function import get_node, create_or_get_node, relationship_exists
|
|
|
+
|
|
|
+# 配置日志
|
|
|
+logging.basicConfig(filename='metric_interface.log', level=logging.INFO)
|
|
|
+
|
|
|
+def metric_list(skip_count, page_size, en_name_filter=None,
|
|
|
+ name_filter=None, category_filter=None, time_filter=None, tag_filter=None):
|
|
|
+ """
|
|
|
+ 获取数据指标列表,支持多种过滤条件
|
|
|
+
|
|
|
+ Args:
|
|
|
+ skip_count: 分页跳过的记录数
|
|
|
+ page_size: 每页记录数
|
|
|
+ en_name_filter: 英文名称过滤
|
|
|
+ name_filter: 名称过滤
|
|
|
+ category_filter: 类别过滤
|
|
|
+ time_filter: 时间过滤
|
|
|
+ tag_filter: 标签过滤
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ tuple: (数据列表, 总记录数)
|
|
|
+ """
|
|
|
+ data = []
|
|
|
+
|
|
|
+ # 构建查询条件
|
|
|
+ where_clause = []
|
|
|
+ params = {}
|
|
|
+ if name_filter:
|
|
|
+ where_clause.append("n.name CONTAINS $name_filter")
|
|
|
+ params['name_filter'] = name_filter
|
|
|
+ if en_name_filter:
|
|
|
+ where_clause.append("n.en_name CONTAINS $en_name_filter")
|
|
|
+ params['en_name_filter'] = en_name_filter
|
|
|
+ if category_filter:
|
|
|
+ where_clause.append("n.category CONTAINS $category_filter")
|
|
|
+ params['category_filter'] = category_filter
|
|
|
+ if tag_filter:
|
|
|
+ where_clause.append("n.tag = $nodeId")
|
|
|
+ params['nodeId'] = tag_filter
|
|
|
+ if time_filter:
|
|
|
+ where_clause.append("n.time CONTAINS $time_filter")
|
|
|
+ params['time_filter'] = time_filter
|
|
|
+ else:
|
|
|
+ where_clause.append("TRUE")
|
|
|
+
|
|
|
+ where_str = " AND ".join(where_clause)
|
|
|
+
|
|
|
+ # 构建完整的查询语句
|
|
|
+ cql = f"""
|
|
|
+ MATCH (n:data_metric)
|
|
|
+ WHERE {where_str}
|
|
|
+ OPTIONAL MATCH (la:label) where id(la) = n.tag
|
|
|
+ OPTIONAL MATCH (n:data_metric)-[:origin]-(m:data_model)
|
|
|
+ OPTIONAL MATCH (n:data_metric)-[:origin]-(mr:data_metric)
|
|
|
+ //数据标签
|
|
|
+ WITH
|
|
|
+ CASE
|
|
|
+ WHEN m IS NOT NULL THEN collect(m.name)
|
|
|
+ WHEN mr IS NOT NULL THEN collect(mr.name)
|
|
|
+ ELSE []
|
|
|
+ END AS data_model,properties(n) as properties,
|
|
|
+ n.time as time,id(n) as nodeid,{{id:id(la),name:la.name}} as tag
|
|
|
+ return properties,time,nodeid,data_model,tag
|
|
|
+ ORDER BY time desc
|
|
|
+ SKIP $skip_count
|
|
|
+ LIMIT $page_size
|
|
|
+ """
|
|
|
+ params['skip_count'] = skip_count
|
|
|
+ params['page_size'] = page_size
|
|
|
+ result = connect_graph.run(cql, **params)
|
|
|
+ for record in result:
|
|
|
+ properties = record['properties']
|
|
|
+ properties['data_model'] = record['data_model']
|
|
|
+ properties['tag'] = record['tag']
|
|
|
+ new_attr = {
|
|
|
+ 'id': record['nodeid']
|
|
|
+ }
|
|
|
+ if "id_list" in properties:
|
|
|
+ properties['id_list'] = json.loads(properties['id_list'])
|
|
|
+ if "describe" not in properties:
|
|
|
+ properties["describe"] = None
|
|
|
+
|
|
|
+ properties.update(new_attr)
|
|
|
+ data.append(properties)
|
|
|
+
|
|
|
+ # 获取总量
|
|
|
+ total_query = f"MATCH (n:data_metric) " \
|
|
|
+ f"WHERE {where_str} RETURN COUNT(n) AS total"
|
|
|
+ total_result = connect_graph.run(total_query, **params).evaluate()
|
|
|
+ return data, total_result
|
|
|
+
|
|
|
+
|
|
|
+def handle_metric_relation(model_ids):
|
|
|
+ """
|
|
|
+ 处理数据指标血缘关系
|
|
|
+
|
|
|
+ Args:
|
|
|
+ model_ids: 数据模型ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ list: 血缘关系数据
|
|
|
+ """
|
|
|
+ query = """
|
|
|
+ MATCH (search:data_model)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_model)
|
|
|
+ WHERE id(search) = $model_Ids
|
|
|
+ WITH search, connect, common_node
|
|
|
+ MATCH (search)-[:connection]->(search_node:meta_node)
|
|
|
+ WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
|
|
|
+ MATCH (connect)-[:connection]->(connect_node:meta_node)
|
|
|
+ WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
|
|
|
+ WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
|
|
|
+
|
|
|
+ // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
|
|
|
+ WITH search, connect, common_nodes,
|
|
|
+ [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
|
|
|
+ [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
|
|
|
+
|
|
|
+ RETURN id(connect) as blood_model, common_nodes,
|
|
|
+ filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
|
|
|
+ """
|
|
|
+
|
|
|
+ result = connect_graph.run(query, model_Ids=model_ids)
|
|
|
+ return result.data()
|
|
|
+
|
|
|
+
|
|
|
+def id_mertic_graph(id):
|
|
|
+ """
|
|
|
+ 生成数据指标图谱
|
|
|
+
|
|
|
+ Args:
|
|
|
+ id: 指标ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ dict: 图谱数据,包含节点、连线和根节点ID
|
|
|
+ """
|
|
|
+ query = """
|
|
|
+ MATCH (n:data_metric)
|
|
|
+ WHERE id(n) = $nodeId
|
|
|
+ WITH n, apoc.convert.fromJsonList(n.model_id) AS id_lists
|
|
|
+ UNWIND id_lists AS id_list
|
|
|
+ WITH n, id_list.id AS model_id, id_list.metaData AS meta_ids
|
|
|
+ OPTIONAL MATCH (t:data_label) WHERE id(t) = n.tag
|
|
|
+ UNWIND meta_ids AS meta_id
|
|
|
+ MATCH (d:data_model) WHERE id(d) = model_id
|
|
|
+ MATCH (a:meta_node) WHERE id(a) = meta_id
|
|
|
+ with
|
|
|
+ collect({from:toString(id(n)),to:toString(id(d)),text:"来源"})+
|
|
|
+ collect({from:toString(id(n)),to:toString(id(a)),text:"包含"})+
|
|
|
+ collect({from:toString(id(d)),to:toString(id(a)),text:"包含"})+
|
|
|
+ collect({from:toString(id(n)),to:toString(id(t)),text:"标签"})+
|
|
|
+ collect({from:toString(id(d)),to:toString(id(t)),text:"标签"})AS line,
|
|
|
+ collect({id: toString(id(n)), text: n.name, type: "metric"}) +
|
|
|
+ collect({id: toString(id(d)), text: d.name, type: "model"}) +
|
|
|
+ collect({id: toString(id(t)), text: t.name, type: "label"}) +
|
|
|
+ collect({id: toString(id(a)), text: a.name}) AS node,n
|
|
|
+ WITH apoc.coll.toSet(line) AS lines,
|
|
|
+ apoc.coll.toSet(node) AS nodes,
|
|
|
+ toString(id(n)) as res
|
|
|
+ RETURN lines,nodes,res
|
|
|
+ """
|
|
|
+ data = connect_graph.run(query, nodeId=id)
|
|
|
+
|
|
|
+ res = {}
|
|
|
+ for item in data:
|
|
|
+ res = {
|
|
|
+ "nodes": [record for record in item['nodes'] if record['id']],
|
|
|
+ "lines": [record for record in item['lines'] if record['from'] and record['to']],
|
|
|
+ "rootId": item['res'],
|
|
|
+ }
|
|
|
+
|
|
|
+ logging.info(res) # 记录 'res' 变量
|
|
|
+ return res
|
|
|
+
|
|
|
+
|
|
|
+def handle_data_metric(metric_name, result_list, receiver):
|
|
|
+ """
|
|
|
+ 创建一个数据指标节点
|
|
|
+
|
|
|
+ Args:
|
|
|
+ metric_name: 指标名称
|
|
|
+ result_list: 结果列表
|
|
|
+ receiver: 请求参数
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ tuple: (指标节点ID, ID列表)
|
|
|
+ """
|
|
|
+ data_metric_en = result_list[0]
|
|
|
+ id_list = receiver['id_list']
|
|
|
+ receiver['id_list'] = json.dumps(receiver['id_list'], ensure_ascii=False)
|
|
|
+
|
|
|
+ receiver.update({
|
|
|
+ 'time': get_formatted_time(),
|
|
|
+ 'en_name': data_metric_en
|
|
|
+ })
|
|
|
+
|
|
|
+ data_metric_node = get_node('data_metric', name=metric_name) or create_or_get_node('data_metric', **receiver)
|
|
|
+
|
|
|
+ child_list = receiver['childrenId']
|
|
|
+ for child_id in child_list:
|
|
|
+ child = get_node_by_id_no_label(child_id)
|
|
|
+ # 建立关系:当前节点的childrenId指向,以及关系child
|
|
|
+ if child and not relationship_exists(data_metric_node, 'child', child):
|
|
|
+ connect_graph.create(Relationship(data_metric_node, 'child', child))
|
|
|
+
|
|
|
+ if receiver.get('tag'):
|
|
|
+ tag = get_node_by_id('data_label', receiver['tag'])
|
|
|
+ if tag and not relationship_exists(data_metric_node, 'label', tag):
|
|
|
+ connect_graph.create(Relationship(data_metric_node, 'label', tag))
|
|
|
+
|
|
|
+ return data_metric_node.identity, id_list
|
|
|
+
|
|
|
+
|
|
|
+def handle_meta_data_metric(data_metric_node_id, id_list):
|
|
|
+ """
|
|
|
+ 处理数据指标与其他节点之间的关系
|
|
|
+
|
|
|
+ Args:
|
|
|
+ data_metric_node_id: 数据指标节点ID
|
|
|
+ id_list: ID列表
|
|
|
+ """
|
|
|
+ # 提取 model_id 和 metric_id
|
|
|
+ model_ids = [item['id'] for item in id_list if item['type'] == 'model']
|
|
|
+ metric_ids = [item['id'] for item in id_list if item['type'] == 'metric']
|
|
|
+ # 创建与data_model的关系
|
|
|
+ if model_ids:
|
|
|
+ cql_resource = """
|
|
|
+ MATCH (n:data_metric)
|
|
|
+ WHERE id(n) = $data_metric_node_id
|
|
|
+ UNWIND $model_ids AS model_id
|
|
|
+ MATCH (d:data_model)
|
|
|
+ WHERE id(d) = model_id
|
|
|
+ MERGE (n)-[:origin]->(d)
|
|
|
+ """
|
|
|
+ connect_graph.run(cql_resource, data_metric_node_id=data_metric_node_id, model_ids=model_ids)
|
|
|
+ # 创建与data_metric的关系
|
|
|
+ if metric_ids:
|
|
|
+ cql_resource = """
|
|
|
+ MATCH (n:data_metric)
|
|
|
+ WHERE id(n) = $data_metric_node_id
|
|
|
+ UNWIND $metric_ids AS metric_id
|
|
|
+ MATCH (d:data_metric)
|
|
|
+ WHERE id(d) = metric_id
|
|
|
+ MERGE (n)-[:origin]->(d)
|
|
|
+ """
|
|
|
+ connect_graph.run(cql_resource, data_metric_node_id=data_metric_node_id, metric_ids=metric_ids)
|
|
|
+
|
|
|
+ # 创建与元数据的关系
|
|
|
+ for record in id_list:
|
|
|
+ if "metaData" in record and record['metaData'] != []:
|
|
|
+ cql_meta = """
|
|
|
+ MATCH (n:data_metric)
|
|
|
+ WHERE id(n) = $data_metric_node_id
|
|
|
+ UNWIND $meta_ids AS meta_id
|
|
|
+ MATCH (d:meta_node)
|
|
|
+ WHERE id(d) = meta_id
|
|
|
+ MERGE (n)-[:connection]->(d)
|
|
|
+ """
|
|
|
+ connect_graph.run(cql_meta, data_metric_node_id=data_metric_node_id, meta_ids=record['metaData'])
|
|
|
+
|
|
|
+
|
|
|
+def handle_id_metric(id):
|
|
|
+ """
|
|
|
+ 获取数据指标详情
|
|
|
+
|
|
|
+ Args:
|
|
|
+ id: 指标ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ dict: 指标详情数据
|
|
|
+ """
|
|
|
+ query = """
|
|
|
+ MATCH (n:data_metric)
|
|
|
+ WHERE id(n) = $nodeId
|
|
|
+ WITH apoc.convert.fromJsonList(n.id_list) AS info, n
|
|
|
+ UNWIND info AS item
|
|
|
+ WITH n, item.id AS model_or_metric_id, item.metaData AS meta_ids, item.type AS type
|
|
|
+
|
|
|
+ // 数据模型或者数据指标
|
|
|
+ OPTIONAL MATCH (n)-[:origin]->(m1:data_model)
|
|
|
+ WHERE type = 'model' AND id(m1) = model_or_metric_id
|
|
|
+ WITH n, model_or_metric_id, meta_ids, type, m1
|
|
|
+ OPTIONAL MATCH (n)-[:origin]->(m2:data_metric)
|
|
|
+ WHERE type = 'metric' AND id(m2) = model_or_metric_id
|
|
|
+ WITH n, model_or_metric_id, meta_ids, type, m1, m2
|
|
|
+ // 元数据
|
|
|
+ OPTIONAL MATCH (n)-[:connection]-(meta:meta_node)
|
|
|
+ // 数据标签
|
|
|
+ OPTIONAL MATCH (n)-[:label]-(la:data_label)
|
|
|
+ OPTIONAL MATCH (parent)-[:child]-(n)
|
|
|
+ WITH properties(n) AS properties,collect(DISTINCT id(meta)) AS meta_list,parent,
|
|
|
+ {id: id(la), name: la.name} AS tag,
|
|
|
+ CASE
|
|
|
+ WHEN type = 'model' THEN m1
|
|
|
+ WHEN type = 'metric' THEN m2
|
|
|
+ ELSE NULL
|
|
|
+ END AS m
|
|
|
+ WITH {model_name: m.name, model_id: id(m), meta: meta_list} AS result, properties,
|
|
|
+ tag,{id:id(parent),name:parent.name} as parentId
|
|
|
+ RETURN collect(result) AS id_list, properties, tag,collect(parentId)as parentId
|
|
|
+ """
|
|
|
+ data_ = connect_graph.run(query, nodeId=id).data()
|
|
|
+
|
|
|
+ if not data_:
|
|
|
+ return {"data_metric": {}}
|
|
|
+
|
|
|
+ record = data_[0]
|
|
|
+ properties = record['properties']
|
|
|
+ properties['id_list'] = record['id_list']
|
|
|
+ properties['tag'] = record['tag']
|
|
|
+ properties['parentId'] = record['parentId']
|
|
|
+
|
|
|
+ # 移除不需要的属性
|
|
|
+ properties.pop('model_id', None)
|
|
|
+
|
|
|
+ # 添加缺失的属性
|
|
|
+ for key in ["describe", "tag", "code"]:
|
|
|
+ if key not in properties:
|
|
|
+ properties[key] = None
|
|
|
+
|
|
|
+ response_data = {"data_metric": properties}
|
|
|
+
|
|
|
+ return response_data
|
|
|
+
|
|
|
+
|
|
|
+def metric_kinship_graph(nodeid, meta):
|
|
|
+ """
|
|
|
+ 生成数据指标血缘关系图谱
|
|
|
+
|
|
|
+ Args:
|
|
|
+ nodeid: 节点ID
|
|
|
+ meta: 是否包含元数据
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ dict: 图谱数据
|
|
|
+ """
|
|
|
+ # 查询语句 - 使用简单的路径匹配实现逐层查找
|
|
|
+ cql = """
|
|
|
+ // 使用可变长度路径查找所有相关节点
|
|
|
+ MATCH path = (start)-[:origin*0..]->(target)
|
|
|
+ WHERE id(start) = $nodeId
|
|
|
+
|
|
|
+ // 提取路径中的所有节点
|
|
|
+ WITH collect(DISTINCT path) as paths
|
|
|
+ UNWIND paths as p
|
|
|
+ UNWIND nodes(p) as n
|
|
|
+
|
|
|
+ // 收集所有节点信息
|
|
|
+ WITH collect(DISTINCT {
|
|
|
+ id: toString(id(n)),
|
|
|
+ text: n.name+'-测试',
|
|
|
+ type: CASE
|
|
|
+ WHEN 'data_metric' IN labels(n) THEN 'metric'
|
|
|
+ WHEN 'data_model' IN labels(n) THEN 'model'
|
|
|
+ ELSE split(labels(n)[0],'_')[1]
|
|
|
+ END
|
|
|
+ }) as nodes, paths
|
|
|
+
|
|
|
+ // 从路径中提取关系
|
|
|
+ UNWIND paths as p
|
|
|
+ UNWIND relationships(p) as r
|
|
|
+ WITH nodes, collect(DISTINCT {
|
|
|
+ from: toString(id(startNode(r))),
|
|
|
+ to: toString(id(endNode(r))),
|
|
|
+ text: '来源'
|
|
|
+ }) as relations
|
|
|
+
|
|
|
+ // 返回结果
|
|
|
+ RETURN nodes,
|
|
|
+ [rel in relations WHERE rel.from IS NOT NULL AND rel.to IS NOT NULL] as lines,
|
|
|
+ toString($nodeId) as rootId
|
|
|
+ """
|
|
|
+
|
|
|
+ data = connect_graph.run(cql, nodeId=nodeid)
|
|
|
+ res = {}
|
|
|
+ for item in data:
|
|
|
+ res = {
|
|
|
+ "nodes": [record for record in item['nodes'] if record['id']],
|
|
|
+ "lines": [record for record in item['lines'] if record['from'] and record['to']],
|
|
|
+ "rootId": str(nodeid)
|
|
|
+ }
|
|
|
+ logging.info(res) # 记录 'res' 变量
|
|
|
+ return res
|
|
|
+
|
|
|
+
|
|
|
+def metric_impact_graph(nodeid, meta):
|
|
|
+ """
|
|
|
+ 生成数据指标影响关系图谱
|
|
|
+
|
|
|
+ Args:
|
|
|
+ nodeid: 节点ID
|
|
|
+ meta: 是否包含元数据
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ dict: 图谱数据
|
|
|
+ """
|
|
|
+ if meta:
|
|
|
+ # 查询语句
|
|
|
+ cql = """
|
|
|
+ MATCH(mc2:data_metric)
|
|
|
+ WHERE id(mc2)=$nodeId
|
|
|
+ OPTIONAL MATCH(mc4:data_metric)-[:origin]-(mc2)
|
|
|
+ OPTIONAL MATCH(mc2)-[:connection]-(meta:meta_node)
|
|
|
+ OPTIONAL MATCH(mc2)-[:label]-(la:data_label)
|
|
|
+ OPTIONAL MATCH(mc2)-[:child]-(child)
|
|
|
+ WITH
|
|
|
+ collect({id:toString(id(mc2)),text:mc2.name,type:split(labels(mc2)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mc4)),text:mc4.name,type:split(labels(mc4)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(meta)),text:meta.name})+
|
|
|
+ collect({id:toString(id(child)),text:child.name,type:split(labels(child)[0],'_')[1]})as nodes,mc2,
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mc4)),text:'包含'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(meta)),text:'包含'})+
|
|
|
+ collect({from:toString(id(la)),to:toString(id(mc2)),text:'标记'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(child)),text:'下级'})as lines
|
|
|
+ WITH
|
|
|
+ toString(id(mc2)) as rootId,
|
|
|
+ apoc.coll.toSet(lines) as lines,
|
|
|
+ apoc.coll.toSet(nodes) as nodes
|
|
|
+ RETURN nodes,lines,rootId
|
|
|
+ """
|
|
|
+ else:
|
|
|
+ # 查询语句
|
|
|
+ cql = """
|
|
|
+ MATCH(mc2:data_metric)
|
|
|
+ WHERE id(mc2)=$nodeId
|
|
|
+ OPTIONAL MATCH(mc4:data_metric)-[:origin]-(mc2)
|
|
|
+ OPTIONAL MATCH(mc2)-[:label]-(la:data_label)
|
|
|
+ OPTIONAL MATCH(mc2)-[:child]-(child)
|
|
|
+ WITH
|
|
|
+ collect({id:toString(id(mc2)),text:mc2.name,type:split(labels(mc2)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mc4)),text:mc4.name,type:split(labels(mc4)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(child)),text:child.name,type:split(labels(child)[0],'_')[1]})as nodes,mc2,
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mc4)),text:'包含'})+
|
|
|
+ collect({from:toString(id(la)),to:toString(id(mc2)),text:'标记'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(child)),text:'下级'})as lines
|
|
|
+ WITH
|
|
|
+ toString(id(mc2)) as rootId,
|
|
|
+ apoc.coll.toSet(lines) as lines,
|
|
|
+ apoc.coll.toSet(nodes) as nodes
|
|
|
+ RETURN nodes,lines,rootId
|
|
|
+ """
|
|
|
+ data = connect_graph.run(cql, nodeId=nodeid)
|
|
|
+ res = {}
|
|
|
+ for item in data:
|
|
|
+ res = {
|
|
|
+ "nodes": [record for record in item['nodes'] if record['id']],
|
|
|
+ "lines": [record for record in item['lines'] if record['from'] and record['to']],
|
|
|
+ "rootId": item['rootId']
|
|
|
+ }
|
|
|
+ logging.info(res) # 记录 'res' 变量
|
|
|
+ return res
|
|
|
+
|
|
|
+
|
|
|
+def metric_all_graph(nodeid, meta):
|
|
|
+ """
|
|
|
+ 生成数据指标所有关系图谱
|
|
|
+
|
|
|
+ Args:
|
|
|
+ nodeid: 节点ID
|
|
|
+ meta: 是否包含元数据
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ dict: 图谱数据
|
|
|
+ """
|
|
|
+ if meta:
|
|
|
+ # 查询语句
|
|
|
+ cql = """
|
|
|
+ MATCH(mc2:data_metric)
|
|
|
+ WHERE id(mc2)=$nodeId
|
|
|
+ OPTIONAL MATCH(mc2)-[:origin]-(mo:data_model)
|
|
|
+ OPTIONAL MATCH(mc2)-[:origin]-(mc1:data_metric)
|
|
|
+ OPTIONAL MATCH(mc4:data_metric)-[:origin]-(mc2)
|
|
|
+ OPTIONAL MATCH(mc2)-[:connection]-(meta:meta_node)
|
|
|
+ OPTIONAL MATCH(mc2)-[:label]-(la:data_label)
|
|
|
+ OPTIONAL MATCH(mc2)-[:child]-(child)
|
|
|
+ WITH
|
|
|
+ collect({id:toString(id(mc2)),text:mc2.name,type:split(labels(mc2)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mo)),text:mo.name,type:split(labels(mo)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mc1)),text:mc1.name,type:split(labels(mc1)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mc4)),text:mc4.name,type:split(labels(mc4)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(meta)),text:meta.name})+
|
|
|
+ collect({id:toString(id(child)),text:child.name,type:split(labels(child)[0],'_')[1]})as nodes,mc2,
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mo)),text:'来源'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mc1)),text:'来源'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mc4)),text:'包含'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(meta)),text:'包含'})+
|
|
|
+ collect({from:toString(id(la)),to:toString(id(mc2)),text:'标记'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(child)),text:'下级'})as lines
|
|
|
+ WITH
|
|
|
+ toString(id(mc2)) as rootId,
|
|
|
+ apoc.coll.toSet(lines) as lines,
|
|
|
+ apoc.coll.toSet(nodes) as nodes
|
|
|
+ RETURN nodes,lines,rootId
|
|
|
+ """
|
|
|
+ else:
|
|
|
+ # 查询语句
|
|
|
+ cql = """
|
|
|
+ MATCH(mc2:data_metric)
|
|
|
+ WHERE id(mc2)=$nodeId
|
|
|
+ OPTIONAL MATCH(mc2)-[:origin]-(mo:data_model)
|
|
|
+ OPTIONAL MATCH(mc2)-[:origin]-(mc1:data_metric)
|
|
|
+ OPTIONAL MATCH(mc4:data_metric)-[:origin]-(mc2)
|
|
|
+ OPTIONAL MATCH(mc2)-[:label]-(la:data_label)
|
|
|
+ OPTIONAL MATCH(mc2)-[:child]-(child)
|
|
|
+ WITH
|
|
|
+ collect({id:toString(id(mc2)),text:mc2.name,type:split(labels(mc2)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mo)),text:mo.name,type:split(labels(mo)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mc1)),text:mc1.name,type:split(labels(mc1)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(mc4)),text:mc4.name,type:split(labels(mc4)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
|
|
|
+ collect({id:toString(id(child)),text:child.name,type:split(labels(child)[0],'_')[1]})as nodes,mc2,
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mo)),text:'来源'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mc1)),text:'来源'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(mc4)),text:'包含'})+
|
|
|
+ collect({from:toString(id(la)),to:toString(id(mc2)),text:'标记'})+
|
|
|
+ collect({from:toString(id(mc2)),to:toString(id(child)),text:'下级'})as lines
|
|
|
+ WITH
|
|
|
+ toString(id(mc2)) as rootId,
|
|
|
+ apoc.coll.toSet(lines) as lines,
|
|
|
+ apoc.coll.toSet(nodes) as nodes
|
|
|
+ RETURN nodes,lines,rootId
|
|
|
+ """
|
|
|
+ data = connect_graph.run(cql, nodeId=nodeid)
|
|
|
+ res = {}
|
|
|
+ for item in data:
|
|
|
+ res = {
|
|
|
+ "nodes": [record for record in item['nodes'] if record['id']],
|
|
|
+ "lines": [record for record in item['lines'] if record['from'] and record['to']],
|
|
|
+ "rootId": item['rootId']
|
|
|
+ }
|
|
|
+ logging.info(res) # 记录 'res' 变量
|
|
|
+ return res
|
|
|
+
|
|
|
+
|
|
|
+def data_metric_edit(data):
|
|
|
+ """
|
|
|
+ 编辑数据指标
|
|
|
+
|
|
|
+ Args:
|
|
|
+ data: 数据指标数据
|
|
|
+ """
|
|
|
+ node_a = get_node_by_id('data_metric', data["id"])
|
|
|
+ if node_a:
|
|
|
+ delete_relationships(data["id"])
|
|
|
+
|
|
|
+ # 更新或创建数据指标节点的属性
|
|
|
+ for key, value in data.items():
|
|
|
+ if value is not None and key != "model_selected":
|
|
|
+ node_a[key] = value
|
|
|
+ connect_graph.push(node_a)
|
|
|
+
|
|
|
+ child_list = data['childrenId']
|
|
|
+ for child_id in child_list:
|
|
|
+ child = get_node_by_id_no_label(child_id)
|
|
|
+ # 建立关系:当前节点的childrenId指向,以及关系child
|
|
|
+ if child and not relationship_exists(node_a, 'child', child):
|
|
|
+ connect_graph.create(Relationship(node_a, 'child', child))
|
|
|
+
|
|
|
+ # 处理数据标签及其关系
|
|
|
+ if data["tag"]:
|
|
|
+ tag_node = get_node_by_id('data_label', data["tag"])
|
|
|
+ relationship_label = Relationship(node_a, "label", tag_node)
|
|
|
+ connect_graph.merge(relationship_label)
|
|
|
+
|
|
|
+ # 处理元数据节点及其关系(此处只调整关系,不修改对应属性)
|
|
|
+ for record in data['model_selected']:
|
|
|
+ for parsed_item in record["meta"]:
|
|
|
+ metadata_node = update_or_create_node(
|
|
|
+ parsed_item["id"]
|
|
|
+ )
|
|
|
+ relationship_connection = Relationship(node_a, "connection", metadata_node)
|
|
|
+ connect_graph.merge(relationship_connection)
|