|
- """
- 数据模型核心业务逻辑模块
- 本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
- - 数据模型的创建、更新、删除
- - 数据模型与数据资源、元数据之间的关系处理
- - 数据模型血缘关系管理
- - 数据模型图谱生成
- - 数据模型层级计算等功能
- """
- import math
- import threading
- from concurrent.futures import ThreadPoolExecutor
- import pandas as pd
- from py2neo import Relationship
- import logging
- import json
- from app.core.graph.graph_operations import relationship_exists
- from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node
- from app.services.neo4j_driver import neo4j_driver
- from app.core.meta_data import get_formatted_time, handle_id_unstructured
- from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
- from app.core.data_resource.resource import get_node_by_id
# Compute a data model's level from its `child` relationships and persist it.
def calculate_model_level(id):
    """
    Compute the level of a data model and store it on the node.

    The level is the length of the longest ``(start)-[:child*]->(...)`` path
    that begins at the node, or 0 when no such path exists. The value is
    written to the node's ``level`` property.

    Args:
        id: Element ID of the data model node; converted to ``str`` before
            it is sent to the database.

    Returns:
        None
    """
    node_id = str(id) if id is not None else None

    level_query = """
        MATCH (start_node:data_model)
        WHERE elementId(start_node) = $nodeId
        CALL {
            WITH start_node
            OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
            RETURN length(path) AS level
        }
        WITH coalesce(max(level), 0) AS max_level
        RETURN max_level
    """

    update_query = """
        MATCH (n:data_model)
        WHERE elementId(n) = $nodeId
        SET n.level = $level
        RETURN n
    """

    with connect_graph().session() as session:
        record = session.run(level_query, nodeId=node_id).single()
        # Read the column by key. The previous `"max_level" in record` check
        # is a membership test; assuming connect_graph() yields the official
        # neo4j driver, a Record is a tuple subclass and `in` inspects the
        # row's values rather than its keys, so the level was always 0.
        level = record["max_level"] if record is not None else 0

        # Persist the computed level on the same node.
        session.run(update_query, nodeId=node_id, level=level)
# Lineage lookup between data resources that share metadata nodes.
def handle_model_relation(resource_ids):
    """
    Find data resources related to the given resource through shared metadata.

    Two resources are related when both have a ``connection`` relationship to
    the same ``meta_node``.

    Args:
        resource_ids: Internal id of the data resource to search from. It is
            compared with ``id(search)`` in the query, so a single id is
            expected despite the plural name.

    Returns:
        A list of dicts, one per related resource, with the keys
        ``blood_resources`` (id of the related resource), ``common_nodes``
        (ids of shared meta nodes), ``origin_nodes`` (meta node ids only on
        the searched resource) and ``blood_nodes`` (meta node ids only on the
        related resource).
    """
    lineage_cypher = """
        MATCH (search:data_resource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_resource)
        WHERE id(search) = $resource_Ids
        WITH search, connect, common_node
        MATCH (search)-[:connection]->(search_node:meta_node)
        WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
        MATCH (connect)-[:connection]->(connect_node:meta_node)
        WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
        WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
        // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
        WITH search, connect, common_nodes,
        [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
        [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
        RETURN id(connect) as blood_resources, common_nodes,
        filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
    """
    graph = connect_graph()
    with graph.session() as session:
        # Materialise the rows while the session is still open.
        rows = session.run(lineage_cypher, resource_Ids=resource_ids).data()
    return rows
