|
- """
- 数据模型核心业务逻辑模块
- 本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
- - 数据模型的创建、更新、删除
- - 数据模型与数据资源、元数据之间的关系处理
- - 数据模型血缘关系管理
- - 数据模型图谱生成
- - 数据模型层级计算等功能
- """
- import math
- import threading
- from concurrent.futures import ThreadPoolExecutor
- import pandas as pd
- from py2neo import Relationship
- import logging
- import json
- from app.services.package_function import create_or_get_node, relationship_exists, get_node
- from app.core.graph.graph_operations import connect_graph
- from app.services.neo4j_driver import neo4j_driver
- from app.core.meta_data import get_formatted_time, handle_id_unstructured
- from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
- from app.core.data_resource.resource import get_node_by_id
# Compute a data model's level from its child relationships and save it.
def calculate_model_level(id):
    """
    Compute a data model's level from its ``child`` relationships and save it.

    The level is the length of the longest outgoing ``child`` path that starts
    at the matched ``data_model`` node, or 0 when no such path exists. The
    value is written to the node's ``level`` property.

    Args:
        id: Identifier of the data model; matched against the ``id`` property
            of ``data_model`` nodes.

    Returns:
        None
    """
    # NOTE(review): both queries match on the node *property* ``id``, while
    # other functions in this module address nodes through the internal
    # ``id(n)`` function - confirm which identifier callers pass in.
    depth_query = """
    MATCH (start_node:data_model {id: $nodeId})
    CALL {
        WITH start_node
        OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
        RETURN length(path) AS level
    }
    WITH coalesce(max(level), 0) AS max_level
    RETURN max_level
    """
    deepest = connect_graph.run(depth_query, nodeId=id).evaluate()

    # Persist the computed depth on the node.
    save_query = """
    MATCH (n:data_model {id: $nodeId})
    SET n.level = $level
    RETURN n
    """
    connect_graph.run(save_query, nodeId=id, level=deepest)
# Resolve the lineage ("blood") relations of a data resource.
def handle_model_relation(resource_ids):
    """
    Find the data resources that share metadata with the given resource.

    Two ``data_resource`` nodes are related when both have a ``connection``
    relationship to the same ``meta_node``.

    Args:
        resource_ids: Internal node id of the ``data_resource`` to start from.

    Returns:
        A list of dicts, one per related resource, with the keys
        ``blood_resources`` (id of the related resource), ``common_nodes``
        (ids of the shared meta nodes), ``origin_nodes`` (meta node ids only
        on the searched resource) and ``blood_nodes`` (meta node ids only on
        the related resource).
    """
    lineage_query = """
    MATCH (search:data_resource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_resource)
    WHERE id(search) = $resource_Ids
    WITH search, connect, common_node
    MATCH (search)-[:connection]->(search_node:meta_node)
    WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
    MATCH (connect)-[:connection]->(connect_node:meta_node)
    WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
    WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
    // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
    WITH search, connect, common_nodes,
        [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
        [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
    RETURN id(connect) as blood_resources, common_nodes,
        filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
    """
    cursor = connect_graph.run(lineage_query, resource_Ids=resource_ids)
    return cursor.data()
# Create (or fetch) a data model node and wire up its child/label relations.
def handle_data_model(data_model, result_list, result, receiver):
    """
    Create a data model node, or reuse the existing one with the same name.

    ``receiver`` is updated in place with ``id_list``, ``time`` and
    ``en_name`` before it is used as the property set for a new node. The
    node is then linked to each resolvable child (``child`` relationship)
    and to its tag (``label`` relationship) unless the link already exists.

    Args:
        data_model: Name of the data model.
        result_list: List of English names; the first entry is used.
        result: Serialized id list, stored under ``receiver['id_list']``.
        receiver: Request parameters. ``childrenId`` (iterable of node ids)
            and ``tag`` (id of a ``data_label`` node) are optional.

    Returns:
        tuple: (internal node id, data model node)
    """
    data_model_en = result_list[0]
    receiver['id_list'] = result
    receiver.update({
        'time': get_formatted_time(),
        'en_name': data_model_en,
    })
    data_model_node = (
        get_node('data_model', name=data_model)
        or create_or_get_node('data_model', **receiver)
    )

    # A missing or empty ``childrenId`` simply means there is nothing to link.
    for child_id in receiver.get('childrenId') or []:
        child = get_node_by_id_no_label(child_id)
        # Resolve the child first so no relationship lookup is issued for an
        # id that does not correspond to a node.
        if child and not relationship_exists(data_model_node, 'child', child):
            connect_graph.create(Relationship(data_model_node, 'child', child))

    # Link the model to its tag when one was supplied.
    tag_id = receiver.get('tag')
    if tag_id:
        tag = get_node_by_id('data_label', tag_id)
        if tag and not relationship_exists(data_model_node, 'label', tag):
            connect_graph.create(Relationship(data_model_node, 'label', tag))

    return data_model_node.identity, data_model_node
