"""
Core business logic for data models.

This module contains the core business-logic functions for data models:
- creating, updating and deleting data models
- handling relationships between data models, data resources and metadata
- data model lineage (kinship) handling
- data model graph generation
- data model level calculation
"""

import json
import logging
import math
import threading
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
from py2neo import Relationship

from app.core.graph.graph_operations import relationship_exists
from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node
from app.services.neo4j_driver import neo4j_driver
from app.core.meta_data import get_formatted_time, handle_id_unstructured
from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
from app.core.data_resource.resource import get_node_by_id


def _create_relationship(start_id, end_id, rel_type):
    """Create a ``(start)-[:rel_type]->(end)`` relationship between two nodes.

    Args:
        start_id: Internal Neo4j ID of the start node.
        end_id: Internal Neo4j ID of the end node.
        rel_type: Relationship type; must be a plain identifier because it is
            interpolated into the query (Cypher cannot parameterize types).

    Raises:
        ValueError: If ``rel_type`` is not a valid identifier.
    """
    if not rel_type.isidentifier():
        raise ValueError(f"Invalid relationship type: {rel_type!r}")
    query = (
        "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id "
        f"CREATE (a)-[:{rel_type}]->(b)"
    )
    with connect_graph().session() as session:
        session.execute_write(
            lambda tx: tx.run(query, a_id=start_id, b_id=end_id)
        )


def _find_data_model_id_by_name(name):
    """Return the internal ID of the ``data_model`` node named ``name``, or None."""
    query = """
    MATCH (n:data_model {name: $name})
    RETURN id(n) as node_id
    """
    with connect_graph().session() as session:
        record = session.run(query, name=name).single()
        return record["node_id"] if record else None


# Compute the current level of a data model from its child relationships and save it
def calculate_model_level(id):
    """
    Compute a data model's level from its ``child`` relationships and store it.

    The level is the length of the longest outgoing ``child`` path starting at
    the node (0 when there is none); it is written to the node's ``level``
    property.

    Args:
        id: Node ID of the data model (anything ``int()`` accepts, or None)

    Returns:
        None
    """
    # Make sure the ID is an integer
    node_id = int(id) if id is not None else None

    level_query = """
    MATCH (start_node:data_model)
    WHERE id(start_node) = $nodeId
    CALL {
        WITH start_node
        OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
        RETURN length(path) AS level
    }
    WITH coalesce(max(level), 0) AS max_level
    RETURN max_level
    """
    update_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    SET n.level = $level
    RETURN n
    """

    with connect_graph().session() as session:
        record = session.run(level_query, nodeId=node_id).single()
        # Index the column directly: the query always returns `max_level`, and
        # a membership test on a driver record is not a reliable key check.
        level = record["max_level"] if record else 0

        # Persist the level property
        session.run(update_query, nodeId=node_id, level=level)


# Handle data model lineage
def handle_model_relation(resource_ids):
    """
    Find data resources that share metadata nodes with the given resource.

    Args:
        resource_ids: ID of the data resource to search from

    Returns:
        list[dict]: One entry per related resource with the keys
        ``blood_resources``, ``common_nodes``, ``origin_nodes`` and
        ``blood_nodes``.
    """
    query = """
    MATCH (search:data_resource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_resource)
    WHERE id(search) = $resource_Ids
    WITH search, connect, common_node
    MATCH (search)-[:connection]->(search_node:meta_node)
    WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
    MATCH (connect)-[:connection]->(connect_node:meta_node)
    WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
    WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
    // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
    WITH search, connect, common_nodes,
         [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
         [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
    RETURN id(connect) as blood_resources, common_nodes, filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
    """
    with connect_graph().session() as session:
        return session.run(query, resource_Ids=resource_ids).data()


# Create a data model node
def handle_data_model(data_model, result_list, result, receiver):
    """
    Create (or fetch) a data model node and link its children and tag.

    ``receiver`` is updated in place with ``id_list``, ``time`` and
    ``en_name`` before it is used as the node's properties.

    Args:
        data_model: Name of the data model
        result_list: List of English names; the first entry is used
        result: Serialized ID list stored as ``id_list``
        receiver: Request parameters (dict)

    Returns:
        tuple: (node_id, data_model_node)

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        data_model_en = result_list[0] if result_list else ""
        receiver['id_list'] = result
        receiver.update({
            'time': get_formatted_time(),
            'en_name': data_model_en,
        })

        data_model_node = get_node('data_model', name=data_model) or create_or_get_node('data_model', **receiver)

        # Resolve the node ID up front so the relationship queries below also
        # work when the node object does not expose an `id` attribute.
        node_id = getattr(data_model_node, 'id', None)
        if node_id is None:
            node_id = _find_data_model_id_by_name(data_model)

        # Link child nodes; tolerate a missing or null childrenId
        for child_id in receiver.get('childrenId') or []:
            child_node = get_node_by_id_no_label(child_id)
            if child_node and not relationship_exists(data_model_node, 'child', child_node):
                _create_relationship(node_id, child_node.id, 'child')

        # Link the data label identified by the `tag` request parameter
        if receiver.get('tag'):
            tag = get_node_by_id('data_label', receiver['tag'])
            if tag and not relationship_exists(data_model_node, 'label', tag):
                _create_relationship(node_id, tag.id, 'label')

        return node_id, data_model_node
    except Exception as e:
        logging.error(f"Error in handle_data_model: {str(e)}")
        raise


# (selected from data resources)
def resource_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to the metadata and data resources it was built from.

    Args:
        id_lists: List of dicts, each with ``resource_id`` and a ``metaData``
            list whose entries carry an ``id``
        data_model_node_id: Node ID of the data model

    Returns:
        None
    """
    # Collect the resource IDs and metadata IDs
    resouce_ids = [record['resource_id'] for record in id_lists]
    meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]

    # Relationship to meta_node: component
    if meta_ids:
        query = """
        MATCH (source:data_model), (target:meta_node)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:component]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=meta_ids)

    # Relationship to data_resource: resource
    if resouce_ids:
        query = """
        MATCH (source:data_model), (target:data_resource)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:resource]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=resouce_ids)


# (selected from data models)
def model_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to the metadata and data models it was built from.

    Args:
        id_lists: List of dicts, each with ``model_id`` and a ``metaData``
            list whose entries carry an ``id``
        data_model_node_id: Node ID of the data model

    Returns:
        None
    """
    # Collect the model IDs and metadata IDs
    model_ids = [record['model_id'] for record in id_lists]
    meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]

    # Relationship to meta_node: component
    if meta_ids:
        query = """
        MATCH (source:data_model), (target:meta_node)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:component]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=meta_ids)

    # Relationship to data_model: use
    if model_ids:
        query = """
        MATCH (source:data_model), (target:data_model)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:use]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=model_ids)