# Create (or reuse) a data model node and wire up its child/label relations.
def handle_data_model(data_model, result_list, result, receiver):
    """
    Create a data model node, or reuse the one that already has this name.

    ``receiver`` is mutated in place: ``id_list``, ``time`` and ``en_name``
    are added before it is used as the property set for a new node. After the
    node is resolved, ``child`` relationships are created to every node listed
    in ``receiver['childrenId']`` and a ``label`` relationship to the
    ``data_label`` node referenced by ``receiver['tag']``, skipping any
    relationship that already exists.

    Args:
        data_model: Name of the data model.
        result_list: List of English names; the first entry becomes
            ``en_name`` (empty string when the list is empty or None).
        result: Serialized id list, stored under ``receiver['id_list']``.
        receiver: Request parameters (dict); mutated as described above.

    Returns:
        tuple: ``(node_id, data_model_node)``. ``node_id`` is ``None`` when
        the node exposes no ``id`` attribute and the lookup by name finds
        nothing.
    """
    data_model_en = result_list[0] if result_list else ""
    receiver['id_list'] = result
    receiver.update({
        'time': get_formatted_time(),
        'en_name': data_model_en,
    })

    data_model_node = (get_node('data_model', name=data_model)
                       or create_or_get_node('data_model', **receiver))

    # Resolve the node id before any relationship is created, so the
    # relationship queries below work even when the node object has no `id`
    # attribute (previously they dereferenced `data_model_node.id` directly).
    if hasattr(data_model_node, 'id'):
        node_id = data_model_node.id
    else:
        node_id = None
        lookup_query = """
            MATCH (n:data_model {name: $name})
            RETURN id(n) as node_id
        """
        with connect_graph().session() as session:
            record = session.run(lookup_query, name=data_model).single()
            # Key lookup rather than `"node_id" in record`: assuming the
            # official neo4j driver, `in` on a Record tests the row's values,
            # not its keys, so the old check never succeeded.
            if record is not None:
                node_id = record["node_id"]

    def _create_relationship(cypher, a_id, b_id):
        """Run one relationship-creating statement in a write transaction."""
        with connect_graph().session() as session:
            session.execute_write(
                lambda tx: tx.run(cypher, a_id=a_id, b_id=b_id)
            )

    child_cypher = "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:child]->(b)"
    label_cypher = "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:label]->(b)"

    # `or []` tolerates an explicit None under 'childrenId'.
    for child_id in receiver.get('childrenId') or []:
        child_node = get_node_by_id_no_label(child_id)
        if child_node and not relationship_exists(data_model_node, 'child', child_node):
            _create_relationship(child_cypher, node_id, child_node.id)

    # Link the model to its data label when a tag id was supplied.
    if receiver.get('tag'):
        tag = get_node_by_id('data_label', receiver['tag'])
        if tag and not relationship_exists(data_model_node, 'label', tag):
            _create_relationship(label_cypher, node_id, tag.id)

    return node_id, data_model_node
# (Selection made from data resources.)
def resource_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to the metadata and data resources it was built from.

    Creates (via ``MERGE``, so existing relationships are not duplicated):
      * ``(data_model)-[:component]->(meta_node)`` for every metadata id
      * ``(data_model)-[:resource]->(data_resource)`` for every resource id

    Args:
        id_lists: List of dicts, each with a ``resource_id`` and a
            ``metaData`` list whose items carry an ``id``.
        data_model_node_id: Internal id of the data model node.

    Returns:
        None
    """
    resource_ids = [entry['resource_id'] for entry in id_lists]
    meta_ids = [meta['id'] for entry in id_lists for meta in entry['metaData']]
    # The former `metaData` list of `data_standard` values was never used and
    # raised KeyError for metadata items without that key, so it is gone.

    # Composition relationships to meta nodes.
    if meta_ids:
        component_query = """
            MATCH (source:data_model), (target:meta_node)
            WHERE id(source)=$source_id AND id(target) IN $target_ids
            MERGE (source)-[:component]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(component_query, source_id=data_model_node_id, target_ids=meta_ids)

    # Resource relationships to data resources.
    if resource_ids:
        resource_query = """
            MATCH (source:data_model), (target:data_resource)
            WHERE id(source)=$source_id AND id(target) IN $target_ids
            MERGE (source)-[:resource]->(target)
        """
        with neo4j_driver.get_session() as session:
            session.run(resource_query, source_id=data_model_node_id, target_ids=resource_ids)