# (Selected from data resources)
def resource_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to the resources and metadata it was built from.

    Creates a ``component`` relationship from the data model to every listed
    ``meta_node`` and a ``resource`` relationship to every listed
    ``data_resource``. ``MERGE`` is used, so existing links are not duplicated.

    Args:
        id_lists: List of dicts, each with a ``resource_id`` and a ``metaData``
            list whose items carry an ``id``.
        data_model_node_id: Internal node id of the data model.

    Returns:
        None
    """
    # Only the ids are needed here; other metadata fields (for example
    # ``data_standard``) are intentionally not read so that records lacking
    # them do not raise.
    resource_ids = [entry['resource_id'] for entry in id_lists]
    meta_ids = [meta['id'] for entry in id_lists for meta in entry['metaData']]

    # Composition: data_model -[:component]-> meta_node
    if meta_ids:
        query = """
        MATCH (source:data_model), (target:meta_node)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:component]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=meta_ids)

    # Provenance: data_model -[:resource]-> data_resource
    if resource_ids:
        query = """
        MATCH (source:data_model), (target:data_resource)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:resource]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=resource_ids)
# (Selected from data models)
def model_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to the models and metadata it was built from.

    Creates a ``component`` relationship from the data model to every listed
    ``meta_node`` and a ``use`` relationship to every listed ``data_model``.

    Args:
        id_lists: List of dicts, each with a ``model_id`` and a ``metaData``
            list whose items carry an ``id``.
        data_model_node_id: Internal node id of the data model.

    Returns:
        None
    """
    used_model_ids = [entry['model_id'] for entry in id_lists]
    component_ids = [meta['id'] for entry in id_lists for meta in entry['metaData']]

    # Composition: data_model -[:component]-> meta_node
    component_query = """
    MATCH (source:data_model), (target:meta_node)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:component]->(target)
    """
    # Dependency: data_model -[:use]-> data_model
    use_query = """
    MATCH (source:data_model), (target:data_model)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:use]->(target)
    """

    # Each non-empty id list is written in its own session, metadata first.
    link_jobs = (
        (component_ids, component_query),
        (used_model_ids, use_query),
    )
    for target_ids, query in link_jobs:
        if not target_ids:
            continue
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=target_ids)
# (Selected from DDL)
def handle_no_meta_data_model(id_lists, receiver, data_model_node):
    """
    Link a DDL-derived data model to its resources and create its metadata.

    A ``resource`` relationship is merged from the data model to every listed
    ``data_resource``. For every metadata item a ``meta_node`` is obtained via
    ``create_or_get_node`` and connected to the data model with a
    ``component`` relationship unless that link already exists.

    Args:
        id_lists: List of dicts, each with a ``resource_id`` and a ``metaData``
            list. Metadata items may carry ``data_name``, ``en_name_zh`` and
            ``data_standard``; missing values default to an empty string.
        receiver: Request parameters. Not used by this function; kept so the
            signature stays compatible with existing callers.
        data_model_node: The data model node.

    Returns:
        None
    """
    # Read all input up front so malformed records fail before any write.
    resource_ids = [entry['resource_id'] for entry in id_lists]
    meta_items = [meta for entry in id_lists for meta in entry['metaData']]

    # Provenance: data_model -[:resource]-> data_resource
    if resource_ids:
        query = """
        MATCH (source:data_model), (target:data_resource)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:resource]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node.identity, target_ids=resource_ids)

    # Build the meta nodes directly from the submitted fields. No per-id
    # lookup of existing nodes is performed because its result was never
    # consumed.
    for meta_item in meta_items:
        meta_params = {
            'name': meta_item.get('data_name', ''),
            'cn_name': meta_item.get('en_name_zh', ''),
            'standard': meta_item.get('data_standard', ''),
            'time': get_formatted_time(),
        }
        meta_node = create_or_get_node('meta_node', **meta_params)

        # Composition: data_model -[:component]-> meta_node
        if meta_node and not relationship_exists(data_model_node, 'component', meta_node):
            connect_graph.create(Relationship(data_model_node, 'component', meta_node))
# Data model detail.
def handle_id_model(model_id):
    """
    Return the details of a data model together with its related nodes.

    Args:
        model_id: Internal node id of the data model.

    Returns:
        ``None`` when no ``data_model`` node has that id. Otherwise a dict of
        the node's properties plus ``id``, ``resources`` (linked
        ``data_resource`` nodes), ``components`` (linked ``meta_node`` nodes),
        ``uses`` (linked ``data_model`` nodes) and, when a label exists,
        ``tag`` (the first linked ``data_label``). Every related node is a
        dict of its properties with its internal id under ``id``.
    """

    def run_for_model(cypher):
        # Every query in this function takes the same single parameter.
        return connect_graph.run(cypher, model_id=model_id).data()

    def as_dict(node):
        # Node properties plus the internal identity under ``id``.
        converted = dict(node)
        converted['id'] = node.identity
        return converted

    model_rows = run_for_model("""
    MATCH (n:data_model) WHERE id(n) = $model_id
    RETURN n
    """)
    if not model_rows:
        return None

    model_info = dict(model_rows[0]['n'])
    model_info['id'] = model_id

    # Resources the model is derived from.
    resource_rows = run_for_model("""
    MATCH (n:data_model)-[:resource]->(r:data_resource) WHERE id(n) = $model_id
    RETURN r
    """)
    model_info['resources'] = [as_dict(row['r']) for row in resource_rows]

    # Metadata components of the model.
    component_rows = run_for_model("""
    MATCH (n:data_model)-[:component]->(m:meta_node) WHERE id(n) = $model_id
    RETURN m
    """)
    model_info['components'] = [as_dict(row['m']) for row in component_rows]

    # Other models this model uses.
    use_rows = run_for_model("""
    MATCH (n:data_model)-[:use]->(u:data_model) WHERE id(n) = $model_id
    RETURN u
    """)
    model_info['uses'] = [as_dict(row['u']) for row in use_rows]

    # Label of the model; only the first match is reported.
    tag_rows = run_for_model("""
    MATCH (n:data_model)-[:label]->(t:data_label) WHERE id(n) = $model_id
    RETURN t
    """)
    if tag_rows:
        model_info['tag'] = as_dict(tag_rows[0]['t'])

    return model_info
# Data model list.
def model_list(skip_count, page_size, en_name_filter=None, name_filter=None,
               category=None, tag=None, level=None):
    """
    Return one page of data models together with the total match count.

    Args:
        skip_count: Number of records to skip.
        page_size: Maximum number of records to return.
        en_name_filter: Case-insensitive substring pattern for ``en_name``.
        name_filter: Case-insensitive substring pattern for ``name``.
        category: Exact ``category`` to match.
        tag: Internal id of a ``data_label`` the model must be linked to.
        level: Exact ``level`` to match.

    Returns:
        tuple: (list of row dicts, total count)
    """
    params = {}
    conditions = []
    match_clause = "MATCH (n:data_model)"

    # The tag restriction adds a second MATCH; its predicate goes into the
    # shared condition list so the generated query has exactly one WHERE
    # clause even when the tag is combined with other filters.
    if tag:
        match_clause += "\nMATCH (n)-[:label]->(t:data_label)"
        conditions.append("id(t) = $tag_id")
        params['tag_id'] = tag

    if name_filter:
        conditions.append("n.name =~ $name_filter")
        params['name_filter'] = f"(?i).*{name_filter}.*"

    if en_name_filter:
        conditions.append("n.en_name =~ $en_name_filter")
        params['en_name_filter'] = f"(?i).*{en_name_filter}.*"

    # NOTE(review): a falsy level (including 0) is treated as "no filter" -
    # confirm whether filtering on level 0 should be possible.
    if level:
        conditions.append("n.level = $level")
        params['level'] = level

    if category:
        conditions.append("n.category = $category")
        params['category'] = category

    where_str = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    # Total number of matching models.
    count_query = f"""
    {match_clause}
    {where_str}
    RETURN count(n) as count
    """
    count = connect_graph.run(count_query, **params).evaluate()

    # Requested page, newest first.
    params['skip'] = skip_count
    params['limit'] = page_size

    data_query = f"""
    {match_clause}
    {where_str}
    OPTIONAL MATCH (n)-[:label]->(t:data_label)
    WITH n, t
    OPTIONAL MATCH (n)-[:component]->(m:meta_node)
    RETURN
        id(n) as id,
        n.name as name,
        n.en_name as en_name,
        n.category as category,
        n.description as description,
        n.time as time,
        n.level as level,
        t.name as tag_name,
        id(t) as tag_id,
        count(m) as component_count
    ORDER BY n.time DESC
    SKIP $skip
    LIMIT $limit
    """
    result = connect_graph.run(data_query, **params).data()

    return result, count
# List of data resources that have a lineage relation.
def model_resource_list(skip_count, page_size, name_filter=None, id=None,
                        category=None, time=None):
    """
    Return one page of data resources that share metadata with a resource.

    A resource qualifies when it has a ``connection`` relationship to a
    ``meta_node`` that the searched resource is also connected to.

    Args:
        skip_count: Number of records to skip.
        page_size: Maximum number of records to return.
        name_filter: Case-insensitive substring pattern for ``name``.
        id: Internal node id of the searched ``data_resource``.
        category: Exact ``category`` to match.
        time: Lower bound (inclusive) for the ``time`` property.

    Returns:
        tuple: (list of row dicts, total count)
    """
    query_params = {'id': id}
    conditions = []

    if name_filter:
        query_params['name_filter'] = f"(?i).*{name_filter}.*"
        conditions.append("n.name =~ $name_filter")

    if category:
        query_params['category'] = category
        conditions.append("n.category = $category")

    if time:
        query_params['time'] = time
        conditions.append("n.time >= $time")

    where_str = " AND ".join(conditions)
    if where_str:
        where_str = "WHERE " + where_str

    # Total number of distinct related resources.
    count_query = f"""
    MATCH (search:data_resource) WHERE id(search) = $id
    MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource)
    {where_str}
    RETURN count(DISTINCT n) as count
    """
    total = connect_graph.run(count_query, **query_params).evaluate()

    # Requested page, newest first, with the shared meta nodes per resource.
    data_query = f"""
    MATCH (search:data_resource) WHERE id(search) = $id
    MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource)
    {where_str}
    WITH DISTINCT n, mn
    RETURN
        id(n) as id,
        n.name as name,
        n.en_name as en_name,
        n.category as category,
        n.description as description,
        n.time as time,
        collect({{id: id(mn), name: mn.name}}) as common_meta
    ORDER BY n.time DESC
    SKIP $skip
    LIMIT $limit
    """
    rows = connect_graph.run(
        data_query, **query_params, skip=skip_count, limit=page_size
    ).data()

    return rows, total
# Shared helpers for the graph endpoints below.
def _graph_node_type(node):
    """Return the display type of a node, derived from its first label."""
    labels = list(node.labels)
    if not labels:
        return ""
    parts = labels[0].split('_')
    # ``data_model`` yields ``model``; a label without an underscore is
    # returned unchanged instead of raising IndexError.
    return parts[1] if len(parts) > 1 else labels[0]


def _paths_to_graph(records):
    """Convert records holding a path under ``p`` into a nodes/edges dict."""
    nodes_by_id = {}
    edges_by_key = {}

    for record in records:
        path = record["p"]

        for node in path.nodes:
            if node.identity not in nodes_by_id:
                nodes_by_id[node.identity] = {
                    "id": str(node.identity),
                    "text": node.get("name", ""),
                    "type": _graph_node_type(node),
                }

        for rel in path.relationships:
            # One edge per (start, end) pair; dicts keep insertion order, so
            # the edge list is stable between runs.
            key = (rel.start_node.identity, rel.end_node.identity)
            if key not in edges_by_key:
                edges_by_key[key] = {
                    "from": str(key[0]),
                    "to": str(key[1]),
                    "text": "",
                }

    return {
        "nodes": list(nodes_by_id.values()),
        "edges": list(edges_by_key.values()),
    }


# Data model lineage graph.
def model_kinship_graph(nodeid, meta=False):
    """
    Return the lineage graph around a data model.

    Paths of up to three hops are followed, limited to 300 paths.

    Args:
        nodeid: Internal node id of the data model.
        meta: When true, ``component`` relationships are followed in addition
            to ``resource`` ones; otherwise only ``resource`` relationships
            are followed and paths ending in a node labelled exactly
            ``meta_node`` are excluded.

    Returns:
        dict with ``nodes`` (``id``, ``text``, ``type``) and ``edges``
        (``from``, ``to``, ``text``; ``text`` is always empty).
    """
    if meta:
        query = """
        MATCH p = (n:data_model)-[r:component|resource*..3]-(m)
        WHERE id(n) = $nodeId
        WITH p, relationships(p) as rels
        RETURN p
        limit 300
        """
    else:
        query = """
        MATCH p = (n:data_model)-[r:resource*..3]-(m)
        WHERE id(n) = $nodeId and labels(m) <> ['meta_node']
        WITH p, relationships(p) as rels
        RETURN p
        limit 300
        """

    return _paths_to_graph(connect_graph.run(query, nodeId=nodeid))


# Data model impact graph.
def model_impact_graph(nodeid, meta=False):
    """
    Return the impact graph around a data model.

    ``use`` relationships are followed for up to three hops, limited to 300
    paths.

    Args:
        nodeid: Internal node id of the data model.
        meta: Accepted for signature parity with the other graph functions;
            it does not change the query.

    Returns:
        dict with ``nodes`` (``id``, ``text``, ``type``) and ``edges``
        (``from``, ``to``, ``text``; ``text`` is always empty).
    """
    query = """
    MATCH p = (n:data_model)-[r:use*..3]-(m)
    WHERE id(n) = $nodeId
    WITH p, relationships(p) as rels
    RETURN p
    limit 300
    """

    return _paths_to_graph(connect_graph.run(query, nodeId=nodeid))


# Full data model graph.
def model_all_graph(nodeid, meta=False):
    """
    Return the full graph around a data model.

    Relationships of any type are followed for up to three hops, limited to
    300 paths.

    Args:
        nodeid: Internal node id of the data model.
        meta: When false, paths ending in a node labelled exactly
            ``meta_node`` are excluded.

    Returns:
        dict with ``nodes`` (``id``, ``text``, ``type``) and ``edges``
        (``from``, ``to``, ``text``; ``text`` is always empty).
    """
    if meta:
        query = """
        MATCH p = (n:data_model)-[r*..3]-(m)
        WHERE id(n) = $nodeId
        WITH p, relationships(p) as rels
        RETURN p
        limit 300
        """
    else:
        query = """
        MATCH p = (n:data_model)-[r*..3]-(m)
        WHERE id(n) = $nodeId and labels(m) <> ['meta_node']
        WITH p, relationships(p) as rels
        RETURN p
        limit 300
        """

    return _paths_to_graph(connect_graph.run(query, nodeId=nodeid))
# Update a data model.
def data_model_edit(receiver):
    """
    Update a data model's basic properties and, optionally, its label.

    Sets ``name``, ``en_name``, ``category`` and ``description`` on the
    ``data_model`` node with the given internal id. When ``tag`` is supplied
    and resolves to a ``data_label`` node, the model's existing ``label``
    relationships are replaced by one pointing at that node.

    Args:
        receiver: Request parameters; the keys ``id``, ``name``, ``en_name``,
            ``category``, ``description`` and ``tag`` are read.

    Returns:
        dict with a ``message`` entry.
    """
    model_id = receiver.get('id')

    update_query = """
    MATCH (n:data_model) WHERE id(n) = $id
    SET n.name = $name, n.en_name = $en_name, n.category = $category, n.description = $description
    RETURN n
    """
    # ``.data()`` drains the cursor; the returned rows are not needed.
    connect_graph.run(
        update_query,
        id=model_id,
        name=receiver.get('name'),
        en_name=receiver.get('en_name'),
        category=receiver.get('category'),
        description=receiver.get('description'),
    ).data()

    tag = receiver.get('tag')
    if tag:
        # Resolve the new tag before touching existing links so an unknown
        # tag id cannot strip the model of its current label.
        tag_node = get_node_by_id('data_label', tag)
        if tag_node:
            delete_query = """
            MATCH (n:data_model)-[r:label]->() WHERE id(n) = $id
            DELETE r
            """
            connect_graph.run(delete_query, id=model_id)

            model_node = get_node_by_id_no_label(model_id)
            if model_node and not relationship_exists(model_node, 'label', tag_node):
                connect_graph.create(Relationship(model_node, 'label', tag_node))

    return {"message": "数据模型更新成功"}
|