# (selected from DDL)
def handle_no_meta_data_model(id_lists, receiver, data_model_node):
    """
    Link a DDL-derived data model to its resources and create its metadata.

    For every metadata entry a ``meta_node`` is created (or fetched) from the
    entry's ``data_name``, ``en_name_zh`` and ``data_standard`` and linked to
    the data model with a ``component`` relationship.

    Args:
        id_lists: List of dicts, each with ``resource_id`` and a ``metaData`` list
        receiver: Request parameters (currently unused; kept for callers)
        data_model_node: The data model node

    Returns:
        None
    """
    resouce_ids = [record['resource_id'] for record in id_lists]

    # Resolve the data model's node ID, falling back to a lookup by name
    data_model_node_id = getattr(data_model_node, 'id', None)
    if data_model_node_id is None:
        data_model_node_id = _find_data_model_id_by_name(data_model_node.get('name'))

    # Compare against None explicitly: 0 is a valid node ID.
    if data_model_node_id is None:
        return

    # Relationship to data_resource: resource
    if resouce_ids:
        query = """
        MATCH (source:data_model), (target:data_resource)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:resource]->(target)
        """
        with connect_graph().session() as session:
            session.run(query, source_id=data_model_node_id, target_ids=resouce_ids)

    # Create a meta_node for each received metadata entry and link it
    for item in id_lists:
        for meta_item in item['metaData']:
            meta_params = {
                'name': meta_item.get('data_name', ''),
                'cn_name': meta_item.get('en_name_zh', ''),
                'standard': meta_item.get('data_standard', ''),
                'time': get_formatted_time(),
            }
            meta_node = create_or_get_node('meta_node', **meta_params)

            if meta_node and not relationship_exists(data_model_node, 'component', meta_node):
                _create_relationship(data_model_node_id, meta_node.id, 'component')