# (Selection made from existing data models.)
def model_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to metadata and to the data models it was derived from.

    Every metadata id yields a ``component`` relationship to the matching
    ``meta_node``; every model id yields a ``use`` relationship to the
    matching ``data_model``. Both statements use ``MERGE``.

    Args:
        id_lists: List of dicts, each with a ``model_id`` and a ``metaData``
            list whose items carry an ``id``.
        data_model_node_id: Internal id of the data model node.

    Returns:
        None
    """
    source_model_ids = [entry['model_id'] for entry in id_lists]
    component_meta_ids = []
    for entry in id_lists:
        for meta in entry['metaData']:
            component_meta_ids.append(meta['id'])

    component_cypher = """
        MATCH (source:data_model), (target:meta_node)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:component]->(target)
    """
    use_cypher = """
        MATCH (source:data_model), (target:data_model)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:use]->(target)
    """

    # Metadata links first, then model links; an empty id list means the
    # corresponding statement is not sent at all.
    statements = (
        (component_meta_ids, component_cypher),
        (source_model_ids, use_cypher),
    )
    for target_ids, cypher in statements:
        if not target_ids:
            continue
        with neo4j_driver.get_session() as session:
            session.run(cypher, source_id=data_model_node_id, target_ids=target_ids)
# (Selection made from DDL.)
def handle_no_meta_data_model(id_lists, receiver, data_model_node):
    """
    Link a DDL-derived data model to its resources and create its metadata.

    For every entry in ``id_lists`` a ``resource`` relationship is merged
    between the data model and the data resource. For every metadata item a
    ``meta_node`` is created (or fetched) from the item's ``data_name``,
    ``en_name_zh`` and ``data_standard`` values and connected to the data
    model with a ``component`` relationship, unless one already exists.

    Args:
        id_lists: List of dicts, each with a ``resource_id`` and a
            ``metaData`` list of dicts.
        receiver: Request parameters. Not read by this function; kept for
            interface compatibility.
        data_model_node: The data model node. When it exposes no ``id``
            attribute, the id is looked up by the node's ``name``.

    Returns:
        None
    """
    resource_ids = [entry['resource_id'] for entry in id_lists]

    # Resolve the data model's internal id.
    if hasattr(data_model_node, 'id'):
        data_model_node_id = data_model_node.id
    else:
        data_model_node_id = None
        lookup_query = """
            MATCH (n:data_model {name: $name})
            RETURN id(n) as node_id
        """
        with connect_graph().session() as session:
            record = session.run(lookup_query, name=data_model_node.get('name')).single()
            if record:
                data_model_node_id = record["node_id"]

    # Compare against None explicitly: 0 is a valid internal node id and the
    # old truthiness check (`if not data_model_node_id`) bailed out on it.
    if data_model_node_id is None:
        return

    # Resource relationships to data resources.
    if resource_ids:
        resource_query = """
            MATCH (source:data_model), (target:data_resource)
            WHERE id(source)=$source_id AND id(target) IN $target_ids
            MERGE (source)-[:resource]->(target)
        """
        with connect_graph().session() as session:
            session.run(resource_query, source_id=data_model_node_id, target_ids=resource_ids)

    # The earlier version also fetched every referenced meta node one query at
    # a time into a list that was never read; those round trips are dropped.
    component_cypher = (
        "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:component]->(b)"
    )

    for entry in id_lists:
        for meta_item in entry['metaData']:
            meta_params = {
                'name': meta_item.get('data_name', ''),
                'cn_name': meta_item.get('en_name_zh', ''),
                'standard': meta_item.get('data_standard', ''),
                'time': get_formatted_time(),
            }
            meta_node = create_or_get_node('meta_node', **meta_params)

            if meta_node and not relationship_exists(data_model_node, 'component', meta_node):
                with connect_graph().session() as session:
                    # Use the resolved id rather than `data_model_node.id`,
                    # which does not exist on the lookup-by-name path above.
                    session.execute_write(
                        lambda tx, b_id=meta_node.id: tx.run(
                            component_cypher,
                            a_id=data_model_node_id, b_id=b_id
                        )
                    )
# Cypher text used to load the details of a single data model.
def type_cql_query():
    """
    Return the Cypher statement that loads one data model's details.

    The statement expects a ``$nodeId`` parameter (element id of the data
    model) and returns four columns: ``results`` (a one-element list holding
    the sorted metadata under ``meta_ids``), ``tag``, ``properties`` and
    ``childrenId``.

    Returns:
        The query text.
    """
    return """
        MATCH (n:data_model)
        WHERE elementId(n) = $nodeId
        // 获取元数据节点, 数据模型
        WITH n
        OPTIONAL MATCH (n)-[:connection]->(a:meta_node)
        // 获取数据标准
        OPTIONAL MATCH (n)-[:clean_model]-(d:data_standard)-[:clean_model]->(a)
        // 获取数据标签
        OPTIONAL MATCH (n)-[:label]->(la:data_label)
        // 获取子节点关系
        OPTIONAL MATCH (n)-[:child]->(child_node:data_model)
        // 收集元数据信息,注意ma变量未定义,需要修复
        OPTIONAL MATCH (a)-[:master_data]->(ma:master_data)
        WITH n, a, d, la, collect(DISTINCT {id: elementId(child_node), name: child_node.name}) AS childrenId, ma
        // 收集元数据信息并排序
        WITH n, collect(DISTINCT {
        id: elementId(a),
        name: COALESCE(a.name, ""),
        en_name: COALESCE(a.en_name, ""),
        data_type: COALESCE(a.data_type, ""),
        master_data: CASE
        WHEN ma IS NOT NULL THEN {id: elementId(ma), name: COALESCE(ma.name, "")}
        ELSE NULL
        END,
        data_standard: CASE
        WHEN d IS NOT NULL THEN {id: elementId(d), name: COALESCE(d.name, "")}
        ELSE NULL
        END
        }) AS meta_ids,
        properties(n) AS properties,
        CASE
        WHEN la IS NOT NULL THEN {id: elementId(la), name: COALESCE(la.name, "")}
        ELSE NULL
        END AS tag,
        childrenId
        // 对 meta_ids 进行排序
        UNWIND meta_ids AS meta_id
        WITH n, tag, properties, childrenId, meta_id
        ORDER BY meta_id.id
        WITH n, tag, properties, childrenId, collect(meta_id) AS sorted_meta_ids
        // 构建结果集
        WITH [{data_resource: null, resource_id: null, meta_ids: sorted_meta_ids}] AS resources,
        elementId(n) as nodeid, tag, properties, n, childrenId
        UNWIND resources as resource
        WITH nodeid, collect(resource) as results, tag, properties, n, childrenId
        // 合并结果集
        RETURN results, tag, properties, childrenId
    """
# Data model detail lookup (used by the edit endpoint).
def handle_id_model(id):
    """
    Load the details of one data model.

    Runs the statement from ``type_cql_query()`` and merges the node's
    properties with its tag, children and selected metadata. Every id in the
    response is returned as a string; ids that are missing (``None``) stay
    ``None`` instead of becoming the literal text ``"None"``.

    Args:
        id: Element ID of the data model; converted to ``str`` before use.

    Returns:
        dict: ``{"data_model": {...}}``. The inner dict holds the node
        properties plus ``resource_selected``, ``tag``, ``childrenId`` and
        ``describe``. It is empty when nothing matches or the query fails
        (the failure is logged, not raised).
    """
    logger = logging.getLogger(__name__)

    def _stringify(mapping, key='id'):
        """Convert mapping[key] to str in place when it is a real value."""
        # The isinstance guard matters: a `tag` value that is not a dict used
        # to raise here and, through the broad except, emptied the response.
        if isinstance(mapping, dict) and mapping.get(key) is not None:
            mapping[key] = str(mapping[key])

    query = type_cql_query()
    node_id = str(id) if id is not None else None
    logger.info("Querying data model with elementId: %s", node_id)

    with connect_graph().session() as session:
        try:
            rows = session.run(query, nodeId=node_id).data()
            logger.debug("Query result: %s", rows)
            if not rows:
                logger.info("No data found for elementId: %s", node_id)
                return {"data_model": {}}

            res_list = []
            properties = {}
            # When several rows come back the last one wins, as before.
            for record in rows:
                if 'results' in record:
                    res_list = record['results']
                if 'properties' in record:
                    properties = record['properties']
                if record.get('tag') is not None:
                    properties['tag'] = record['tag']
                if record.get('childrenId') is not None:
                    properties['childrenId'] = record['childrenId']

            # Normalise ids to strings.
            _stringify(properties)
            _stringify(properties.get('tag'))
            for child in properties.get('childrenId') or []:
                # The query emits an {id: null, name: null} placeholder when
                # the model has no children; its id must remain None.
                _stringify(child)

            properties.pop('id_list', None)
            properties.setdefault('tag', None)
            properties.setdefault('describe', None)

            for res in res_list or []:
                _stringify(res, 'resource_id')
                for meta in res.get('meta_ids') or []:
                    _stringify(meta)
                    _stringify(meta.get('data_standard'))
                    _stringify(meta.get('master_data'))

            return {"data_model": {"resource_selected": res_list, **properties}}
        except Exception:
            # Boundary handler: callers receive an empty payload, and the
            # traceback goes to the log.
            logger.exception("Error in handle_id_model")
            return {"data_model": {}}
- # 数据模型详情
- '''
- def handle_id_model(model_id):
- """
- 获取数据模型详情
-
- Args:
- model_id: 数据模型ID
-
- Returns:
- 数据模型详情
- """
- model_detail_query = """
- MATCH (n:data_model) WHERE id(n) = $model_id
- RETURN n
- """
-
- with connect_graph().session() as session:
- model_detail_result = session.run(model_detail_query, model_id=model_id).data()
-
- if not model_detail_result:
- return None
-
- model_detail = model_detail_result[0]['n']
- model_info = dict(model_detail)
- model_info['id'] = model_id
-
- # 获取data_model节点连接的resource节点
- resource_query = """
- MATCH (n:data_model)-[:resource]->(r:data_resource) WHERE id(n) = $model_id
- RETURN r
- """
- resource_result = session.run(resource_query, model_id=model_id).data()
- resources = []
-
- for item in resource_result:
- if 'r' in item and hasattr(item['r'], 'id'):
- resource = dict(item['r'])
- resource['id'] = item['r'].id
- resources.append(resource)
-
- model_info['resources'] = resources
-
- # 获取data_model节点连接的component节点
- component_query = """
- MATCH (n:data_model)-[:component]->(m:meta_node) WHERE id(n) = $model_id
- RETURN m
- """
- component_result = session.run(component_query, model_id=model_id).data()
- components = []
-
- for item in component_result:
- if 'm' in item and hasattr(item['m'], 'id'):
- component = dict(item['m'])
- component['id'] = item['m'].id
- components.append(component)
-
- model_info['components'] = components
-
- # 获取data_model节点连接的use节点
- use_query = """
- MATCH (n:data_model)-[:use]->(u:data_model) WHERE id(n) = $model_id
- RETURN u
- """
- use_result = session.run(use_query, model_id=model_id).data()
- uses = []
-
- for item in use_result:
- if 'u' in item and hasattr(item['u'], 'id'):
- use = dict(item['u'])
- use['id'] = item['u'].id
- uses.append(use)
-
- model_info['uses'] = uses
-
- # 获取data_model节点连接的标签
- tag_query = """
- MATCH (n:data_model)-[:label]->(t:data_label) WHERE id(n) = $model_id
- RETURN t
- """
- tag_result = session.run(tag_query, model_id=model_id).data()
-
- if tag_result and 't' in tag_result[0] and hasattr(tag_result[0]['t'], 'id'):
- tag = dict(tag_result[0]['t'])
- tag['id'] = tag_result[0]['t'].id
- model_info['tag'] = tag
-
- return model_info
- '''
# Paged list of data models.
def model_list(skip_count, page_size, en_name_filter=None, name_filter=None,
               category=None, tag=None, level=None):
    """
    Return one page of data models together with the total match count.

    Args:
        skip_count: Number of records to skip.
        page_size: Maximum number of records to return.
        en_name_filter: Case-insensitive "contains" filter on ``en_name``.
        name_filter: Case-insensitive "contains" filter on ``name``.
        category: Exact match on ``category``.
        tag: Element id of a ``data_label``; only models labelled with it
            are returned.
        level: Exact match on ``level``. ``None`` or an empty string means
            "no filter"; 0 is a valid level and is applied.

    Returns:
        tuple: ``(rows, total)``. ``rows`` is a list of dicts with the keys
        ``id``, ``name``, ``en_name``, ``category``, ``description``,
        ``time``, ``level``, ``tag_name``, ``tag_id`` and
        ``component_count``. ``([], 0)`` is returned when no driver is
        available.
    """
    params = {}
    match_clause = "MATCH (n:data_model)"
    where_clause = []

    if tag:
        match_clause = "MATCH (n:data_model)-[:label]->(t:data_label)"
        where_clause.append("elementId(t) = $tag")
        params['tag'] = str(tag)

    # NOTE(review): the filter text is embedded in a regular expression
    # without escaping, so regex metacharacters in user input are interpreted
    # as such. Confirm whether callers rely on that before changing it.
    if name_filter:
        where_clause.append("n.name =~ $name_filter")
        params['name_filter'] = f"(?i).*{name_filter}.*"

    if en_name_filter:
        where_clause.append("n.en_name =~ $en_name_filter")
        params['en_name_filter'] = f"(?i).*{en_name_filter}.*"

    if category:
        where_clause.append("n.category = $category")
        params['category'] = category

    # calculate_model_level() stores 0 for models without children, so a
    # plain truthiness test would make level 0 impossible to filter on.
    if level is not None and level != "":
        where_clause.append("n.level = $level")
        params['level'] = level

    where_str = " AND ".join(where_clause)
    if where_str:
        where_str = "WHERE " + where_str

    count_query = f"""
        {match_clause}
        {where_str}
        RETURN count(n) as count
    """

    driver = connect_graph()
    if not driver:
        return [], 0

    with driver.session() as session:
        count = session.run(count_query, **params).single()["count"]

        # Paging parameters are only needed by the data query.
        params['skip'] = skip_count
        params['limit'] = page_size

        data_query = f"""
            {match_clause}
            {where_str}
            OPTIONAL MATCH (n)-[:label]->(t:data_label)
            WITH n, t
            OPTIONAL MATCH (n)-[:component]->(m:meta_node)
            RETURN
            elementId(n) as id,
            n.name as name,
            n.en_name as en_name,
            n.category as category,
            n.description as description,
            n.time as time,
            n.level as level,
            t.name as tag_name,
            elementId(t) as tag_id,
            count(m) as component_count
            ORDER BY n.time DESC
            SKIP $skip
            LIMIT $limit
        """
        data = session.run(data_query, **params).data()

        # Make sure ids are handed back as strings.
        for item in data:
            if 'id' in item:
                item['id'] = str(item['id'])
            if item.get('tag_id') is not None:
                item['tag_id'] = str(item['tag_id'])

    return data, count
# Paged list of data resources that share lineage with a given resource.
def model_resource_list(skip_count, page_size, name_filter=None, id=None,
                        category=None, time=None):
    """
    List data resources that share at least one meta node with a resource.

    Args:
        skip_count: Number of records to skip.
        page_size: Maximum number of records to return.
        name_filter: Case-insensitive "contains" filter on ``name``.
        id: Internal id of the data resource to search from.
        category: Exact match on ``category``.
        time: Lower bound (inclusive) on ``time``.

    Returns:
        tuple: ``(rows, total)``. Each row carries ``id``, ``name``,
        ``en_name``, ``category``, ``description``, ``time`` and
        ``common_meta`` (the shared meta nodes as ``{id, name}`` maps).
        ``([], 0)`` is returned when no driver is available.
    """
    query_params = {'id': id}
    conditions = []

    if name_filter:
        query_params['name_filter'] = f"(?i).*{name_filter}.*"
        conditions.append("n.name =~ $name_filter")
    if category:
        query_params['category'] = category
        conditions.append("n.category = $category")
    if time:
        query_params['time'] = time
        conditions.append("n.time >= $time")

    where_str = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    count_query = f"""
        MATCH (search:data_resource) WHERE id(search) = $id
        MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource)
        {where_str}
        RETURN count(DISTINCT n) as count
    """

    data_query = f"""
        MATCH (search:data_resource) WHERE id(search) = $id
        MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource)
        {where_str}
        WITH DISTINCT n, mn
        RETURN
        id(n) as id,
        n.name as name,
        n.en_name as en_name,
        n.category as category,
        n.description as description,
        n.time as time,
        collect({{id: id(mn), name: mn.name}}) as common_meta
        ORDER BY n.time DESC
        SKIP $skip
        LIMIT $limit
    """

    driver = connect_graph()
    if not driver:
        return [], 0

    with driver.session() as session:
        total = session.run(count_query, **query_params).single()["count"]

        # The paging parameters are added only for the data query.
        query_params['skip'] = skip_count
        query_params['limit'] = page_size
        rows = session.run(data_query, **query_params).data()

    return rows, total
# Shared helpers for the three data model graph endpoints below.
def _graph_entity_id(node):
    """Return a node's id as a string, preferring ``element_id`` over ``id``."""
    if hasattr(node, 'element_id'):
        return str(node.element_id)
    return str(node.id)


