""" 数据模型核心业务逻辑模块 本模块包含了数据模型相关的所有核心业务逻辑函数,包括: - 数据模型的创建、更新、删除 - 数据模型与数据资源、元数据之间的关系处理 - 数据模型血缘关系管理 - 数据模型图谱生成 - 数据模型层级计算等功能 """ import math import threading from concurrent.futures import ThreadPoolExecutor import pandas as pd from py2neo import Relationship import logging import json from app.services.package_function import create_or_get_node, relationship_exists, get_node from app.core.graph.graph_operations import connect_graph from app.services.neo4j_driver import neo4j_driver from app.core.meta_data import get_formatted_time, handle_id_unstructured from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label from app.core.data_resource.resource import get_node_by_id # 根据child关系计算数据模型当前的level自动保存 def calculate_model_level(id): """ 根据child关系计算数据模型当前的level并自动保存 Args: id: 数据模型的节点ID Returns: None """ cql = """ MATCH (start_node:data_model {id: $nodeId}) CALL { WITH start_node OPTIONAL MATCH path = (start_node)-[:child*]->(end_node) RETURN length(path) AS level } WITH coalesce(max(level), 0) AS max_level RETURN max_level """ data = connect_graph.run(cql, nodeId=id).evaluate() # 更新level属性 update_query = """ MATCH (n:data_model {id: $nodeId}) SET n.level = $level RETURN n """ connect_graph.run(update_query, nodeId=id, level=data) # 处理数据模型血缘关系 def handle_model_relation(resource_ids): """ 处理数据模型血缘关系 Args: resource_ids: 数据资源ID Returns: 血缘关系数据 """ query = """ MATCH (search:data_resource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_resource) WHERE id(search) = $resource_Ids WITH search, connect, common_node MATCH (search)-[:connection]->(search_node:meta_node) WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes MATCH (connect)-[:connection]->(connect_node:meta_node) WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容 WITH search, connect, common_nodes, [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes, [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes RETURN id(connect) as blood_resources, common_nodes, filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes """ result = connect_graph.run(query, resource_Ids=resource_ids) return result.data() # 创建一个数据模型节点 def handle_data_model(data_model, result_list, result, receiver): """ 创建一个数据模型节点 Args: data_model: 数据模型名称 result_list: 数据模型英文名列表 result: 序列化的ID列表 receiver: 接收到的请求参数 Returns: tuple: (id, data_model_node) """ # 添加数据资源 血缘关系的字段 blood_resource data_model_en = result_list[0] receiver['id_list'] = result add_attribute = { 'time': get_formatted_time(), 'en_name': data_model_en } receiver.update(add_attribute) data_model_node = get_node('data_model', name=data_model) or create_or_get_node('data_model', **receiver) child_list = receiver['childrenId'] for child_id in child_list: child = get_node_by_id_no_label(child_id) # 建立关系:当前节点的childrenId指向,以及关系child res = relationship_exists(data_model_node, 'child', child) if child and not res: connect_graph.create(Relationship(data_model_node, 'child', child)) # 根据传入参数id,和数据标签建立关系 if receiver['tag']: # 使用 Cypher 查询通过 id 查找节点 tag = get_node_by_id('data_label', receiver['tag']) if tag and not relationship_exists(data_model_node, 'label', tag): connection = Relationship(data_model_node, 'label', tag) connect_graph.create(connection) id = data_model_node.identity return id, data_model_node # (从数据资源中选取) def resource_handle_meta_data_model(id_lists, data_model_node_id): """ 处理从数据资源中选取的数据模型与元数据的关系 Args: id_lists: ID列表 data_model_node_id: 数据模型节点ID Returns: None """ # 构建meta_id和resouce_id的列表 resouce_ids = [record['resource_id'] for record in id_lists] meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']] metaData = [record['data_standard'] for id_list in id_lists for record in id_list['metaData']] # 创建与meta_node的关系 组成关系 if meta_ids: query = """ MATCH (source:data_model), (target:meta_node) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:component]->(target) """ with neo4j_driver.get_session() as session: session.run(query, source_id=data_model_node_id, target_ids=meta_ids) # 创建与data_resource的关系 资源关系 if resouce_ids: query = """ MATCH (source:data_model), (target:data_resource) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:resource]->(target) """ with neo4j_driver.get_session() as session: session.run(query, source_id=data_model_node_id, target_ids=resouce_ids) # (从数据模型中选取) def model_handle_meta_data_model(id_lists, data_model_node_id): """ 处理从数据模型中选取的数据模型与元数据的关系 Args: id_lists: ID列表 data_model_node_id: 数据模型节点ID Returns: None """ # 构建meta_id和model_id的列表 model_ids = [record['model_id'] for record in id_lists] meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']] # 创建与meta_node的关系 组成关系 if meta_ids: query = """ MATCH (source:data_model), (target:meta_node) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:component]->(target) """ with neo4j_driver.get_session() as session: session.run(query, source_id=data_model_node_id, target_ids=meta_ids) # 创建与data_model的关系 模型关系 if model_ids: query = """ MATCH (source:data_model), (target:data_model) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:use]->(target) """ with neo4j_driver.get_session() as session: session.run(query, source_id=data_model_node_id, target_ids=model_ids) # (从DDL中选取) def handle_no_meta_data_model(id_lists, receiver, data_model_node): """ 处理从DDL中选取的没有元数据的数据模型 Args: id_lists: ID列表 receiver: 接收到的请求参数 data_model_node: 数据模型节点 Returns: None """ # 构建meta_id和resouce_id的列表 resouce_ids = [record['resource_id'] for record in id_lists] meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']] # 创建与data_resource的关系 资源关系 if resouce_ids: query = """ MATCH (source:data_model), (target:data_resource) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:resource]->(target) """ with neo4j_driver.get_session() as session: session.run(query, source_id=data_model_node.identity, target_ids=resouce_ids) if meta_ids: meta_node_list = [] for id in meta_ids: query = """ MATCH (n) WHERE id(n) = $node_id RETURN n """ result = connect_graph.run(query, node_id=id) if result: record = result.data() if record: meta_node_list.append(record[0]['n']) # 提取接收到的数据并创建meta_node节点 meta_node = None resource_ids = [] for item in id_lists: resource_id = item['resource_id'] resource_ids.append(resource_id) for meta_item in item['metaData']: meta_id = meta_item['id'] data_standard = meta_item.get('data_standard', '') en_name_zh = meta_item.get('en_name_zh', '') data_name = meta_item.get('data_name', '') # 使用传递的参数创建meta_node节点 meta_params = { 'name': data_name, 'cn_name': en_name_zh, 'standard': data_standard, 'time': get_formatted_time() } # 创建meta_node节点 meta_node = create_or_get_node('meta_node', **meta_params) # 创建与data_model的关系 if meta_node and not relationship_exists(data_model_node, 'component', meta_node): connection = Relationship(data_model_node, 'component', meta_node) connect_graph.create(connection) # 数据模型详情 def handle_id_model(model_id): """ 获取数据模型详情 Args: model_id: 数据模型ID Returns: 数据模型详情 """ model_detail_query = """ MATCH (n:data_model) WHERE id(n) = $model_id RETURN n """ model_detail_result = connect_graph.run(model_detail_query, model_id=model_id).data() if not model_detail_result: return None model_detail = model_detail_result[0]['n'] model_info = dict(model_detail) model_info['id'] = model_id # 获取data_model节点连接的resource节点 resource_query = """ MATCH (n:data_model)-[:resource]->(r:data_resource) WHERE id(n) = $model_id RETURN r """ resource_result = connect_graph.run(resource_query, model_id=model_id).data() resources = [] for item in resource_result: resource = dict(item['r']) resource['id'] = item['r'].identity resources.append(resource) model_info['resources'] = resources # 获取data_model节点连接的component节点 component_query = """ MATCH (n:data_model)-[:component]->(m:meta_node) WHERE id(n) = $model_id RETURN m """ component_result = connect_graph.run(component_query, model_id=model_id).data() components = [] for item in component_result: component = dict(item['m']) component['id'] = item['m'].identity components.append(component) model_info['components'] = components # 获取data_model节点连接的use节点 use_query = """ MATCH (n:data_model)-[:use]->(u:data_model) WHERE id(n) = $model_id RETURN u """ use_result = connect_graph.run(use_query, model_id=model_id).data() uses = [] for item in use_result: use = dict(item['u']) use['id'] = item['u'].identity uses.append(use) model_info['uses'] = uses # 获取data_model节点连接的标签 tag_query = """ MATCH (n:data_model)-[:label]->(t:data_label) WHERE id(n) = $model_id RETURN t """ tag_result = connect_graph.run(tag_query, model_id=model_id).data() if tag_result: tag = dict(tag_result[0]['t']) tag['id'] = tag_result[0]['t'].identity model_info['tag'] = tag return model_info # 数据模型列表 def model_list(skip_count, page_size, en_name_filter=None, name_filter=None, category=None, tag=None, level=None): """ 获取数据模型列表 Args: skip_count: 跳过的记录数量 page_size: 每页记录数量 en_name_filter: 英文名称过滤条件 name_filter: 名称过滤条件 category: 分类过滤条件 tag: 标签过滤条件 level: 级别过滤条件 Returns: tuple: (数据列表, 总记录数) """ # 构建查询条件 params = {} match_clause = "MATCH (n:data_model)" where_clause = [] if tag: match_clause = "MATCH (n:data_model)-[:label]->(t:data_label)" where_clause.append("id(t) = $tag") params['tag'] = int(tag) if name_filter: where_clause.append("n.name =~ $name_filter") params['name_filter'] = f"(?i).*{name_filter}.*" if en_name_filter: where_clause.append("n.en_name =~ $en_name_filter") params['en_name_filter'] = f"(?i).*{en_name_filter}.*" if category: where_clause.append("n.category = $category") params['category'] = category if level: where_clause.append("n.level = $level") params['level'] = level # 转换为字符串形式 where_str = " AND ".join(where_clause) if where_str: where_str = "WHERE " + where_str # 获取数据总数 count_query = f""" {match_clause} {where_str} RETURN count(n) as count """ # 使用正确的session方式执行查询 driver = connect_graph() if not driver: return [], 0 with driver.session() as session: count_result = session.run(count_query, **params) count = count_result.single()["count"] # 获取分页数据 params['skip'] = skip_count params['limit'] = page_size data_query = f""" {match_clause} {where_str} OPTIONAL MATCH (n)-[:label]->(t:data_label) WITH n, t OPTIONAL MATCH (n)-[:component]->(m:meta_node) RETURN id(n) as id, n.name as name, n.en_name as en_name, n.category as category, n.description as description, n.time as time, n.level as level, t.name as tag_name, id(t) as tag_id, count(m) as component_count ORDER BY n.time DESC SKIP $skip LIMIT $limit """ result = session.run(data_query, **params) data = result.data() return data, count # 有血缘关系的数据资源列表 def model_resource_list(skip_count, page_size, name_filter=None, id=None, category=None, time=None): """ 获取有血缘关系的数据资源列表 Args: skip_count: 跳过的记录数量 page_size: 每页记录数量 name_filter: 名称过滤条件 id: 数据资源ID category: 分类过滤条件 time: 时间过滤条件 Returns: tuple: (数据列表, 总记录数) """ # 构建查询条件 params = {'id': id} where_clause = [] if name_filter: where_clause.append("n.name =~ $name_filter") params['name_filter'] = f"(?i).*{name_filter}.*" if category: where_clause.append("n.category = $category") params['category'] = category if time: where_clause.append("n.time >= $time") params['time'] = time # 转换为字符串形式 where_str = " AND ".join(where_clause) if where_str: where_str = "WHERE " + where_str # 获取数据总数 count_query = f""" MATCH (search:data_resource) WHERE id(search) = $id MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource) {where_str} RETURN count(DISTINCT n) as count """ count = connect_graph.run(count_query, **params).evaluate() # 获取分页数据 params['skip'] = skip_count params['limit'] = page_size data_query = f""" MATCH (search:data_resource) WHERE id(search) = $id MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource) {where_str} WITH DISTINCT n, mn RETURN id(n) as id, n.name as name, n.en_name as en_name, n.category as category, n.description as description, n.time as time, collect({{id: id(mn), name: mn.name}}) as common_meta ORDER BY n.time DESC SKIP $skip LIMIT $limit """ result = connect_graph.run(data_query, **params).data() return result, count # 数据模型血缘图谱 def model_kinship_graph(nodeid, meta=False): """ 获取数据模型血缘图谱 Args: nodeid: 节点ID meta: 是否返回元数据 Returns: 图谱数据 """ if meta: query = """ MATCH p = (n:data_model)-[r:component|resource*..3]-(m) WHERE id(n) = $nodeId WITH p, relationships(p) as rels RETURN p limit 300 """ else: query = """ MATCH p = (n:data_model)-[r:resource*..3]-(m) WHERE id(n) = $nodeId and labels(m) <> ['meta_node'] WITH p, relationships(p) as rels RETURN p limit 300 """ result = connect_graph.run(query, nodeId=nodeid) nodes = set() relationships = set() nodes_by_id = {} for record in result: path = record["p"] for node in path.nodes: if node.identity not in nodes: node_id = str(node.identity) node_type = list(node.labels)[0].split('_')[1] node_data = { "id": node_id, "text": node.get("name", ""), "type": node_type } nodes.add(node.identity) nodes_by_id[node.identity] = node_data for rel in path.relationships: relationship_id = f"{rel.start_node.identity}-{rel.end_node.identity}" if relationship_id not in relationships: relationship_data = { "from": str(rel.start_node.identity), "to": str(rel.end_node.identity), "text": type(rel).__name__ } relationships.add(relationship_id) # 转换为所需格式 return { "nodes": list(nodes_by_id.values()), "edges": [{"from": rel.split("-")[0], "to": rel.split("-")[1], "text": ""} for rel in relationships] } # 数据模型影响图谱 def model_impact_graph(nodeid, meta=False): """ 获取数据模型影响图谱 Args: nodeid: 节点ID meta: 是否返回元数据 Returns: 图谱数据 """ if meta: query = """ MATCH p = (n:data_model)-[r:use*..3]-(m) WHERE id(n) = $nodeId WITH p, relationships(p) as rels RETURN p limit 300 """ else: query = """ MATCH p = (n:data_model)-[r:use*..3]-(m) WHERE id(n) = $nodeId WITH p, relationships(p) as rels RETURN p limit 300 """ result = connect_graph.run(query, nodeId=nodeid) nodes = set() relationships = set() nodes_by_id = {} for record in result: path = record["p"] for node in path.nodes: if node.identity not in nodes: node_id = str(node.identity) node_type = list(node.labels)[0].split('_')[1] node_data = { "id": node_id, "text": node.get("name", ""), "type": node_type } nodes.add(node.identity) nodes_by_id[node.identity] = node_data for rel in path.relationships: relationship_id = f"{rel.start_node.identity}-{rel.end_node.identity}" if relationship_id not in relationships: relationship_data = { "from": str(rel.start_node.identity), "to": str(rel.end_node.identity), "text": type(rel).__name__ } relationships.add(relationship_id) # 转换为所需格式 return { "nodes": list(nodes_by_id.values()), "edges": [{"from": rel.split("-")[0], "to": rel.split("-")[1], "text": ""} for rel in relationships] } # 数据模型全部图谱 def model_all_graph(nodeid, meta=False): """ 获取数据模型全部图谱 Args: nodeid: 节点ID meta: 是否返回元数据 Returns: 图谱数据 """ if meta: query = """ MATCH p = (n:data_model)-[r*..3]-(m) WHERE id(n) = $nodeId WITH p, relationships(p) as rels RETURN p limit 300 """ else: query = """ MATCH p = (n:data_model)-[r*..3]-(m) WHERE id(n) = $nodeId and labels(m) <> ['meta_node'] WITH p, relationships(p) as rels RETURN p limit 300 """ result = connect_graph.run(query, nodeId=nodeid) nodes = set() relationships = set() nodes_by_id = {} for record in result: path = record["p"] for node in path.nodes: if node.identity not in nodes: node_id = str(node.identity) node_type = list(node.labels)[0].split('_')[1] node_data = { "id": node_id, "text": node.get("name", ""), "type": node_type } nodes.add(node.identity) nodes_by_id[node.identity] = node_data for rel in path.relationships: relationship_id = f"{rel.start_node.identity}-{rel.end_node.identity}" if relationship_id not in relationships: relationship_data = { "from": str(rel.start_node.identity), "to": str(rel.end_node.identity), "text": type(rel).__name__ } relationships.add(relationship_id) # 转换为所需格式 return { "nodes": list(nodes_by_id.values()), "edges": [{"from": rel.split("-")[0], "to": rel.split("-")[1], "text": ""} for rel in relationships] } # 更新数据模型 def data_model_edit(receiver): """ 更新数据模型 Args: receiver: 接收到的请求参数 Returns: 更新结果 """ id = receiver.get('id') name = receiver.get('name') en_name = receiver.get('en_name') category = receiver.get('category') description = receiver.get('description') tag = receiver.get('tag') # 更新数据模型节点 query = """ MATCH (n:data_model) WHERE id(n) = $id SET n.name = $name, n.en_name = $en_name, n.category = $category, n.description = $description RETURN n """ result = connect_graph.run(query, id=id, name=name, en_name=en_name, category=category, description=description).data() # 处理标签关系 if tag: # 先删除所有标签关系 delete_query = """ MATCH (n:data_model)-[r:label]->() WHERE id(n) = $id DELETE r """ connect_graph.run(delete_query, id=id) # 再创建新的标签关系 tag_node = get_node_by_id('data_label', tag) if tag_node: model_node = get_node_by_id_no_label(id) if model_node and not relationship_exists(model_node, 'label', tag_node): connection = Relationship(model_node, 'label', tag_node) connect_graph.create(connection) return {"message": "数据模型更新成功"}