# Data model detail endpoint
def handle_id_model(id):
    """
    Fetch the details of a data model.

    Args:
        id: Node ID of the data model

    Returns:
        dict: ``{"data_model": {...}}`` containing ``resource_selected`` plus
        the node's properties (``name``, ``en_name``, ``leader``, ``origin``,
        ``frequency``, ``childrenId``, ``organization``, ``data_sensitivity``,
        ``describe``, ``tag``, ``time``, ``category``, ``status``, ...).
        When the node is not found, an empty structure with ``None`` values
        is returned.
    """
    node_id = id
    # NOTE(review): the relationship types matched here (connection,
    # belongs_to, has_component) differ from the ones the functions in this
    # module create (component, resource, use) - confirm against the graph.
    cql = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    OPTIONAL MATCH (n)-[:connection]->(meta:meta_node)
    OPTIONAL MATCH (n)<-[:belongs_to]-(resource:data_resource)
    OPTIONAL MATCH (n)-[:label]->(tag:data_label)
    OPTIONAL MATCH (uses:model_use)-[:use]->(n)
    OPTIONAL MATCH (n)-[:has_component]->(component)
    WITH n,
         collect(DISTINCT meta) as meta_nodes,
         collect(DISTINCT resource) as resources,
         collect(DISTINCT component) as components,
         collect(DISTINCT uses) as uses,
         collect(DISTINCT tag) as tags,
         CASE WHEN n.childrenId IS NOT NULL THEN n.childrenId ELSE [] END as children
    RETURN {
        // 基本信息
        id: id(n),
        name: n.name,
        en_name: n.en_name,
        time: n.time,
        description: n.description,
        describe: n.description, // 使用description作为describe字段
        category: n.category,
        level: n.level,
        tag: CASE WHEN size(tags) > 0 AND tags[0] IS NOT NULL THEN {id: id(tags[0]), name: tags[0].name} ELSE null END,
        // 添加其他必需字段
        leader: n.leader,
        origin: n.origin,
        blood_resource: n.blood_resource,
        frequency: n.frequency,
        organization: n.organization,
        data_sensitivity: n.data_sensitivity,
        status: n.status,
        // 子节点列表
        childrenId: children
    } AS result,
    // 资源列表
    [{
        data_resource: [resource IN resources WHERE resource IS NOT NULL | {
            id: id(resource),
            name: resource.name,
            en_name: resource.en_name,
            description: resource.description
        }],
        resource_id: [resource IN resources WHERE resource IS NOT NULL | id(resource)],
        meta_ids: [meta IN meta_nodes WHERE meta IS NOT NULL | {
            id: id(meta),
            name: meta.name,
            en_name: meta.en_name,
            data_type: meta.data_type
        }]
    }] AS resource_selected
    """

    with connect_graph().session() as session:
        result = session.run(cql, nodeId=node_id)

        # Process the query result
        record = result.single()
        logging.info(f"获得查询结果---------->>>{record}")

        if record:
            # Basic properties and the selected-resource list
            properties = record["result"]
            resource_selected = record["resource_selected"]

            # Give every required field a default so callers never see None
            required_fields = ['tag', 'description', 'leader', 'origin', 'blood_resource',
                               'frequency', 'describe', 'organization', 'name', 'en_name',
                               'data_sensitivity', 'time', 'category', 'status', 'childrenId']

            for field in required_fields:
                if field not in properties or properties[field] is None:
                    if field == 'tag':
                        properties[field] = {}
                    elif field == 'childrenId':
                        properties[field] = []
                    else:
                        properties[field] = ""

            # Build the final response
            final_data = {
                "resource_selected": resource_selected,
                **properties
            }

            return {"data_model": final_data}
        else:
            # Nothing found: return the empty structure
            return {"data_model": {
                "resource_selected": [{"meta_ids": [], "data_resource": None, "resource_id": None}],
                "leader": None, "origin": None, "frequency": None, "childrenId": [],
                "organization": None, "name": None, "en_name": None, "data_sensitivity": None,
                "describe": None, "tag": {}, "time": None, "category": None, "status": None
            }}


# Data model list
def model_list(skip_count, page_size, en_name_filter=None, name_filter=None,
               category=None, tag=None, level=None):
    """
    Return one page of data models together with the total match count.

    Args:
        skip_count: Number of rows to skip
        page_size: Page size
        en_name_filter: Filter on the English name (used inside a regex)
        name_filter: Filter on the name (used inside a regex)
        category: Category filter
        tag: Tag (label node ID) filter
        level: Level filter

    Returns:
        tuple: (list of data models, total count); ``([], 0)`` on error
    """
    try:
        # Build the filter conditions
        where_clause = []
        params = {}

        if name_filter is not None:
            where_clause.append("n.name =~ $name")
            params['name'] = f".*{name_filter}.*"

        if en_name_filter is not None:
            where_clause.append("n.en_name =~ $en_name")
            params['en_name'] = f".*{en_name_filter}.*"

        if category is not None:
            where_clause.append("n.category = $category")
            params['category'] = category

        if level is not None:
            where_clause.append("n.level = $level")
            params['level'] = level

        if tag is not None:
            where_clause.append("id(t) = $tag")
            params['tag'] = tag

        where_str = " AND ".join(where_clause)
        if where_str:
            # A WHERE directly after OPTIONAL MATCH only constrains the
            # optional pattern and never removes rows; the WITH makes it a
            # real row filter.
            where_str = f"WITH n, t WHERE {where_str}"

        with connect_graph().session() as session:
            # Total count
            count_query = f"""
            MATCH (n:data_model)
            OPTIONAL MATCH (n)-[:label]->(t)
            {where_str}
            RETURN COUNT(DISTINCT n) AS count
            """
            count_record = session.run(count_query, **params).single()
            total = count_record['count'] if count_record else 0

            # Page of data
            query = f"""
            MATCH (n:data_model)
            OPTIONAL MATCH (n)-[:label]->(t)
            {where_str}
            RETURN DISTINCT
                id(n) as id,
                n.name as name,
                n.en_name as en_name,
                n.time as time,
                n.description as description,
                n.level as level,
                id(t) as tag_id,
                t.name as tag_name
            ORDER BY n.time DESC
            SKIP $skip LIMIT $limit
            """
            result = session.run(query, skip=skip_count, limit=page_size, **params)

            data = [
                {
                    "id": record['id'],
                    "name": record['name'],
                    "en_name": record['en_name'],
                    "time": record['time'],
                    "description": record['description'],
                    "level": record['level'],
                    "tag": {"id": record['tag_id'], "name": record['tag_name']}
                    if record['tag_id'] is not None else None,
                }
                for record in result
            ]

            return data, total
    except Exception as e:
        # Best effort: log with traceback and return an empty page
        logging.exception(f"Error in model_list: {str(e)}")
        return [], 0


# Data resources related to a data model
def model_resource_list(skip_count, page_size, name_filter=None, id=None,
                        category=None, time=None):
    """
    Return one page of the data resources attached to a data model.

    Args:
        skip_count: Number of rows to skip
        page_size: Page size
        name_filter: Name filter (accepted but not applied by the query)
        id: Node ID of the data model
        category: Category filter (accepted but not applied by the query)
        time: Time filter (accepted but not applied by the query)

    Returns:
        tuple: (list of data resources, total count); ``([], 0)`` on error
    """
    try:
        # Base query shared by the count and the page query
        base_query = """
        MATCH (n:data_model)
        WHERE id(n) = $nodeId
        MATCH (n)-[:children]->(m:data_resource)
        """

        count_query = base_query + """
        RETURN COUNT(m) as count
        """

        with connect_graph().session() as session:
            count_record = session.run(count_query, nodeId=id).single()
            total = count_record['count'] if count_record else 0

            # NOTE(review): this query additionally requires a label
            # relationship, so `total` can exceed the number of listable rows.
            main_query = base_query + """
            MATCH (m)-[:label]->(l)
            WHERE id(n) = $nodeId and labels(m) <> ['meta_node']
            RETURN m.name as name,
                   m.en_name as en_name,
                   id(m) as id,
                   l.name as label,
                   m.time as time,
                   m.description as description,
                   m.category as category
            ORDER BY m.time DESC
            SKIP $skip LIMIT $limit
            """

            result = session.run(main_query, nodeId=id, skip=skip_count, limit=page_size)

            fields = ("name", "en_name", "id", "label", "time", "description", "category")
            data = [{field: record[field] for field in fields} for record in result]

            return data, total
    except Exception as e:
        # Best effort: log with traceback and return an empty page
        logging.exception(f"Error in model_resource_list: {str(e)}")
        return [], 0


def _build_model_graph(nodeid, meta=False):
    """Build the node/line graph of a data model, its resources and optionally its metadata.

    Shared implementation of the kinship, impact and "all" graphs.
    """
    start_node_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    RETURN n.name as name, n.en_name as en_name
    """
    # NOTE(review): the functions in this module create `resource` and
    # `component` relationships, while these queries follow `children` and
    # `meta` - confirm which types the graph actually contains.
    resource_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:children]->(resource:data_resource)
    RETURN resource
    """
    # The previous version also filtered on `labels(m)`, but no variable `m`
    # exists in this query, so that predicate was dropped.
    meta_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:meta]->(meta:meta_node)
    RETURN meta
    """

    with connect_graph().session() as session:
        # Look up the starting model node
        start_record = session.run(start_node_query, nodeId=nodeid).single()
        if start_record is None:
            return {"nodes": [], "lines": []}

        model_id = str(nodeid)
        nodes = [{"id": model_id, "text": start_record['name'], "type": "model"}]
        lines = []

        # Resource nodes and their lines to the model
        for record in session.run(resource_query, nodeId=nodeid):
            resource = record['resource']
            resource_id = str(resource.id)
            nodes.append({"id": resource_id, "text": resource.get('name', ''), "type": "resource"})
            lines.append({"from": model_id, "to": resource_id, "text": "resource"})

        # Metadata nodes and their lines to the model
        if meta:
            for record in session.run(meta_query, nodeId=nodeid):
                # Read from the returned node, not from the boolean `meta` flag
                meta_node = record['meta']
                meta_id = str(meta_node.id)
                nodes.append({"id": meta_id, "text": meta_node.get('name', ''), "type": "meta"})
                lines.append({"from": model_id, "to": meta_id, "text": "component"})

    return {"nodes": nodes, "lines": lines}