def _graph_node_type(labels):
    """
    Derive the short node type from a node's labels.

    Uses the second ``_``-separated segment of the first label (for example
    ``data_model`` -> ``model``). Falls back to the whole label when it has no
    underscore and to ``""`` when the node has no labels; both cases used to
    raise ``IndexError``.
    """
    first_label = next(iter(labels), "")
    segments = first_label.split('_')
    return segments[1] if len(segments) > 1 else first_label


def _collect_model_graph(query, node_id):
    """
    Run a path query and flatten the returned paths into nodes and edges.

    Args:
        query: Cypher text returning paths in a column named ``p`` and
            taking a ``$nodeId`` parameter.
        node_id: Value bound to ``$nodeId``.

    Returns:
        dict: ``{"nodes": [...], "edges": [...]}``. Nodes are de-duplicated
        by id, edges by their (start, end) pair.
    """
    driver = connect_graph()
    if not driver:
        return {"nodes": [], "edges": []}

    nodes_by_id = {}
    # Keyed by the (start, end) tuple. The previous implementation joined the
    # two ids with "-" and split on "-" afterwards, which corrupts `from` and
    # `to` whenever an element id itself contains a hyphen.
    edges_by_pair = {}

    with driver.session() as session:
        for record in session.run(query, nodeId=node_id):
            path = record["p"]

            for node in path.nodes:
                entity_id = _graph_entity_id(node)
                if entity_id not in nodes_by_id:
                    nodes_by_id[entity_id] = {
                        "id": entity_id,
                        "text": node.get("name", ""),
                        "type": _graph_node_type(node.labels),
                    }

            for rel in path.relationships:
                start_id = _graph_entity_id(rel.start_node)
                end_id = _graph_entity_id(rel.end_node)
                # Edge text stays empty, matching the payload produced so far.
                edges_by_pair.setdefault(
                    (start_id, end_id),
                    {"from": start_id, "to": end_id, "text": ""},
                )

    return {
        "nodes": list(nodes_by_id.values()),
        "edges": list(edges_by_pair.values()),
    }


