""" 数据模型核心业务逻辑模块 本模块包含了数据模型相关的所有核心业务逻辑函数,包括: - 数据模型的创建、更新、删除 - 数据模型与数据资源、元数据之间的关系处理 - 数据模型血缘关系管理 - 数据模型图谱生成 - 数据模型层级计算等功能 """ import math import threading from concurrent.futures import ThreadPoolExecutor import pandas as pd from py2neo import Relationship import logging import json # Configure logger logger = logging.getLogger(__name__) from app.core.graph.graph_operations import relationship_exists from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node from app.services.neo4j_driver import neo4j_driver from app.core.meta_data import get_formatted_time, handle_id_unstructured from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label from app.core.data_resource.resource import get_node_by_id, serialize_node_properties # 根据child关系计算数据模型当前的level自动保存 def calculate_model_level(id): """ 根据child关系计算数据模型当前的level并自动保存 Args: id: 数据模型的节点ID(整数) Returns: None """ # 确保id是整数类型 node_id = int(id) if id is not None else None cql = """ MATCH (start_node:DataModel) WHERE id(start_node) = $nodeId CALL { WITH start_node OPTIONAL MATCH path = (start_node)-[:child*]->(end_node) RETURN length(path) AS level } WITH coalesce(max(level), 0) AS max_level RETURN max_level """ with connect_graph().session() as session: result = session.run(cql, nodeId=node_id) record = result.single() data = record["max_level"] if record and "max_level" in record else 0 # 更新level属性 update_query = """ MATCH (n:DataModel) WHERE id(n) = $nodeId SET n.level = $level RETURN n """ with connect_graph().session() as session: session.run(update_query, nodeId=node_id, level=data) # 处理数据模型血缘关系 def handle_model_relation(resource_ids): """ 处理数据模型血缘关系 Args: resource_ids: 数据资源ID Returns: 血缘关系数据 """ query = """ MATCH (search:DataResource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:DataResource) WHERE id(search) = $resource_Ids WITH search, connect, common_node MATCH (search)-[:connection]->(search_node:meta_node) WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes MATCH (connect)-[:connection]->(connect_node:meta_node) WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容 WITH search, connect, common_nodes, [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes, [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes RETURN id(connect) as blood_resources, common_nodes, filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes """ with connect_graph().session() as session: result = session.run(query, resource_Ids=resource_ids) return result.data() # 创建一个数据模型节点 def handle_data_model(data_model, result_list, result, receiver): """ 创建一个数据模型节点 Args: data_model: 数据模型名称 result_list: 数据模型英文名列表 result: 序列化的ID列表 receiver: 接收到的请求参数 Returns: tuple: (id, data_model_node) """ try: # 添加数据资源 血缘关系的字段 blood_resource data_model_en = result_list[0] if result_list and len(result_list) > 0 else "" receiver['id_list'] = result add_attribute = { 'time': get_formatted_time(), 'en_name': data_model_en } receiver.update(add_attribute) data_model_node = get_node('DataModel', name=data_model) or create_or_get_node('DataModel', **receiver) logger.info(f"通过查询或创建节点获得节点ID111,data_model_node: {data_model_node}") # 获取节点ID,确保我们能安全地访问节点ID node_id = data_model_node if hasattr(data_model_node, 'id'): 
logger.info(f"通过节点ID获取节点ID222,data_model_node: {data_model_node}") node_id = data_model_node.id else: logger.info(f"通过查询节点名称获取节点ID333,data_model_node: {data_model_node}") # 如果节点没有id属性,尝试通过查询获取 query = """ MATCH (n:DataModel {name: $name}) RETURN id(n) as node_id """ with connect_graph().session() as session: result = session.run(query, name=data_model) record = result.single() logger.info(f"通过查询节点名称获取节点ID444,record: {record}") if record and "node_id" in record: logger.info(f"通过查询节点名称获取节点ID555,record: {record}") node_id = record["node_id"] # 安全地处理子节点关系 child_list = receiver.get('childrenId', []) for child_id in child_list: child_node = get_node_by_id_no_label(child_id) if child_node: # 直接使用Cypher查询检查关系是否存在 with connect_graph().session() as session: rel_query = """ MATCH (a)-[r:child]->(b) WHERE id(a) = $start_id AND id(b) = $end_id RETURN count(r) > 0 as exists """ rel_result = session.run(rel_query, start_id=int(node_id), end_id=int(child_node.id)).single() # 如果关系不存在,则创建关系 if not (rel_result and rel_result["exists"]): session.execute_write( lambda tx: tx.run( "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:child]->(b)", a_id=int(node_id), b_id=int(child_node.id) ) ) # 根据传入参数id,和数据标签建立关系 if receiver.get('tag'): tag = get_node_by_id('DataLabel', receiver['tag']) if tag: # 直接使用Cypher查询检查关系是否存在 with connect_graph().session() as session: rel_query = """ MATCH (a)-[r:LABEL]->(b) WHERE id(a) = $start_id AND id(b) = $end_id RETURN count(r) > 0 as exists """ rel_result = session.run(rel_query, start_id=int(node_id), end_id=int(tag.id)).single() # 如果关系不存在,则创建关系 if not (rel_result and rel_result["exists"]): session.execute_write( lambda tx: tx.run( "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:LABEL]->(b)", a_id=int(node_id), b_id=int(tag.id) ) ) return node_id, data_model_node except Exception as e: logging.error(f"Error in handle_data_model: {str(e)}") raise # (从数据资源中选取) def resource_handle_meta_data_model(id_lists, data_model_node_id): """ 处理从数据资源中选取的数据模型与元数据的关系 Args: id_lists: ID列表 data_model_node_id: 数据模型节点ID Returns: None """ try: logger.info(f"开始处理数据模型与元数据的关系,数据模型ID: {data_model_node_id}") # 构建meta_id和resouce_id的列表 resouce_ids = [record['resource_id'] for record in id_lists] meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']] logger.info(f"资源ID列表: {resouce_ids}") logger.info(f"元数据ID列表: {meta_ids}") # 创建与meta_node的关系 组成关系 if meta_ids: logger.info("开始创建数据模型与元数据的关系") query = """ MATCH (source:DataModel), (target:DataMeta) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:INCLUDES]->(target) RETURN count(*) as count """ with connect_graph().session() as session: result = session.run(query, source_id=data_model_node_id, target_ids=meta_ids) count = result.single()["count"] logger.info(f"成功创建 {count} 个数据模型与元数据的关系") # 创建与DataResource的关系 资源关系 # 不在创建Modle时创建资源关系,将资源关系创建放在数据流程创建时处理 # 关系名称为DERIVED_FROM # commented by mxl 2025-06-27 # # if resouce_ids: # logger.info("开始创建数据模型与数据资源的关系") # query = """ # MATCH (source:DataModel), (target:DataResource) # WHERE id(source)=$source_id AND id(target) IN $target_ids # MERGE (source)-[:DERIVED_FROM]->(target) # RETURN count(*) as count # """ # with connect_graph().session() as session: # result = session.run(query, source_id=data_model_node_id, target_ids=resouce_ids) # count = result.single()["count"] # logger.info(f"成功创建 {count} 个数据模型与数据资源的关系") except Exception as e: logger.error(f"处理数据模型与元数据的关系时发生错误: {str(e)}") raise # (从数据模型中选取) def 
model_handle_meta_data_model(id_lists, data_model_node_id): """ 处理从数据模型中选取的数据模型与元数据的关系 Args: id_lists: ID列表 data_model_node_id: 数据模型节点ID Returns: None """ # 构建meta_id和model_id的列表 model_ids = [record['model_id'] for record in id_lists] meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']] # 创建与meta_node的关系 组成关系 if meta_ids: query = """ MATCH (source:DataModel), (target:DataMeta) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:component]->(target) """ with neo4j_driver.get_session() as session: session.run(query, source_id=data_model_node_id, target_ids=meta_ids) # 创建与data_model的关系 模型关系 if model_ids: query = """ MATCH (source:DataModel), (target:DataModel) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:use]->(target) """ with neo4j_driver.get_session() as session: session.run(query, source_id=data_model_node_id, target_ids=model_ids) # (从DDL中选取) def handle_no_meta_data_model(id_lists, receiver, data_model_node): """ 处理从DDL中选取的没有元数据的数据模型 Args: id_lists: ID列表 receiver: 接收到的请求参数 data_model_node: 数据模型节点 Returns: None """ # 构建meta_id和resouce_id的列表 resouce_ids = [record['resource_id'] for record in id_lists] meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']] # 获取数据模型节点ID data_model_node_id = None if hasattr(data_model_node, 'id'): data_model_node_id = data_model_node.id else: # 如果节点没有id属性,尝试通过查询获取 query = """ MATCH (n:DataModel {name: $name}) RETURN id(n) as node_id """ with connect_graph().session() as session: result = session.run(query, name=data_model_node.get('name')) record = result.single() if record: data_model_node_id = record["node_id"] if not data_model_node_id: return # 创建与DataResource的关系 资源关系 if resouce_ids: query = """ MATCH (source:DataModel), (target:DataResource) WHERE id(source)=$source_id AND id(target) IN $target_ids MERGE (source)-[:resource]->(target) """ with connect_graph().session() as session: session.run(query, source_id=data_model_node_id, target_ids=resouce_ids) if meta_ids: meta_node_list = [] for id in meta_ids: query = """ MATCH (n) WHERE id(n) = $node_id RETURN n """ with connect_graph().session() as session: result = session.run(query, node_id=id) if result: record = result.data() if record: meta_node_list.append(record[0]['n']) # 提取接收到的数据并创建meta_node节点 meta_node = None resource_ids = [] for item in id_lists: resource_id = item['resource_id'] resource_ids.append(resource_id) for meta_item in item['metaData']: meta_id = meta_item['id'] data_standard = meta_item.get('data_standard', '') en_name_zh = meta_item.get('en_name_zh', '') data_name = meta_item.get('data_name', '') # 使用传递的参数创建meta_node节点 meta_params = { 'name': data_name, 'cn_name': en_name_zh, 'standard': data_standard, 'time': get_formatted_time() } # 创建meta_node节点 meta_node = create_or_get_node('DataMeta', **meta_params) # 获取数据模型节点ID dm_id = data_model_node_id if data_model_node_id is not None else data_model_node if meta_node: # 直接使用Cypher查询检查关系是否存在 with connect_graph().session() as session: rel_query = """ MATCH (a)-[r:component]->(b) WHERE id(a) = $start_id AND id(b) = $end_id RETURN count(r) > 0 as exists """ rel_result = session.run(rel_query, start_id=int(dm_id), end_id=int(meta_node)).single() # 如果关系不存在,则创建关系 if not (rel_result and rel_result["exists"]): session.execute_write( lambda tx: tx.run( "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:component]->(b)", a_id=int(dm_id), b_id=int(meta_node) ) ) # 数据模型-详情接口 def handle_id_model(id): """ 获取数据模型详情 Args: id: 
数据模型的节点ID Returns: dict: 包含数据模型详情的字典,格式为: {"data_model": { "resource_selected": [...], "leader": ..., "origin": ..., "frequency": ..., "childrenId": [...], "organization": ..., "name": ..., "en_name": ..., "data_sensitivity": ..., "describe": ..., "tag": ..., "time": ..., "category": ..., "status": ... }} """ node_id = id cql = """ MATCH (n:DataModel) WHERE id(n) = $nodeId OPTIONAL MATCH (n)-[:INCLUDES]->(meta:DataMeta) OPTIONAL MATCH (n)-[:DERIVED_FROM]->(resource:DataResource) OPTIONAL MATCH (n)-[:LABEL]->(tag:DataLabel) OPTIONAL MATCH (uses:model_use)-[:use]->(n) OPTIONAL MATCH (n)-[:has_component]->(component) WITH n, collect(DISTINCT meta) as meta_nodes, collect(DISTINCT resource) as resources, collect(DISTINCT component) as components, collect(DISTINCT uses) as uses, collect(DISTINCT tag) as tags, CASE WHEN n.childrenId IS NOT NULL THEN n.childrenId ELSE [] END as children RETURN { // 基本信息 id: id(n), name: n.name, en_name: n.en_name, time: n.time, describe: n.describe, category: n.category, level: n.level, tag: CASE WHEN size(tags) > 0 AND tags[0] IS NOT NULL THEN {id: id(tags[0]), name: tags[0].name} ELSE null END, // 添加其他必需字段 leader: n.leader, origin: n.origin, blood_resource: n.blood_resource, frequency: n.frequency, organization: n.organization, data_sensitivity: n.data_sensitivity, status: n.status, // 子节点列表 childrenId: children } AS result, // 资源列表 [{ data_resource: [resource IN resources WHERE resource IS NOT NULL | { id: id(resource), name: resource.name, en_name: resource.en_name, description: resource.description }], resource_id: [resource IN resources WHERE resource IS NOT NULL | id(resource)], meta_ids: [meta IN meta_nodes WHERE meta IS NOT NULL | { id: id(meta), name: meta.name, en_name: meta.en_name, data_type: meta.data_type }] }] AS resource_selected """ with connect_graph().session() as session: result = session.run(cql, nodeId=node_id) # 处理查询结果 record = result.single() logging.info(f"获得查询结果---------->>>{record}") if record: # 获取基本属性和资源选择列表 properties = record["result"] resource_selected = record["resource_selected"] # 确保所有必需字段都有默认值,避免空值 required_fields = ['tag', 'leader', 'origin', 'blood_resource', 'frequency', 'describe', 'organization', 'name', 'en_name', 'data_sensitivity', 'time', 'category', 'status', 'childrenId'] for field in required_fields: if field not in properties or properties[field] is None: if field == 'tag': properties[field] = {} elif field == 'childrenId': properties[field] = [] else: properties[field] = "" # 构建最终返回格式 final_data = { "resource_selected": resource_selected, **properties } return {"data_model": final_data} else: # 如果没有查询到结果,返回空的结构 return {"data_model": { "resource_selected": [{"meta_ids": [], "data_resource": None, "resource_id": None}], "leader": None, "origin": None, "frequency": None, "childrenId": [], "organization": None, "name": None, "en_name": None, "data_sensitivity": None, "describe": None, "tag": {}, "time": None, "category": None, "status": None }} # 数据模型列表 def model_list(skip_count, page_size, en_name_filter=None, name_filter=None, category=None, tag=None, level=None): """ 获取数据模型列表 Args: skip_count: 跳过的数量 page_size: 页面大小 en_name_filter: 英文名称过滤条件 name_filter: 名称过滤条件 category: 类别过滤条件 tag: 标签过滤条件 level: 层级过滤条件 Returns: tuple: (数据模型列表, 总数量) """ try: # 构建where子句 - 只针对DataModel节点的过滤条件 datamodel_where_clause = [] params = {} if name_filter is not None: datamodel_where_clause.append("n.name =~ $name") params['name'] = f".*{name_filter}.*" if en_name_filter is not None: datamodel_where_clause.append("n.en_name =~ $en_name") 
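            # 补充说明:Cypher 中 "=~" 为正则匹配,这里把过滤值包成 ".*xxx.*" 实现模糊查询;
            # 若过滤值可能包含正则特殊字符,可以考虑先转义(仅为示意写法,需另行 import re):
            #     params['en_name'] = f".*{re.escape(en_name_filter)}.*"
            # 以下保持原有写法不变。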
params['en_name'] = f".*{en_name_filter}.*" if category is not None: datamodel_where_clause.append("n.category = $category") params['category'] = category if level is not None: datamodel_where_clause.append("n.level = $level") params['level'] = level # 处理标签查询 if tag is not None: # 确保tag参数是整数类型 try: tag_id = int(tag) params['tag'] = tag_id except (ValueError, TypeError): logger.warning(f"Invalid tag parameter: {tag}, expected integer") return [], 0 # 有标签查询条件时,需要确保标签关系存在 match_clause = "MATCH (n:DataModel)-[:LABEL]->(t)" datamodel_where_clause.append("id(t) = $tag") else: # 没有标签查询条件时,先匹配DataModel,然后可选连接标签 match_clause = "MATCH (n:DataModel)" # 构建DataModel节点的WHERE子句 datamodel_where_str = " AND ".join(datamodel_where_clause) if datamodel_where_str: datamodel_where_str = f"WHERE {datamodel_where_str}" # 构建查询 with connect_graph().session() as session: # 计算总数量 if tag is not None: # 有标签查询时,直接使用标签连接 count_query = f""" {match_clause} {datamodel_where_str} RETURN COUNT(DISTINCT n) AS count """ else: # 无标签查询时,只计算DataModel节点 count_query = f""" MATCH (n:DataModel) {datamodel_where_str} RETURN COUNT(n) AS count """ logger.debug(f"Count query: {count_query}") logger.debug(f"Query parameters: {params}") count_result = session.run(count_query, **params) count_record = count_result.single() total = count_record['count'] if count_record else 0 # 查询数据 - 修复OPTIONAL MATCH的笛卡尔积问题 if tag is not None: # 有标签查询时,直接使用标签连接 query = f""" {match_clause} {datamodel_where_str} RETURN DISTINCT id(n) as id, n.name as name, n.en_name as en_name, n.time as time, n.describe as describe, n.level as level, n.category as category, n.status as status, n.leader as leader, n.origin as origin, n.blood_resource as blood_resource, n.organization as organization, id(t) as tag_id, t.name as tag_name ORDER BY time DESC SKIP $skip LIMIT $limit """ else: # 无标签查询时,先过滤DataModel节点,然后可选连接标签 query = f""" MATCH (n:DataModel) {datamodel_where_str} WITH n OPTIONAL MATCH (n)-[:LABEL]->(t) RETURN id(n) as id, n.name as name, n.en_name as en_name, n.time as time, n.describe as describe, n.level as level, n.category as category, n.status as status, n.leader as leader, n.origin as origin, n.blood_resource as blood_resource, n.organization as organization, id(t) as tag_id, t.name as tag_name ORDER BY n.time DESC SKIP $skip LIMIT $limit """ logger.debug(f"Main query: {query}") result = session.run(query, skip=skip_count, limit=page_size, **params) # 处理结果 data = [] for record in result: item = { "id": record['id'], "name": record['name'], "en_name": record['en_name'], "time": record['time'], "describe": record['describe'], "category": record['category'], "status": record['status'], "leader": record['leader'], "origin": record['origin'], "blood_resource": record['blood_resource'], "organization": record['organization'], "level": record['level'], "tag": {"id": record['tag_id'], "name": record['tag_name']} if record['tag_id'] is not None else None } data.append(item) logger.info(f"Query returned {len(data)} items out of {total} total") return data, total except Exception as e: logger.error(f"Error in model_list: {str(e)}") import traceback traceback.print_exc() return [], 0 # 有血缘关系的数据资源列表 def model_resource_list(skip_count, page_size, name_filter=None, id=None, category=None, time=None): """ 获取数据模型相关的数据资源列表 Args: skip_count: 跳过的数量 page_size: 页面大小 name_filter: 名称过滤条件 id: 数据模型ID category: 类别过滤条件 time: 时间过滤条件 Returns: tuple: (数据资源列表, 总数量) """ try: # 构建基础查询 base_query = """ MATCH (n:DataModel) WHERE id(n) = $nodeId MATCH (n)-[:children]->(m:DataResource) """ # 
计算总数量 count_query = base_query + """ RETURN COUNT(m) as count """ with connect_graph().session() as session: # 执行计数查询 count_result = session.run(count_query, nodeId=id) count_record = count_result.single() total = count_record['count'] if count_record else 0 # 使用分页和筛选条件构建主查询 main_query = base_query + """ MATCH (m)-[:LABEL]->(l) WHERE id(n) = $nodeId and labels(m) <> ['DataMeta'] RETURN m.name as name, m.en_name as en_name, id(m) as id, l.name as label, m.time as time, m.description as description, m.category as category ORDER BY m.time DESC SKIP $skip LIMIT $limit """ # 执行主查询 result = session.run(main_query, nodeId=id, skip=skip_count, limit=page_size) # 处理结果 data = [] for record in result: item = { "name": record['name'], "en_name": record['en_name'], "id": record['id'], "label": record['label'], "time": record['time'], "description": record['description'], "category": record['category'] } data.append(item) return data, total except Exception as e: print(f"Error in model_resource_list: {str(e)}") import traceback traceback.print_exc() return [], 0 # 数据模型血缘图谱 def model_kinship_graph(nodeid, meta=False): """ 生成数据模型的血缘关系图谱 按照DERIVED_FROM关系进行递归查找,从当前节点作为起点查找所有DERIVED_FROM关系指向的节点 Args: nodeid: 节点ID meta: 是否包含元数据 Returns: dict: 包含节点和连线信息的图谱数据 """ try: with connect_graph().session() as session: # 确保nodeid为整数 try: nodeid_int = int(nodeid) except (ValueError, TypeError): logger.error(f"节点ID不是有效的整数: {nodeid}") return {"nodes": [], "lines": []} # 查询起始模型节点是否存在 start_node_query = """ MATCH (n:DataModel) WHERE id(n) = $nodeId RETURN n """ start_result = session.run(start_node_query, nodeId=nodeid_int) start_record = start_result.single() if not start_record: logger.error(f"未找到ID为{nodeid_int}的DataModel节点") return {"nodes": [], "lines": []} # 递归查找DERIVED_FROM关系 cypher = """ MATCH (start:DataModel) WHERE id(start) = $nodeId MATCH path = (start)-[:DERIVED_FROM*0..]->(target) WHERE target:DataResource OR target:DataModel RETURN path """ result = session.run(cypher, nodeId=nodeid_int) # 收集节点和关系 nodes = {} lines = {} for record in result: # 处理路径 path = record['path'] logger.debug(f"处理路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}") # 处理路径中的所有节点 for node in path.nodes: node_id = int(node.id) # 直接转换为整数 if node_id not in nodes: node_dict = serialize_node_properties(node) node_dict["id"] = str(node_id) node_dict["node_type"] = list(node.labels)[0] if node.labels else "" nodes[node_id] = node_dict logger.debug(f"添加节点: ID={node_id}, 标签={list(node.labels)}") # 处理路径中的所有关系 for rel in path.relationships: rel_id = int(rel.id) # 直接转换为整数 if rel_id not in lines: rel_dict = { "id": str(rel_id), "from": str(int(rel.start_node.id)), "to": str(int(rel.end_node.id)), "text": rel.type } lines[rel_id] = rel_dict logger.debug(f"添加关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}") # 如果需要元数据,查询INCLUDES关系 if meta: meta_cypher = """ MATCH (start:DataModel)-[r:INCLUDES]->(meta:DataMeta) WHERE id(start) = $nodeId RETURN start, r, meta """ meta_result = session.run(meta_cypher, nodeId=nodeid_int) for meta_record in meta_result: start_node = meta_record['start'] rel = meta_record['r'] meta_node = meta_record['meta'] # 添加元数据节点 meta_node_id = int(meta_node.id) if meta_node_id not in nodes: node_dict = serialize_node_properties(meta_node) node_dict["id"] = str(meta_node_id) node_dict["node_type"] = list(meta_node.labels)[0] if meta_node.labels else "" nodes[meta_node_id] = node_dict # 添加INCLUDES关系 rel_id = int(rel.id) if rel_id not in lines: rel_dict = { "id": str(rel_id), "from": 
str(nodeid_int), "to": str(meta_node_id), "text": rel.type } lines[rel_id] = rel_dict logger.info(f"成功获取血缘关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}") return { "nodes": list(nodes.values()), "lines": list(lines.values()) } except Exception as e: logger.error(f"获取数据模型血缘关系图谱失败: {str(e)}") import traceback logger.error(f"错误详情: {traceback.format_exc()}") return {"nodes": [], "lines": []} # 数据模型影响图谱 def model_impact_graph(nodeid, meta=False): """ 生成数据模型的影响关系图谱 按照DERIVED_FROM关系进行递归查找,从当前节点作为终点查找所有指向这个终点的节点 Args: nodeid: 节点ID meta: 是否包含元数据 Returns: dict: 包含节点和连线信息的图谱数据 """ try: with connect_graph().session() as session: # 确保nodeid为整数 try: nodeid_int = int(nodeid) except (ValueError, TypeError): logger.error(f"节点ID不是有效的整数: {nodeid}") return {"nodes": [], "lines": []} # 查询起始模型节点是否存在 start_node_query = """ MATCH (n:DataModel) WHERE id(n) = $nodeId RETURN n """ start_result = session.run(start_node_query, nodeId=nodeid_int) start_record = start_result.single() if not start_record: logger.error(f"未找到ID为{nodeid_int}的DataModel节点") return {"nodes": [], "lines": []} # 递归查找指向当前节点的DERIVED_FROM关系 cypher = """ MATCH (target:DataModel) WHERE id(target) = $nodeId MATCH path = (source)-[:DERIVED_FROM*0..]->(target) WHERE source:DataResource OR source:DataModel RETURN path """ result = session.run(cypher, nodeId=nodeid_int) # 收集节点和关系 nodes = {} lines = {} for record in result: # 处理路径 path = record['path'] logger.debug(f"处理影响路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}") # 处理路径中的所有节点 for node in path.nodes: node_id = int(node.id) # 直接转换为整数 if node_id not in nodes: node_dict = serialize_node_properties(node) node_dict["id"] = str(node_id) node_dict["node_type"] = list(node.labels)[0] if node.labels else "" nodes[node_id] = node_dict logger.debug(f"添加影响节点: ID={node_id}, 标签={list(node.labels)}") # 处理路径中的所有关系 for rel in path.relationships: rel_id = int(rel.id) # 直接转换为整数 if rel_id not in lines: rel_dict = { "id": str(rel_id), "from": str(int(rel.start_node.id)), "to": str(int(rel.end_node.id)), "text": rel.type } lines[rel_id] = rel_dict logger.debug(f"添加影响关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}") # 如果需要元数据,查询INCLUDES关系 if meta: meta_cypher = """ MATCH (target:DataModel)-[r:INCLUDES]->(meta:DataMeta) WHERE id(target) = $nodeId RETURN target, r, meta """ meta_result = session.run(meta_cypher, nodeId=nodeid_int) for meta_record in meta_result: target_node = meta_record['target'] rel = meta_record['r'] meta_node = meta_record['meta'] # 添加元数据节点 meta_node_id = int(meta_node.id) if meta_node_id not in nodes: node_dict = serialize_node_properties(meta_node) node_dict["id"] = str(meta_node_id) node_dict["node_type"] = list(meta_node.labels)[0] if meta_node.labels else "" nodes[meta_node_id] = node_dict # 添加INCLUDES关系 rel_id = int(rel.id) if rel_id not in lines: rel_dict = { "id": str(rel_id), "from": str(nodeid_int), "to": str(meta_node_id), "text": rel.type } lines[rel_id] = rel_dict logger.info(f"成功获取影响关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}") return { "nodes": list(nodes.values()), "lines": list(lines.values()) } except Exception as e: logger.error(f"获取数据模型影响关系图谱失败: {str(e)}") import traceback logger.error(f"错误详情: {traceback.format_exc()}") return {"nodes": [], "lines": []} # 数据模型全部图谱 def model_all_graph(nodeid, meta=False): """ 生成数据模型的所有关系图谱 分别调用model_impact_graph查找影响关系,调用model_kinship_graph查找血缘关系, 然后合并两部分数据返回 Args: nodeid: 节点ID meta: 是否包含元数据 Returns: dict: 包含节点和连线信息的图谱数据 """ try: # 获取血缘关系图谱 kinship_data = 
model_kinship_graph(nodeid, meta) # 获取影响关系图谱 impact_data = model_impact_graph(nodeid, meta) # 合并节点数据,使用字典去重 merged_nodes = {} merged_lines = {} # 添加血缘关系的节点和连线 if kinship_data and 'nodes' in kinship_data: for node in kinship_data['nodes']: node_id = node.get('id') if node_id: merged_nodes[node_id] = node if kinship_data and 'lines' in kinship_data: for line in kinship_data['lines']: line_id = line.get('id') if line_id: merged_lines[line_id] = line # 添加影响关系的节点和连线 if impact_data and 'nodes' in impact_data: for node in impact_data['nodes']: node_id = node.get('id') if node_id: merged_nodes[node_id] = node if impact_data and 'lines' in impact_data: for line in impact_data['lines']: line_id = line.get('id') if line_id: merged_lines[line_id] = line # 构建最终结果 result = { "nodes": list(merged_nodes.values()), "lines": list(merged_lines.values()) } logger.info(f"成功获取完整关系图谱,ID: {nodeid}, 节点数: {len(merged_nodes)}, 关系数: {len(merged_lines)}") return result except Exception as e: logger.error(f"获取数据模型完整关系图谱失败: {str(e)}") return {"nodes": [], "lines": []} # 更新数据模型 def data_model_edit(receiver): """ 更新数据模型 Args: receiver: 接收到的请求参数 Returns: 更新结果 """ id = receiver.get('id') name = receiver.get('name') en_name = receiver.get('en_name') category = receiver.get('category') describe = receiver.get('describe') tag = receiver.get('tag') frequency = receiver.get('frequency') leader = receiver.get('leader') organization = receiver.get('organization') status = bool(receiver.get('status')) if receiver.get('status') is not None else None meta_data = receiver.get('metaData', []) # 更新数据模型节点 - 添加新的字段 query = """ MATCH (n:DataModel) WHERE id(n) = $id SET n.name = $name, n.en_name = $en_name, n.category = $category, n.describe = $describe, n.frequency = $frequency, n.leader = $leader, n.organization = $organization, n.status = $status, n.updateTime = $update_time RETURN n """ update_time = get_formatted_time() with connect_graph().session() as session: result = session.run(query, id=id, name=name, en_name=en_name, category=category, describe=describe, frequency=frequency, leader=leader, organization=organization, status=status, update_time=update_time).data() # 处理标签关系 if tag: # 先删除所有标签关系 delete_query = """ MATCH (n:DataModel)-[r:LABEL]->() WHERE id(n) = $id DELETE r """ with connect_graph().session() as session: session.run(delete_query, id=id) # 再创建新的标签关系 tag_node = get_node_by_id('DataLabel', tag) if tag_node: model_node = get_node_by_id_no_label(id) if model_node: # 获取节点ID model_id = model_node.id if hasattr(model_node, 'id') else model_node tag_id = tag_node.id if hasattr(tag_node, 'id') else tag_node # 直接使用Cypher查询检查关系是否存在 with connect_graph().session() as session: rel_query = """ MATCH (a)-[r:LABEL]->(b) WHERE id(a) = $start_id AND id(b) = $end_id RETURN count(r) > 0 as exists """ rel_result = session.run(rel_query, start_id=int(model_id), end_id=int(tag_id)).single() # 如果关系不存在,则创建关系 if not (rel_result and rel_result["exists"]): session.execute_write( lambda tx: tx.run( "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:LABEL]->(b)", a_id=int(model_id), b_id=int(tag_id) ) ) # 处理DataMeta节点关系更新 with connect_graph().session() as session: # 先删除DataModel关联的所有DataMeta关系 delete_meta_query = """ MATCH (n:DataModel)-[r:INCLUDES]->(m:DataMeta) WHERE id(n) = $id DELETE r """ session.run(delete_meta_query, id=id) logger.info(f"已删除DataModel({id})的所有DataMeta关系") # 根据上传的metaData数据是否有值来决定是否重新构建INCLUDES关系 if meta_data: # 根据上传的metaData数据重新构建INCLUDES关系 for meta_item in meta_data: meta_id = meta_item.get('id') if meta_id: 
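                    # 补充说明:请求里传来的 meta_id 可能是字符串,这里先转成整数,
                    # 再确认对应的 DataMeta 节点存在后才创建 INCLUDES 关系;
                    # 转换失败或节点不存在时只记录日志、跳过该条,不中断整体更新。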
try: meta_id = int(meta_id) # 验证DataMeta节点是否存在 check_meta_query = """ MATCH (m:DataMeta) WHERE id(m) = $meta_id RETURN m """ meta_result = session.run(check_meta_query, meta_id=meta_id) if meta_result.single(): # 创建INCLUDES关系 create_includes_query = """ MATCH (n:DataModel), (m:DataMeta) WHERE id(n) = $model_id AND id(m) = $meta_id CREATE (n)-[:INCLUDES]->(m) RETURN n, m """ session.run(create_includes_query, model_id=id, meta_id=meta_id) logger.info(f"成功创建INCLUDES关系: DataModel({id}) -> DataMeta({meta_id})") else: logger.warning(f"DataMeta节点不存在,ID: {meta_id}") except (ValueError, TypeError) as e: logger.error(f"无效的meta_id: {meta_id}, 错误: {str(e)}") else: logger.info(f"meta_data为空,不需要重新创建INCLUDES关系,DataModel({id})将不关联任何DataMeta节点") return {"message": "数据模型更新成功"} def model_community(tag=None): """ 查询DataModel的所有节点及DERIVED_FROM关系 Args: tag: 可选的标签ID,如果指定则只查找有该标签的DataModel节点 Returns: dict: 包含节点和连线信息的图谱数据,格式与model_kinship_graph相同 """ try: with connect_graph().session() as session: # 构建查询条件 if tag is not None: # 确保tag参数是整数类型 try: tag_id = int(tag) except (ValueError, TypeError): logger.warning(f"Invalid tag parameter: {tag}, expected integer") return {"nodes": [], "lines": []} # 有标签查询条件时,查询有指定标签的DataModel节点及其DERIVED_FROM关系 cypher = """ MATCH (dm:DataModel)-[:LABEL]->(t) WHERE id(t) = $tag_id WITH dm MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel) RETURN path UNION MATCH (dm:DataModel)-[:LABEL]->(t) WHERE id(t) = $tag_id WITH dm MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm) RETURN path """ result = session.run(cypher, tag_id=tag_id) else: # 没有标签查询条件时,查询所有DataModel节点及其DERIVED_FROM关系 cypher = """ MATCH (dm:DataModel) WITH dm MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel) RETURN path UNION MATCH (dm:DataModel) WITH dm MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm) RETURN path """ result = session.run(cypher) # 收集节点和关系 nodes = {} lines = {} for record in result: # 处理路径 path = record['path'] logger.debug(f"处理社区路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}") # 处理路径中的所有节点 for node in path.nodes: node_id = int(node.id) # 直接转换为整数 if node_id not in nodes: node_dict = serialize_node_properties(node) node_dict["id"] = str(node_id) node_dict["node_type"] = list(node.labels)[0] if node.labels else "" nodes[node_id] = node_dict logger.debug(f"添加社区节点: ID={node_id}, 标签={list(node.labels)}") # 处理路径中的所有关系 for rel in path.relationships: rel_id = int(rel.id) # 直接转换为整数 if rel_id not in lines: rel_dict = { "id": str(rel_id), "from": str(int(rel.start_node.id)), "to": str(int(rel.end_node.id)), "text": rel.type } lines[rel_id] = rel_dict logger.debug(f"添加社区关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}") logger.info(f"成功获取数据模型社区图谱,标签ID: {tag}, 节点数: {len(nodes)}, 关系数: {len(lines)}") return { "nodes": list(nodes.values()), "lines": list(lines.values()) } except Exception as e: logger.error(f"获取数据模型社区图谱失败: {str(e)}") import traceback logger.error(f"错误详情: {traceback.format_exc()}") return {"nodes": [], "lines": []} def model_search_list(model_id, page, page_size, en_name_filter=None, name_filter=None, category_filter=None, tag_filter=None): """获取特定数据模型关联的元数据列表""" try: with connect_graph().session() as session: # 确保model_id为整数 try: model_id_int = int(model_id) except (ValueError, TypeError): logger.error(f"模型ID不是有效的整数: {model_id}") return [], 0 # 基本匹配语句 - 支持DataMeta和Metadata标签 match_clause = """ MATCH (n:DataModel)-[:INCLUDES]->(m) WHERE id(n) = $model_id AND (m:DataMeta OR m:Metadata) """ where_conditions = [] if 
en_name_filter:
                where_conditions.append("m.en_name CONTAINS $en_name_filter")
            if name_filter:
                where_conditions.append("m.name CONTAINS $name_filter")
            if category_filter:
                where_conditions.append("m.category = $category_filter")

            # 标签过滤需要额外的匹配
            tag_match = ""
            if tag_filter:
                tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"

            where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""

            # 计算总数
            count_cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN count(m) as count
            """
            # 统一使用参数化查询,避免把筛选值直接拼接进Cypher语句
            count_params = {"model_id": model_id_int}
            if en_name_filter:
                count_params["en_name_filter"] = en_name_filter
            if name_filter:
                count_params["name_filter"] = name_filter
            if category_filter:
                count_params["category_filter"] = category_filter
            if tag_filter:
                count_params["tag_filter"] = tag_filter

            count_result = session.run(count_cypher, **count_params)
            total_count = count_result.single()["count"]

            # 分页查询
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN m
            ORDER BY m.name
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher, **count_params)

            # 格式化结果
            metadata_list = []
            for record in result:
                meta = serialize_node_properties(record["m"])
                meta["id"] = record["m"].id
                metadata_list.append(meta)

            logger.info(f"成功获取数据模型关联元数据,ID: {model_id_int}, 元数据数量: {total_count}")
            return metadata_list, total_count

    except Exception as e:
        logger.error(f"获取数据模型关联的元数据列表失败: {str(e)}")
        return [], 0
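
# ------------------------------------------------------------------
# 补充的最小调用示例(仅为示意):假设 Neo4j 连接已通过 connect_graph /
# neo4j_driver 正确配置,演示 model_list、handle_id_model 与
# model_kinship_graph 的基本调用方式。其中的分页参数与节点ID均为假设值,
# 仅供本地冒烟测试参考,并非正式入口。
# ------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # 分页查询数据模型列表(跳过0条,取前10条)
    models, total = model_list(skip_count=0, page_size=10)
    logger.info(f"共 {total} 个数据模型,当前页返回 {len(models)} 条")

    if models:
        first_id = models[0]["id"]
        # 查看第一个数据模型的详情与血缘图谱
        detail = handle_id_model(first_id)
        graph = model_kinship_graph(first_id, meta=False)
        logger.info(f"示例模型详情: {detail}")
        logger.info(f"血缘图谱节点数: {len(graph['nodes'])}, 关系数: {len(graph['lines'])}")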