# Data model kinship graph
def model_kinship_graph(nodeid, meta=False):
    """
    Build the kinship (lineage) graph of a data model.

    Args:
        nodeid: Node ID
        meta: Whether to include metadata nodes

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``; both empty when the model
        does not exist
    """
    return _build_model_graph(nodeid, meta)


# Data model impact graph
def model_impact_graph(nodeid, meta=False):
    """
    Build the impact graph of a data model.

    Args:
        nodeid: Node ID
        meta: Whether to include metadata nodes

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``; both empty when the model
        does not exist
    """
    return _build_model_graph(nodeid, meta)


# Data model full graph
def model_all_graph(nodeid, meta=False):
    """
    Build the graph of all relationships of a data model.

    Args:
        nodeid: Node ID
        meta: Whether to include metadata nodes

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``; both empty when the model
        does not exist
    """
    return _build_model_graph(nodeid, meta)


# Update a data model
def data_model_edit(receiver):
    """
    Update a data model's basic properties and, optionally, its tag.

    Args:
        receiver: Request parameters with ``id``, ``name``, ``en_name``,
            ``category``, ``description`` and an optional ``tag`` (label ID)

    Returns:
        dict: ``{"message": ...}`` confirming the update
    """
    model_id = receiver.get('id')
    tag = receiver.get('tag')

    # Update the data model node
    query = """
    MATCH (n:data_model) WHERE id(n) = $id
    SET n.name = $name, n.en_name = $en_name, n.category = $category, n.description = $description
    RETURN n
    """
    with connect_graph().session() as session:
        # .data() drains the result, as the original code did
        session.run(
            query,
            id=model_id,
            name=receiver.get('name'),
            en_name=receiver.get('en_name'),
            category=receiver.get('category'),
            description=receiver.get('description'),
        ).data()

    # Replace the tag relationship
    if tag:
        # Resolve both ends first so an unknown tag ID does not strip the
        # model's existing label.
        tag_node = get_node_by_id('data_label', tag)
        model_node = get_node_by_id_no_label(model_id) if tag_node else None
        if tag_node and model_node:
            # Remove every existing label relationship
            delete_query = """
            MATCH (n:data_model)-[r:label]->() WHERE id(n) = $id
            DELETE r
            """
            with connect_graph().session() as session:
                session.run(delete_query, id=model_id)

            # Create the new label relationship
            if not relationship_exists(model_node, 'label', tag_node):
                _create_relationship(model_node.id, tag_node.id, 'label')

    return {"message": "数据模型更新成功"}