# Data model lineage graph.
def model_kinship_graph(nodeid, meta=False):
    """
    Build the lineage graph around a data model.

    Follows ``resource`` relationships (and ``component`` relationships when
    ``meta`` is true) up to three hops in either direction, limited to 300
    paths.

    Args:
        nodeid: Element ID of the data model; converted to ``str``.
        meta: Whether metadata nodes are included.

    Returns:
        dict: ``{"nodes": [{"id", "text", "type"}, ...],
        "edges": [{"from", "to", "text"}, ...]}``.
    """
    node_id = str(nodeid) if nodeid is not None else None

    if meta:
        query = """
            MATCH p = (n:data_model)-[r:component|resource*..3]-(m)
            WHERE elementId(n) = $nodeId
            WITH p, relationships(p) as rels
            RETURN p
            limit 300
        """
    else:
        query = """
            MATCH p = (n:data_model)-[r:resource*..3]-(m)
            WHERE elementId(n) = $nodeId and labels(m) <> ['meta_node']
            WITH p, relationships(p) as rels
            RETURN p
            limit 300
        """

    return _collect_model_graph(query, node_id)


# Data model impact graph.
def model_impact_graph(nodeid, meta=False):
    """
    Build the impact graph around a data model.

    Follows ``use`` relationships up to three hops in either direction,
    limited to 300 paths.

    Args:
        nodeid: Element ID of the data model; converted to ``str``.
        meta: Accepted for parity with the other graph functions. Both
            branches of the previous implementation issued the same query,
            so the flag has no effect.

    Returns:
        dict: ``{"nodes": [{"id", "text", "type"}, ...],
        "edges": [{"from", "to", "text"}, ...]}``.
    """
    node_id = str(nodeid) if nodeid is not None else None

    query = """
        MATCH p = (n:data_model)-[r:use*..3]-(m)
        WHERE elementId(n) = $nodeId
        WITH p, relationships(p) as rels
        RETURN p
        limit 300
    """

    return _collect_model_graph(query, node_id)


# Complete data model graph.
def model_all_graph(nodeid, meta=False):
    """
    Build the full neighbourhood graph around a data model.

    Follows relationships of any type up to three hops in either direction,
    limited to 300 paths. Without ``meta``, paths whose end node is labelled
    exactly ``meta_node`` are excluded.

    Args:
        nodeid: Element ID of the data model; converted to ``str``.
        meta: Whether metadata nodes are included.

    Returns:
        dict: ``{"nodes": [{"id", "text", "type"}, ...],
        "edges": [{"from", "to", "text"}, ...]}``.
    """
    node_id = str(nodeid) if nodeid is not None else None

    if meta:
        query = """
            MATCH p = (n:data_model)-[r*..3]-(m)
            WHERE elementId(n) = $nodeId
            WITH p, relationships(p) as rels
            RETURN p
            limit 300
        """
    else:
        query = """
            MATCH p = (n:data_model)-[r*..3]-(m)
            WHERE elementId(n) = $nodeId and labels(m) <> ['meta_node']
            WITH p, relationships(p) as rels
            RETURN p
            limit 300
        """

    return _collect_model_graph(query, node_id)
# Update a data model.
def data_model_edit(receiver):
    """
    Update a data model's basic properties and, optionally, its label.

    ``name``, ``en_name``, ``category`` and ``description`` are written from
    ``receiver`` as they are, so a missing key writes ``None`` to that
    property. When ``tag`` is supplied and both the tag node and the model
    node can be resolved, the model's existing ``label`` relationships are
    replaced by a single relationship to the new tag. If the tag cannot be
    resolved, the existing label relationships are left alone.

    Args:
        receiver: Request parameters (dict) with ``id``, ``name``,
            ``en_name``, ``category``, ``description`` and optional ``tag``.

    Returns:
        dict: ``{"message": ...}`` with a fixed success message.
    """
    model_id = receiver.get('id')
    tag = receiver.get('tag')

    # NOTE(review): this statement matches on id(n), whereas model_list() and
    # handle_id_model() hand out elementId() strings. Confirm which id form
    # the caller actually sends.
    update_query = """
        MATCH (n:data_model) WHERE id(n) = $id
        SET n.name = $name, n.en_name = $en_name, n.category = $category, n.description = $description
        RETURN n
    """

    with connect_graph().session() as session:
        session.run(
            update_query,
            id=model_id,
            name=receiver.get('name'),
            en_name=receiver.get('en_name'),
            category=receiver.get('category'),
            description=receiver.get('description'),
        ).data()

    if tag:
        # Resolve both ends before touching anything, so an unknown tag id no
        # longer strips the model of the label it already has.
        tag_node = get_node_by_id('data_label', tag)
        model_node = get_node_by_id_no_label(model_id) if tag_node else None

        if tag_node and model_node:
            delete_query = """
                MATCH (n:data_model)-[r:label]->() WHERE id(n) = $id
                DELETE r
            """
            create_query = (
                "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:label]->(b)"
            )

            def _replace_label(tx):
                """Drop the old label links and add the new one atomically."""
                tx.run(delete_query, id=model_id)
                tx.run(create_query, a_id=model_node.id, b_id=tag_node.id)

            with connect_graph().session() as session:
                session.execute_write(_replace_label)

    return {"message": "数据模型更新成功"}
|