|
@@ -1,1887 +0,0 @@
|
|
|
-"""
|
|
|
|
|
-数据模型核心业务逻辑模块
|
|
|
|
|
-
|
|
|
|
|
-本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
|
|
|
|
|
-- 数据模型的创建、更新、删除
|
|
|
|
|
-- 数据模型与数据资源、元数据之间的关系处理
|
|
|
|
|
-- 数据模型血缘关系管理
|
|
|
|
|
-- 数据模型图谱生成
|
|
|
|
|
-- 数据模型层级计算等功能
|
|
|
|
|
-"""
|
|
|
|
|
-
|
|
|
|
|
-import math
|
|
|
|
|
-import threading
|
|
|
|
|
-from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
-import pandas as pd
|
|
|
|
|
-from py2neo import Relationship
|
|
|
|
|
-import logging
|
|
|
|
|
-import json
|
|
|
|
|
-
|
|
|
|
|
-# Configure logger
|
|
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
|
|
-
|
|
|
|
|
-from app.core.graph.graph_operations import relationship_exists
|
|
|
|
|
-from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node
|
|
|
|
|
-from app.services.neo4j_driver import neo4j_driver
|
|
|
|
|
-from app.core.meta_data import get_formatted_time, handle_id_unstructured
|
|
|
|
|
-from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
|
|
|
|
|
-from app.core.data_resource.resource import get_node_by_id, serialize_node_properties
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 根据child关系计算数据模型当前的level自动保存
|
|
|
|
|
def calculate_model_level(id):
    """
    Compute a DataModel node's level from its ``child`` chain and store it.

    The level is the length of the longest outgoing ``child`` path that starts
    at the node (0 when no such path exists). The value is written to the
    node's ``level`` property.

    Args:
        id: Node ID of the DataModel; any value accepted by ``int()``, or None.

    Returns:
        None
    """
    # The Cypher id() comparison needs an integer parameter.
    node_id = int(id) if id is not None else None

    level_query = """
    MATCH (start_node:DataModel)
    WHERE id(start_node) = $nodeId
    CALL {
        WITH start_node
        OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
        RETURN length(path) AS level
    }
    WITH coalesce(max(level), 0) AS max_level
    RETURN max_level
    """

    update_query = """
    MATCH (n:DataModel)
    WHERE id(n) = $nodeId
    SET n.level = $level
    RETURN n
    """

    # One session serves both the read and the write.
    with connect_graph().session() as session:
        record = session.run(level_query, nodeId=node_id).single()
        # Read the column by key. The previous guard `"max_level" in record`
        # is a membership test; on a neo4j Record (tuple-based) that compares
        # against the *values*, so it fell back to 0 and the computed level
        # was never stored.
        # NOTE(review): assumes connect_graph() wraps the official neo4j driver.
        level = record["max_level"] if record is not None else 0

        session.run(update_query, nodeId=node_id, level=level)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 处理数据模型血缘关系
|
|
|
|
|
def handle_model_relation(resource_ids):
    """
    Look up data resources that share metadata with the given data resource.

    The query starts from the DataResource whose node ID equals the argument,
    follows ``connection`` relationships to ``meta_node`` nodes, and finds the
    other DataResource nodes connected to the same metadata.

    Args:
        resource_ids: Node ID of the data resource to search from. Despite the
            plural name, the query compares it with a single ``id(search)``
            using ``=``.

    Returns:
        list[dict]: One row per connected resource with the keys
        ``blood_resources`` (ID of the connected resource), ``common_nodes``
        (IDs of shared metadata), ``origin_nodes`` (metadata IDs only on the
        searched resource) and ``blood_nodes`` (metadata IDs only on the
        connected resource).
    """
    lineage_cypher = """
    MATCH (search:DataResource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:DataResource)
    WHERE id(search) = $resource_Ids
    WITH search, connect, common_node
    MATCH (search)-[:connection]->(search_node:meta_node)
    WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
    MATCH (connect)-[:connection]->(connect_node:meta_node)
    WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
    WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes

    // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
    WITH search, connect, common_nodes,
        [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
        [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes

    RETURN id(connect) as blood_resources, common_nodes,
        filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
    """

    with connect_graph().session() as graph_session:
        # Materialise the rows while the session is still open.
        rows = graph_session.run(lineage_cypher, resource_Ids=resource_ids)
        return rows.data()
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 创建一个数据模型节点
|
|
|
|
|
def _ensure_relationship(session, start_id, end_id, rel_type):
    """Create ``(start)-[:rel_type]->(end)`` unless it already exists; return True if created.

    ``rel_type`` is interpolated into the Cypher text (relationship types
    cannot be passed as parameters), so it must be a trusted constant.
    """
    existing = session.run(
        f"""
        MATCH (a)-[r:{rel_type}]->(b)
        WHERE id(a) = $start_id AND id(b) = $end_id
        RETURN count(r) > 0 as exists
        """,
        start_id=start_id,
        end_id=end_id,
    ).single()
    if existing is not None and existing["exists"]:
        return False

    session.execute_write(
        lambda tx: tx.run(
            f"MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:{rel_type}]->(b)",
            a_id=start_id,
            b_id=end_id,
        )
    )
    return True


def _link_data_source(node_id, data_source):
    """Best-effort creation of the COME_FROM relationship from a DataModel to a DataSource.

    ``data_source`` may be a node ID (number or digit string), a dict carrying
    ``name_en``, or a ``name_en`` string. Failures are logged and swallowed so
    they never interrupt the caller.
    """
    try:
        if isinstance(data_source, (int, float)) or (
            isinstance(data_source, str) and data_source.isdigit()
        ):
            # 1. Numeric value: treat it as the DataSource node ID.
            data_source_id = int(data_source)
            logger.info(f"data_source 为节点ID: {data_source_id}")
            ds_condition = "id(b) = $ds_id"
            ds_params = {"ds_id": data_source_id}
            missing_desc = f"ID={data_source_id}"
            target_desc = f"data_source_id={data_source_id}"
        elif isinstance(data_source, dict) and data_source.get('name_en'):
            # 2. Dict carrying name_en.
            data_source_name_en = data_source['name_en']
            logger.info(f"data_source 为字典,提取name_en: {data_source_name_en}")
            ds_condition = "b.name_en = $ds_name_en"
            ds_params = {"ds_name_en": data_source_name_en}
            missing_desc = target_desc = f"name_en={data_source_name_en}"
        elif isinstance(data_source, str):
            # 3. Plain string: treat it as name_en (legacy format).
            data_source_name_en = data_source
            logger.info(f"data_source 为字符串name_en: {data_source_name_en}")
            ds_condition = "b.name_en = $ds_name_en"
            ds_params = {"ds_name_en": data_source_name_en}
            missing_desc = target_desc = f"name_en={data_source_name_en}"
        else:
            logger.warning(f"data_source参数无效,无法识别格式: {data_source}")
            return

        with connect_graph().session() as session:
            # Make sure the DataSource node exists before linking to it.
            ds_record = session.run(
                f"MATCH (b:DataSource) WHERE {ds_condition} RETURN b",
                **ds_params,
            ).single()
            if ds_record is None:
                logger.warning(f"数据源节点不存在: {missing_desc},跳过关系创建")
                return

            model_id = int(node_id)
            rel_record = session.run(
                f"""
                MATCH (a:DataModel)-[r:COME_FROM]->(b:DataSource)
                WHERE id(a) = $model_id AND {ds_condition}
                RETURN count(r) > 0 as exists
                """,
                model_id=model_id,
                **ds_params,
            ).single()
            if rel_record is not None and rel_record["exists"]:
                logger.info(f"数据模型与数据源的COME_FROM关系已存在: model_id={node_id} -> {target_desc}")
                return

            session.run(
                f"""
                MATCH (a:DataModel), (b:DataSource)
                WHERE id(a) = $model_id AND {ds_condition}
                CREATE (a)-[r:COME_FROM]->(b)
                RETURN r
                """,
                model_id=model_id,
                **ds_params,
            )
            logger.info(f"已创建数据模型与数据源的COME_FROM关系: model_id={node_id} -> {target_desc}")
    except Exception as e:
        # Deliberate best effort: a data-source failure must not abort model creation.
        logger.error(f"处理数据源关系失败(不中断主流程): {str(e)}")


def handle_data_model(data_model, result_list, result, receiver):
    """
    Create (or fetch) a DataModel node and wire up its relationships.

    ``receiver`` is updated in place with ``id_list``, ``create_time`` and
    ``name_en`` before it is used as the node's properties. Afterwards the
    function creates, when missing:

    - ``child`` relationships to every node listed in ``receiver['childrenId']``
    - a ``LABEL`` relationship to the DataLabel given by ``receiver['tag']``
    - a ``COME_FROM`` relationship to the DataSource given by
      ``receiver['data_source']`` (best effort; failures are only logged)

    Args:
        data_model: Name of the data model (matched against ``name_zh``).
        result_list: List of English names; the first entry becomes ``name_en``.
        result: Serialized ID list stored as ``receiver['id_list']``.
        receiver: Request parameters (dict); mutated by this function.

    Returns:
        tuple: (node_id, data_model_node)

    Raises:
        Exception: Any error outside the data-source step is logged and re-raised.
    """
    try:
        data_model_en = result_list[0] if result_list else ""
        receiver['id_list'] = result
        receiver.update({
            'create_time': get_formatted_time(),
            'name_en': data_model_en,
        })
        data_model_node = (
            get_node('DataModel', name_zh=data_model)
            or create_or_get_node('DataModel', **receiver)
        )
        logger.info(f"通过查询或创建节点获得节点ID111,data_model_node: {data_model_node}")

        # Work out the integer node ID from whatever the helpers returned.
        if hasattr(data_model_node, 'id'):
            logger.info(f"通过节点ID获取节点ID222,data_model_node: {data_model_node}")
            node_id = data_model_node.id
        elif isinstance(data_model_node, int):
            # The helper already handed back the node ID; no lookup needed.
            node_id = data_model_node
        else:
            logger.info(f"通过查询节点名称获取节点ID333,data_model_node: {data_model_node}")
            node_id = data_model_node
            # Match on name_zh, the property the rest of this module uses to
            # identify a DataModel; the legacy `name` property is still accepted.
            lookup_query = """
            MATCH (n:DataModel)
            WHERE n.name_zh = $name OR n.name = $name
            RETURN id(n) as node_id
            LIMIT 1
            """
            with connect_graph().session() as session:
                record = session.run(lookup_query, name=data_model).single()
                logger.info(f"通过查询节点名称获取节点ID444,record: {record}")
                # Index by key: `"node_id" in record` would test the record's
                # values (neo4j Record is tuple-based), not its keys.
                if record is not None:
                    logger.info(f"通过查询节点名称获取节点ID555,record: {record}")
                    node_id = record["node_id"]

        # child relationships; tolerate an explicit None for childrenId.
        for child_id in receiver.get('childrenId') or []:
            child_node = get_node_by_id_no_label(child_id)
            if not child_node:
                continue
            with connect_graph().session() as session:
                _ensure_relationship(session, int(node_id), int(child_node.id), "child")

        # LABEL relationship to the tag passed in the request.
        if receiver.get('tag'):
            tag = get_node_by_id('DataLabel', receiver['tag'])
            if tag:
                with connect_graph().session() as session:
                    _ensure_relationship(session, int(node_id), int(tag.id), "LABEL")

        # COME_FROM relationship to the data source (never raises).
        data_source = receiver.get('data_source')
        if data_source:
            _link_data_source(node_id, data_source)

        return node_id, data_model_node
    except Exception as e:
        # Use the module logger, like the rest of this file, not the root logger.
        logger.error(f"Error in handle_data_model: {str(e)}")
        raise
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# (从数据资源中选取)
|
|
|
|
|
def resource_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to the metadata selected from data resources.

    Every metadata ID found under the ``metaData`` lists of ``id_lists`` is
    connected to the data model with an ``INCLUDES`` relationship (MERGE, so
    existing relationships are reused).

    Args:
        id_lists: List of dicts, each with a ``resource_id`` key and a
            ``metaData`` list whose entries carry an ``id`` key.
        data_model_node_id: Node ID of the data model.

    Returns:
        None

    Raises:
        Exception: Any error (including a missing key) is logged and re-raised.
    """
    try:
        logger.info(f"开始处理数据模型与元数据的关系,数据模型ID: {data_model_node_id}")

        # Collect resource IDs first, then metadata IDs.
        resource_id_list = []
        for entry in id_lists:
            resource_id_list.append(entry['resource_id'])

        meta_id_list = []
        for entry in id_lists:
            for meta_entry in entry['metaData']:
                meta_id_list.append(meta_entry['id'])

        logger.info(f"资源ID列表: {resource_id_list}")
        logger.info(f"元数据ID列表: {meta_id_list}")

        # NOTE: the DERIVED_FROM relationship to DataResource nodes is
        # intentionally not created here; per the original note (mxl,
        # 2025-06-27) it is created when the data flow is created.
        if not meta_id_list:
            return

        logger.info("开始创建数据模型与元数据的关系")
        includes_cypher = """
        MATCH (source:DataModel), (target:DataMeta)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:INCLUDES]->(target)
        RETURN count(*) as count
        """
        with connect_graph().session() as graph_session:
            summary_row = graph_session.run(
                includes_cypher,
                source_id=data_model_node_id,
                target_ids=meta_id_list,
            ).single()
            if summary_row:
                linked = summary_row["count"]
            else:
                linked = 0
            logger.info(f"成功创建 {linked} 个数据模型与元数据的关系")

    except Exception as e:
        logger.error(f"处理数据模型与元数据的关系时发生错误: {str(e)}")
        raise
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# (从数据模型中选取)
|
|
|
|
|
def model_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to metadata and models selected from other data models.

    Metadata IDs are connected with a ``component`` relationship and the
    referenced model IDs with a ``use`` relationship (both via MERGE).

    Args:
        id_lists: List of dicts, each with a ``model_id`` key and a
            ``metaData`` list whose entries carry an ``id`` key.
        data_model_node_id: Node ID of the data model being linked.

    Returns:
        None
    """
    # Model IDs are collected before metadata IDs.
    referenced_model_ids = [entry['model_id'] for entry in id_lists]
    referenced_meta_ids = [
        meta_entry['id']
        for entry in id_lists
        for meta_entry in entry['metaData']
    ]

    component_cypher = """
    MATCH (source:DataModel), (target:DataMeta)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:component]->(target)
    """
    use_cypher = """
    MATCH (source:DataModel), (target:DataModel)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:use]->(target)
    """

    # Metadata relationships first, then model relationships; each statement
    # runs in its own session and is skipped when there is nothing to link.
    statements = (
        (component_cypher, referenced_meta_ids),
        (use_cypher, referenced_model_ids),
    )
    for cypher, target_ids in statements:
        if not target_ids:
            continue
        with neo4j_driver.get_session() as graph_session:
            graph_session.run(cypher, source_id=data_model_node_id, target_ids=target_ids)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# (从DDL中选取)
|
|
|
|
|
def _node_id_as_int(node):
    """Best-effort conversion of an int, a dict with an ``id`` key, or an object with ``.id`` to an int ID."""
    try:
        if isinstance(node, int):
            return node
        if isinstance(node, dict):
            raw = node.get('id')
            return int(raw) if raw is not None else None
        if hasattr(node, 'id'):
            return int(node.id)
        return int(node)
    except (ValueError, TypeError):
        return None


def handle_no_meta_data_model(id_lists, receiver, data_model_node):
    """
    Link a data model created from DDL to its resources and metadata.

    For every entry of ``id_lists`` the data model gets a ``resource``
    relationship to the listed DataResource, and every ``metaData`` item is
    created (or fetched) as a DataMeta node and attached with ``INCLUDES``.

    Args:
        id_lists: List of dicts with optional ``resource_id`` and ``metaData``
            keys; may be empty or None, in which case nothing happens.
        receiver: Request parameters. Not used by this function; kept for
            interface compatibility.
        data_model_node: The data model as a node object (with ``.id``), an
            integer node ID, or a dict with ``name_zh`` used for a lookup.

    Returns:
        None
    """
    # id_lists may be empty when the model is newly created from DDL.
    if not id_lists:
        logger.info("id_lists为空,跳过资源关系处理")
        return

    resource_ids = [item['resource_id'] for item in id_lists if 'resource_id' in item]
    has_meta_ids = any(
        'id' in meta_item
        for item in id_lists
        for meta_item in item.get('metaData', [])
    )

    # Resolve the data model's node ID from whatever form was passed in.
    if hasattr(data_model_node, 'id'):
        data_model_node_id = data_model_node.id
    elif isinstance(data_model_node, int):
        data_model_node_id = data_model_node
    elif isinstance(data_model_node, dict):
        lookup_query = """
        MATCH (n:DataModel {name_zh: $name_zh})
        RETURN id(n) as node_id
        """
        with connect_graph().session() as session:
            record = session.run(lookup_query, name_zh=data_model_node.get('name_zh')).single()
            data_model_node_id = record["node_id"] if record else None
    else:
        logger.warning(f"data_model_node类型未知: {type(data_model_node)}, 值: {data_model_node}")
        data_model_node_id = None

    # Compare with None: a plain truthiness test would reject node ID 0.
    if data_model_node_id is None:
        return

    # resource relationships to the DataResource nodes.
    if resource_ids:
        resource_query = """
        MATCH (source:DataModel), (target:DataResource)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:resource]->(target)
        """
        with connect_graph().session() as session:
            session.run(resource_query, source_id=data_model_node_id, target_ids=resource_ids)

    # NOTE(review): as before, metadata is only processed when at least one
    # metaData item carries an 'id' — confirm this gate is intended for DDL input.
    if not has_meta_ids:
        return

    model_id_int = _node_id_as_int(data_model_node_id)

    for item in id_lists:
        for meta_item in item.get('metaData', []):
            # Items without an 'id' are tolerated here, matching the filter
            # used when the metadata IDs were collected above.
            meta_node = create_or_get_node(
                'DataMeta',
                name_zh=meta_item.get('name_zh', ''),
                name_en=meta_item.get('name_en', ''),
                standard=meta_item.get('data_standard', ''),
                create_time=get_formatted_time(),
            )

            if meta_node is None or model_id_int is None:
                continue
            meta_id_int = _node_id_as_int(meta_node)
            if meta_id_int is None:
                continue

            # MERGE creates the INCLUDES relationship only when it is missing,
            # replacing the separate exists-check plus CREATE.
            with connect_graph().session() as session:
                session.execute_write(
                    lambda tx, b_id=meta_id_int: tx.run(
                        "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id MERGE (a)-[:INCLUDES]->(b)",
                        a_id=model_id_int,
                        b_id=b_id,
                    )
                )
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 数据模型-详情接口
|
|
|
|
|
def handle_id_model(id):
    """
    Fetch the detail view of a data model.

    Args:
        id: Node ID of the data model. Values convertible with ``int()``
            (for example a numeric string) are converted before the lookup.

    Returns:
        dict: ``{"data_model": {...}}``. When the node exists, the inner dict
        holds ``resource_selected`` (a one-element list with ``data_resource``,
        ``resource_id`` and ``meta_ids``) plus the node fields ``id``,
        ``name_zh``, ``name_en``, ``create_time``, ``describe``, ``category``,
        ``level``, ``tag``, ``leader``, ``origin``, ``blood_resource``,
        ``frequency``, ``organization``, ``data_sensitivity``, ``status`` and
        ``childrenId``. Missing values default to ``""`` (``{}`` for ``tag``,
        ``[]`` for ``childrenId``). When the node does not exist, a skeleton
        with None values is returned instead.
    """
    # id(n) only matches integers, so coerce when possible and otherwise pass
    # the value through unchanged.
    try:
        node_id = int(id)
    except (TypeError, ValueError):
        node_id = id

    # Only relationships whose data is returned are matched; extra OPTIONAL
    # MATCHes would just multiply rows before the DISTINCT collects.
    cql = """
    MATCH (n:DataModel) WHERE id(n) = $nodeId
    OPTIONAL MATCH (n)-[:INCLUDES]->(meta:DataMeta)
    OPTIONAL MATCH (n)-[:DERIVED_FROM]->(resource:DataResource)
    OPTIONAL MATCH (n)-[:LABEL]->(tag:DataLabel)
    WITH n,
         collect(DISTINCT meta) as meta_nodes,
         collect(DISTINCT resource) as resources,
         collect(DISTINCT tag) as tags,
         CASE WHEN n.childrenId IS NOT NULL THEN n.childrenId ELSE [] END as children
    RETURN {
        // 基本信息
        id: id(n),
        name_zh: n.name_zh,
        name_en: n.name_en,
        create_time: n.create_time,
        describe: n.describe,
        category: n.category,
        level: n.level,
        tag: CASE WHEN size(tags) > 0 AND tags[0] IS NOT NULL THEN {id: id(tags[0]), name: tags[0].name} ELSE null END,
        // 添加其他必需字段
        leader: n.leader,
        origin: n.origin,
        blood_resource: n.blood_resource,
        frequency: n.frequency,
        organization: n.organization,
        data_sensitivity: n.data_sensitivity,
        status: n.status,
        // 子节点列表
        childrenId: children
    } AS result,
    // 资源列表
    [{
        data_resource: [resource IN resources WHERE resource IS NOT NULL | {
            id: id(resource),
            name_zh: resource.name_zh,
            name_en: resource.name_en,
            description: resource.description
        }],
        resource_id: [resource IN resources WHERE resource IS NOT NULL | id(resource)],
        meta_ids: [meta IN meta_nodes WHERE meta IS NOT NULL | {
            id: id(meta),
            name_zh: meta.name_zh,
            name_en: meta.name_en,
            data_type: meta.data_type
        }]
    }] AS resource_selected
    """

    with connect_graph().session() as session:
        record = session.run(cql, nodeId=node_id).single()
        # Log through the module logger, consistent with the rest of the file.
        logger.info(f"获得查询结果---------->>>{record}")

        if not record:
            # Nothing found: return the empty skeleton callers expect.
            return {"data_model": {
                "resource_selected": [{"meta_ids": [], "data_resource": None, "resource_id": None}],
                "leader": None, "origin": None, "frequency": None, "childrenId": [],
                "organization": None, "name_zh": None, "name_en": None, "data_sensitivity": None,
                "describe": None, "tag": {}, "create_time": None, "category": None, "status": None
            }}

        properties = record["result"]
        resource_selected = record["resource_selected"]

        # Give every required field a non-null default.
        required_fields = ['tag', 'leader', 'origin', 'blood_resource',
                           'frequency', 'describe', 'organization', 'name_zh', 'name_en',
                           'data_sensitivity', 'create_time', 'category', 'status', 'childrenId']
        for field in required_fields:
            if properties.get(field) is not None:
                continue
            if field == 'tag':
                properties[field] = {}
            elif field == 'childrenId':
                properties[field] = []
            else:
                properties[field] = ""

        final_data = {
            "resource_selected": resource_selected,
            **properties
        }
        return {"data_model": final_data}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 数据模型列表
|
|
|
|
|
def model_list(skip_count, page_size, name_en_filter=None, name_zh_filter=None,
               category=None, tag=None, level=None):
    """
    Return one page of data models together with the total match count.

    Args:
        skip_count: Number of rows to skip (Cypher ``SKIP``).
        page_size: Maximum number of rows to return (Cypher ``LIMIT``).
        name_en_filter: Substring of ``name_en``; inserted into a regular
            expression (``.*<value>.*``), so regex metacharacters are
            interpreted as such.
        name_zh_filter: Substring of ``name_zh``; same regex handling.
        category: Exact ``category`` to match.
        tag: Node ID of the label the model must be linked to via ``LABEL``.
        level: Exact ``level`` to match.

    Returns:
        tuple: ``(items, total)``; ``items`` is a list of dicts, newest first
        by ``create_time``. On an invalid ``tag`` or any error, ``([], 0)``.
    """
    try:
        # WHERE conditions that apply to the DataModel node.
        conditions = []
        params = {}

        if name_zh_filter is not None:
            conditions.append("n.name_zh =~ $name_zh")
            params['name_zh'] = f".*{name_zh_filter}.*"

        if name_en_filter is not None:
            conditions.append("n.name_en =~ $name_en")
            params['name_en'] = f".*{name_en_filter}.*"

        if category is not None:
            conditions.append("n.category = $category")
            params['category'] = category

        if level is not None:
            conditions.append("n.level = $level")
            params['level'] = level

        tag_filtered = tag is not None
        if tag_filtered:
            # id(t) is compared with an integer, so the tag must convert.
            try:
                params['tag'] = int(tag)
            except (ValueError, TypeError):
                logger.warning(f"Invalid tag parameter: {tag}, expected integer")
                return [], 0
            conditions.append("id(t) = $tag")

        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""

        # Columns shared by both list queries.
        columns = """
            id(n) as id,
            n.name_zh as name_zh,
            n.name_en as name_en,
            n.create_time as create_time,
            n.describe as describe,
            n.level as level,
            n.category as category,
            n.status as status,
            n.leader as leader,
            n.origin as origin,
            n.blood_resource as blood_resource,
            n.organization as organization,
            id(t) as tag_id,
            t.name_zh as tag_name
        """

        if tag_filtered:
            # The label relationship is required when filtering by tag.
            count_query = f"""
            MATCH (n:DataModel)-[:LABEL]->(t)
            {where_clause}
            RETURN COUNT(DISTINCT n) AS count
            """
            # Order by the projected alias: after RETURN DISTINCT only returned
            # columns are in scope, and the former `ORDER BY time` referred to
            # nothing, so every tag-filtered call failed and returned ([], 0).
            query = f"""
            MATCH (n:DataModel)-[:LABEL]->(t)
            {where_clause}
            RETURN DISTINCT
            {columns}
            ORDER BY create_time DESC
            SKIP $skip
            LIMIT $limit
            """
        else:
            count_query = f"""
            MATCH (n:DataModel)
            {where_clause}
            RETURN COUNT(n) AS count
            """
            # Filter the models first, then optionally attach their label.
            query = f"""
            MATCH (n:DataModel)
            {where_clause}
            WITH n
            OPTIONAL MATCH (n)-[:LABEL]->(t)
            RETURN
            {columns}
            ORDER BY create_time DESC
            SKIP $skip
            LIMIT $limit
            """

        with connect_graph().session() as session:
            logger.debug(f"Count query: {count_query}")
            logger.debug(f"Query parameters: {params}")

            count_record = session.run(count_query, **params).single()
            total = count_record['count'] if count_record else 0

            logger.debug(f"Main query: {query}")
            result = session.run(query, skip=skip_count, limit=page_size, **params)

            data = [
                {
                    "id": record['id'],
                    "name_zh": record['name_zh'],
                    "name_en": record['name_en'],
                    "create_time": record['create_time'],
                    "describe": record['describe'],
                    "category": record['category'],
                    "status": record['status'],
                    "leader": record['leader'],
                    "origin": record['origin'],
                    "blood_resource": record['blood_resource'],
                    "organization": record['organization'],
                    "level": record['level'],
                    "tag": (
                        {"id": record['tag_id'], "name_zh": record['tag_name']}
                        if record['tag_id'] is not None else None
                    ),
                }
                for record in result
            ]

            logger.info(f"Query returned {len(data)} items out of {total} total")
            return data, total

    except Exception as e:
        # logger.exception records the traceback through the logging system
        # instead of printing it to stderr.
        logger.exception(f"Error in model_list: {str(e)}")
        return [], 0
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 有血缘关系的数据资源列表
|
|
|
|
|
def model_resource_list(skip_count, page_size, name_zh_filter=None, id=None,
                        category=None, create_time=None):
    """
    Return one page of the DataResource nodes attached to a data model.

    The DataModel is located by its internal Neo4j node id and its resources
    are reached through the ``children`` relationship.

    Args:
        skip_count: Number of rows to skip (pagination offset).
        page_size: Maximum number of rows to return.
        name_zh_filter: Accepted for interface compatibility; not applied
            to the query by this function.
        id: Node id of the DataModel. Anything ``int()`` accepts is allowed.
        category: Accepted for interface compatibility; not applied.
        create_time: Accepted for interface compatibility; not applied.

    Returns:
        tuple: ``(rows, total)`` where ``rows`` is a list of dicts with the
        keys ``name_zh``, ``name_en``, ``id``, ``label``, ``create_time``,
        ``description`` and ``category``. ``([], 0)`` is returned when the
        id is invalid or any error occurs.
    """
    # id(n) is an integer in Cypher; a string such as "12" would silently
    # match nothing, so coerce up front like the sibling graph functions do.
    try:
        node_id = int(id)
    except (ValueError, TypeError):
        logger.error(f"节点ID不是有效的整数: {id}")
        return [], 0

    try:
        # Shared prefix of the count query and the page query.
        base_query = """
        MATCH (n:DataModel)
        WHERE id(n) = $nodeId
        MATCH (n)-[:children]->(m:DataResource)
        """

        count_query = base_query + """
        RETURN COUNT(m) as count
        """

        with connect_graph().session() as session:
            count_record = session.run(count_query, nodeId=node_id).single()
            total = count_record['count'] if count_record else 0

            # NOTE(review): the page query additionally requires a LABEL
            # relationship on each resource, while the count query does not,
            # so ``total`` can be larger than the number of rows that can
            # actually be paged through.
            main_query = base_query + """
            MATCH (m)-[:LABEL]->(l)
            WHERE id(n) = $nodeId and labels(m) <> ['DataMeta']
            RETURN m.name_zh as name_zh,
                   m.name_en as name_en,
                   id(m) as id,
                   l.name_zh as label,
                   m.create_time as create_time,
                   m.description as description,
                   m.category as category
            ORDER BY m.create_time DESC
            SKIP $skip LIMIT $limit
            """

            result = session.run(main_query, nodeId=node_id,
                                 skip=skip_count, limit=page_size)

            columns = ("name_zh", "name_en", "id", "label",
                       "create_time", "description", "category")
            data = [{column: record[column] for column in columns}
                    for record in result]

            return data, total
    except Exception as e:
        # logger.exception records the traceback through the logging setup
        # instead of writing to stdout/stderr.
        logger.exception(f"Error in model_resource_list: {str(e)}")
        return [], 0
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 数据模型血缘图谱
|
|
|
|
|
def model_kinship_graph(nodeid, meta=False):
    """
    Build the lineage (kinship) graph of a data model.

    Starting from the given DataModel node, follows outgoing DERIVED_FROM
    relationships of any length (including zero) and collects every
    DataResource / DataModel node reached plus the relationships on the way.

    Args:
        nodeid: Node id of the DataModel; anything ``int()`` accepts.
        meta: When truthy, the DataMeta nodes directly linked to the start
            node through INCLUDES are added to the graph as well.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``. Node dicts are the
        serialized node properties plus string ``id`` and ``node_type``;
        line dicts have ``id``, ``from``, ``to`` (all strings) and ``text``
        (relationship type). Both lists are empty when the id is invalid,
        the node does not exist, or an error occurs.
    """
    # Validate the id first so an invalid request never acquires a session.
    try:
        nodeid_int = int(nodeid)
    except (ValueError, TypeError):
        logger.error(f"节点ID不是有效的整数: {nodeid}")
        return {"nodes": [], "lines": []}

    try:
        with connect_graph().session() as session:
            # Make sure the start node exists and really is a DataModel.
            start_node_query = """
            MATCH (n:DataModel)
            WHERE id(n) = $nodeId
            RETURN n
            """
            start_record = session.run(start_node_query, nodeId=nodeid_int).single()

            if not start_record:
                logger.error(f"未找到ID为{nodeid_int}的DataModel节点")
                return {"nodes": [], "lines": []}

            # Variable-length traversal; *0.. also yields the start node itself.
            cypher = """
            MATCH (start:DataModel)
            WHERE id(start) = $nodeId
            MATCH path = (start)-[:DERIVED_FROM*0..]->(target)
            WHERE target:DataResource OR target:DataModel
            RETURN path
            """
            result = session.run(cypher, nodeId=nodeid_int)

            # Keyed by integer id so nodes/relationships shared by several
            # paths are emitted only once.
            nodes = {}
            lines = {}

            for record in result:
                path = record['path']
                logger.debug(f"处理路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")

                for node in path.nodes:
                    node_id = int(node.id)
                    if node_id in nodes:
                        continue
                    node_dict = serialize_node_properties(node)
                    node_dict["id"] = str(node_id)
                    node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
                    nodes[node_id] = node_dict
                    logger.debug(f"添加节点: ID={node_id}, 标签={list(node.labels)}")

                for rel in path.relationships:
                    rel_id = int(rel.id)
                    if rel_id in lines:
                        continue
                    lines[rel_id] = {
                        "id": str(rel_id),
                        "from": str(int(rel.start_node.id)),
                        "to": str(int(rel.end_node.id)),
                        "text": rel.type
                    }
                    logger.debug(f"添加关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}")

            # Optionally attach the metadata directly included by the start node.
            if meta:
                meta_cypher = """
                MATCH (start:DataModel)-[r:INCLUDES]->(meta:DataMeta)
                WHERE id(start) = $nodeId
                RETURN start, r, meta
                """
                meta_result = session.run(meta_cypher, nodeId=nodeid_int)

                for meta_record in meta_result:
                    rel = meta_record['r']
                    meta_node = meta_record['meta']

                    meta_node_id = int(meta_node.id)
                    if meta_node_id not in nodes:
                        node_dict = serialize_node_properties(meta_node)
                        node_dict["id"] = str(meta_node_id)
                        node_dict["node_type"] = list(meta_node.labels)[0] if meta_node.labels else ""
                        nodes[meta_node_id] = node_dict

                    rel_id = int(rel.id)
                    if rel_id not in lines:
                        lines[rel_id] = {
                            "id": str(rel_id),
                            "from": str(nodeid_int),
                            "to": str(meta_node_id),
                            "text": rel.type
                        }

            logger.info(f"成功获取血缘关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }

    except Exception as e:
        logger.error(f"获取数据模型血缘关系图谱失败: {str(e)}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return {"nodes": [], "lines": []}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 数据模型影响图谱
|
|
|
|
|
def model_impact_graph(nodeid, meta=False):
    """
    Build the impact graph of a data model.

    Treats the given DataModel node as the end point and collects every
    DataResource / DataModel node that reaches it through DERIVED_FROM
    relationships of any length (including zero), plus those relationships.

    Args:
        nodeid: Node id of the DataModel; anything ``int()`` accepts.
        meta: When truthy, the DataMeta nodes directly linked to the given
            node through INCLUDES are added to the graph as well.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``. Node dicts are the
        serialized node properties plus string ``id`` and ``node_type``;
        line dicts have ``id``, ``from``, ``to`` (all strings) and ``text``
        (relationship type). Both lists are empty when the id is invalid,
        the node does not exist, or an error occurs.
    """
    # Validate the id first so an invalid request never acquires a session.
    try:
        nodeid_int = int(nodeid)
    except (ValueError, TypeError):
        logger.error(f"节点ID不是有效的整数: {nodeid}")
        return {"nodes": [], "lines": []}

    try:
        with connect_graph().session() as session:
            # Make sure the node exists and really is a DataModel.
            start_node_query = """
            MATCH (n:DataModel)
            WHERE id(n) = $nodeId
            RETURN n
            """
            start_record = session.run(start_node_query, nodeId=nodeid_int).single()

            if not start_record:
                logger.error(f"未找到ID为{nodeid_int}的DataModel节点")
                return {"nodes": [], "lines": []}

            # Variable-length traversal towards the target; *0.. also yields
            # the target node itself.
            cypher = """
            MATCH (target:DataModel)
            WHERE id(target) = $nodeId
            MATCH path = (source)-[:DERIVED_FROM*0..]->(target)
            WHERE source:DataResource OR source:DataModel
            RETURN path
            """
            result = session.run(cypher, nodeId=nodeid_int)

            # Keyed by integer id so nodes/relationships shared by several
            # paths are emitted only once.
            nodes = {}
            lines = {}

            for record in result:
                path = record['path']
                logger.debug(f"处理影响路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")

                for node in path.nodes:
                    node_id = int(node.id)
                    if node_id in nodes:
                        continue
                    node_dict = serialize_node_properties(node)
                    node_dict["id"] = str(node_id)
                    node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
                    nodes[node_id] = node_dict
                    logger.debug(f"添加影响节点: ID={node_id}, 标签={list(node.labels)}")

                for rel in path.relationships:
                    rel_id = int(rel.id)
                    if rel_id in lines:
                        continue
                    lines[rel_id] = {
                        "id": str(rel_id),
                        "from": str(int(rel.start_node.id)),
                        "to": str(int(rel.end_node.id)),
                        "text": rel.type
                    }
                    logger.debug(f"添加影响关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}")

            # Optionally attach the metadata directly included by the node.
            if meta:
                meta_cypher = """
                MATCH (target:DataModel)-[r:INCLUDES]->(meta:DataMeta)
                WHERE id(target) = $nodeId
                RETURN target, r, meta
                """
                meta_result = session.run(meta_cypher, nodeId=nodeid_int)

                for meta_record in meta_result:
                    rel = meta_record['r']
                    meta_node = meta_record['meta']

                    meta_node_id = int(meta_node.id)
                    if meta_node_id not in nodes:
                        node_dict = serialize_node_properties(meta_node)
                        node_dict["id"] = str(meta_node_id)
                        node_dict["node_type"] = list(meta_node.labels)[0] if meta_node.labels else ""
                        nodes[meta_node_id] = node_dict

                    rel_id = int(rel.id)
                    if rel_id not in lines:
                        lines[rel_id] = {
                            "id": str(rel_id),
                            "from": str(nodeid_int),
                            "to": str(meta_node_id),
                            "text": rel.type
                        }

            logger.info(f"成功获取影响关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }

    except Exception as e:
        logger.error(f"获取数据模型影响关系图谱失败: {str(e)}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return {"nodes": [], "lines": []}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 数据模型全部图谱
|
|
|
|
|
def model_all_graph(nodeid, meta=False):
    """
    Build the complete relationship graph of a data model.

    Calls model_kinship_graph and model_impact_graph for the same node and
    merges the two results, de-duplicating nodes and lines by their ``id``.

    Args:
        nodeid: Node id of the data model.
        meta: Forwarded to both graph builders; whether metadata is included.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``; both lists are empty if
        an error occurs while merging.
    """
    try:
        # Both graphs are fetched before any merging starts.
        kinship_data = model_kinship_graph(nodeid, meta)
        impact_data = model_impact_graph(nodeid, meta)

        merged_nodes = {}
        merged_lines = {}

        # Kinship entries go in first; an impact entry with the same id
        # replaces the stored value but keeps the original position.
        for graph in (kinship_data, impact_data):
            for section, bucket in (('nodes', merged_nodes), ('lines', merged_lines)):
                if not (graph and section in graph):
                    continue
                for entry in graph[section]:
                    entry_id = entry.get('id')
                    if entry_id:
                        bucket[entry_id] = entry

        result = {
            "nodes": list(merged_nodes.values()),
            "lines": list(merged_lines.values())
        }

        logger.info(f"成功获取完整关系图谱,ID: {nodeid}, 节点数: {len(merged_nodes)}, 关系数: {len(merged_lines)}")
        return result

    except Exception as e:
        logger.error(f"获取数据模型完整关系图谱失败: {str(e)}")
        return {"nodes": [], "lines": []}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# 更新数据模型
|
|
|
|
|
def data_model_edit(receiver):
    """
    Update a DataModel node and rebuild its LABEL and INCLUDES relationships.

    Steps, in order:
      1. Overwrite the node's scalar properties (``create_time`` is set to
         the current formatted time on every edit).
      2. If ``tag`` is given, delete all LABEL relationships of the model and
         link it to the DataLabel node identified by ``tag``.
      3. Delete all INCLUDES relationships to DataMeta nodes, then re-create
         one per entry of ``metaData`` whose ``id`` refers to an existing
         DataMeta node.

    Args:
        receiver: Mapping with the request parameters. Keys read here:
            ``id``, ``name_zh``, ``name_en``, ``category``, ``describe``,
            ``tag``, ``frequency``, ``leader``, ``organization``, ``status``
            and ``metaData`` (list of dicts carrying an ``id``).

    Returns:
        dict: ``{"message": "数据模型更新成功"}``.
    """
    raw_id = receiver.get('id')
    # id(n) is an integer in Cypher; a string id would make every
    # ``id(n) = $id`` predicate match nothing while the function still
    # reports success. Coerce when possible, otherwise keep the raw value so
    # the previous best-effort behaviour is preserved.
    try:
        node_id = int(raw_id)
    except (ValueError, TypeError):
        logger.warning(f"Invalid data model id: {raw_id}, expected integer")
        node_id = raw_id

    name = receiver.get('name_zh')
    name_en = receiver.get('name_en')
    category = receiver.get('category')
    describe = receiver.get('describe')
    tag = receiver.get('tag')
    frequency = receiver.get('frequency')
    leader = receiver.get('leader')
    organization = receiver.get('organization')
    # Preserve "not provided" (None) as distinct from an explicit False.
    status = bool(receiver.get('status')) if receiver.get('status') is not None else None
    meta_data = receiver.get('metaData', [])

    # 1. Update the scalar properties of the DataModel node.
    query = """
    MATCH (n:DataModel) WHERE id(n) = $id
    SET n.name_zh = $name_zh,
        n.name_en = $name_en,
        n.category = $category,
        n.describe = $describe,
        n.frequency = $frequency,
        n.leader = $leader,
        n.organization = $organization,
        n.status = $status,
        n.create_time = $create_time
    RETURN n
    """

    create_time = get_formatted_time()
    with connect_graph().session() as session:
        # .data() consumes the result so the write completes before the
        # session is closed.
        session.run(query,
                    id=node_id,
                    name_zh=name,
                    name_en=name_en,
                    category=category,
                    describe=describe,
                    frequency=frequency,
                    leader=leader,
                    organization=organization,
                    status=status,
                    create_time=create_time).data()

    # 2. Replace the LABEL relationship when a tag was supplied.
    if tag:
        delete_query = """
        MATCH (n:DataModel)-[r:LABEL]->() WHERE id(n) = $id
        DELETE r
        """
        with connect_graph().session() as session:
            session.run(delete_query, id=node_id)

        tag_node = get_node_by_id('DataLabel', tag)
        if tag_node:
            model_node = get_node_by_id_no_label(node_id)
            if model_node:
                # The helpers may return either a node object or a bare id.
                model_id = model_node.id if hasattr(model_node, 'id') else model_node
                tag_id = tag_node.id if hasattr(tag_node, 'id') else tag_node

                with connect_graph().session() as session:
                    rel_query = """
                    MATCH (a)-[r:LABEL]->(b)
                    WHERE id(a) = $start_id AND id(b) = $end_id
                    RETURN count(r) > 0 as exists
                    """
                    rel_result = session.run(rel_query,
                                             start_id=int(model_id),
                                             end_id=int(tag_id)).single()

                    # Only create the relationship when it is not there yet.
                    if not (rel_result and rel_result["exists"]):
                        session.execute_write(
                            lambda tx: tx.run(
                                "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:LABEL]->(b)",
                                a_id=int(model_id), b_id=int(tag_id)
                            )
                        )

    # 3. Rebuild the INCLUDES relationships to DataMeta nodes.
    with connect_graph().session() as session:
        delete_meta_query = """
        MATCH (n:DataModel)-[r:INCLUDES]->(m:DataMeta)
        WHERE id(n) = $id
        DELETE r
        """
        session.run(delete_meta_query, id=node_id)
        logger.info(f"已删除DataModel({node_id})的所有DataMeta关系")

        if meta_data:
            for meta_item in meta_data:
                meta_id = meta_item.get('id')
                if not meta_id:
                    continue
                try:
                    meta_id = int(meta_id)
                    # Verify the DataMeta node exists before linking to it.
                    check_meta_query = """
                    MATCH (m:DataMeta)
                    WHERE id(m) = $meta_id
                    RETURN m
                    """
                    meta_result = session.run(check_meta_query, meta_id=meta_id)

                    if meta_result.single():
                        create_includes_query = """
                        MATCH (n:DataModel), (m:DataMeta)
                        WHERE id(n) = $model_id AND id(m) = $meta_id
                        CREATE (n)-[:INCLUDES]->(m)
                        RETURN n, m
                        """
                        session.run(create_includes_query, model_id=node_id, meta_id=meta_id)
                        logger.info(f"成功创建INCLUDES关系: DataModel({node_id}) -> DataMeta({meta_id})")
                    else:
                        logger.warning(f"DataMeta节点不存在,ID: {meta_id}")
                except (ValueError, TypeError) as e:
                    logger.error(f"无效的meta_id: {meta_id}, 错误: {str(e)}")
        else:
            logger.info(f"meta_data为空,不需要重新创建INCLUDES关系,DataModel({node_id})将不关联任何DataMeta节点")

    return {"message": "数据模型更新成功"}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
def model_community(tag=None):
    """
    Collect DataModel nodes and the DERIVED_FROM relationships between them.

    Args:
        tag: Optional label node id. When given, only DataModel nodes that
            have a LABEL relationship to that node are used as anchors; the
            DataModel nodes they derive from, or that derive from them, are
            included too.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}`` in the same format as
        model_kinship_graph; both lists are empty for a non-integer ``tag``
        or when an error occurs.
    """
    empty_graph_error = "获取数据模型社区图谱失败"
    try:
        with connect_graph().session() as session:
            if tag is None:
                # No tag filter: every DataModel is an anchor, traversed in
                # both directions.
                cypher = """
                MATCH (dm:DataModel)
                WITH dm
                MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel)
                RETURN path
                UNION
                MATCH (dm:DataModel)
                WITH dm
                MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm)
                RETURN path
                """
                query_params = {}
            else:
                # The tag must be usable as an integer node id.
                try:
                    tag_id = int(tag)
                except (ValueError, TypeError):
                    logger.warning(f"Invalid tag parameter: {tag}, expected integer")
                    return {"nodes": [], "lines": []}

                # Anchors are the DataModel nodes carrying the given label.
                cypher = """
                MATCH (dm:DataModel)-[:LABEL]->(t)
                WHERE id(t) = $tag_id
                WITH dm
                MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel)
                RETURN path
                UNION
                MATCH (dm:DataModel)-[:LABEL]->(t)
                WHERE id(t) = $tag_id
                WITH dm
                MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm)
                RETURN path
                """
                query_params = {"tag_id": tag_id}

            result = session.run(cypher, **query_params)

            # De-duplicate by integer id across all returned paths.
            nodes = {}
            lines = {}

            for record in result:
                path = record['path']
                logger.debug(f"处理社区路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")

                for node in path.nodes:
                    node_id = int(node.id)
                    if node_id in nodes:
                        continue
                    node_dict = serialize_node_properties(node)
                    node_dict["id"] = str(node_id)
                    node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
                    nodes[node_id] = node_dict
                    logger.debug(f"添加社区节点: ID={node_id}, 标签={list(node.labels)}")

                for rel in path.relationships:
                    rel_id = int(rel.id)
                    if rel_id in lines:
                        continue
                    lines[rel_id] = {
                        "id": str(rel_id),
                        "from": str(int(rel.start_node.id)),
                        "to": str(int(rel.end_node.id)),
                        "text": rel.type
                    }
                    logger.debug(f"添加社区关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}")

            logger.info(f"成功获取数据模型社区图谱,标签ID: {tag}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }

    except Exception as e:
        logger.error(f"{empty_graph_error}: {str(e)}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return {"nodes": [], "lines": []}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
def model_search_list(model_id, page, page_size, name_en_filter=None,
                      name_zh_filter=None, category_filter=None, tag_filter=None):
    """
    Return one page of the metadata nodes included by a data model.

    Matches nodes labelled DataMeta or Metadata that the DataModel reaches
    through INCLUDES, optionally narrowed by the given filters. All filter
    values are passed to Neo4j as query parameters, never interpolated into
    the Cypher text.

    Args:
        model_id: Node id of the DataModel; anything ``int()`` accepts.
        page: 1-based page number.
        page_size: Number of rows per page.
        name_en_filter: Substring that ``name_en`` must contain.
        name_zh_filter: Substring that ``name_zh`` must contain.
        category_filter: Exact value ``category`` must equal.
        tag_filter: ``name_zh`` of a Tag node the metadata must be linked to
            through HAS_TAG.

    Returns:
        tuple: ``(metadata_list, total_count)``; ``([], 0)`` when the id is
        invalid or an error occurs.
    """
    try:
        with connect_graph().session() as session:
            try:
                model_id_int = int(model_id)
            except (ValueError, TypeError):
                logger.error(f"模型ID不是有效的整数: {model_id}")
                return [], 0

            # Base pattern: metadata nodes included by the model.
            match_clause = """
            MATCH (n:DataModel)-[:INCLUDES]->(m)
            WHERE id(n) = $model_id
            AND (m:DataMeta OR m:Metadata)
            """

            params = {"model_id": model_id_int}
            where_conditions = []

            # User-supplied values go in as parameters to rule out Cypher
            # injection; str() keeps the old "compare as text" semantics.
            if name_en_filter:
                where_conditions.append("m.name_en CONTAINS $name_en_filter")
                params["name_en_filter"] = str(name_en_filter)

            if name_zh_filter:
                where_conditions.append("m.name_zh CONTAINS $name_zh_filter")
                params["name_zh_filter"] = str(name_zh_filter)

            if category_filter:
                where_conditions.append("m.category = $category_filter")
                params["category_filter"] = str(category_filter)

            # Tag filtering needs an extra MATCH after the base pattern.
            tag_match = ""
            if tag_filter:
                tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name_zh = $tag_filter"
                params["tag_filter"] = tag_filter

            where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""

            # Total number of matching rows (ignores pagination).
            count_cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN count(m) as count
            """
            count_record = session.run(count_cypher, params).single()
            total_count = count_record["count"] if count_record else 0

            # Page query; SKIP/LIMIT are integer parameters as well.
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN m
            ORDER BY m.name_zh
            SKIP $skip LIMIT $limit
            """
            page_params = dict(params, skip=int(skip), limit=int(page_size))
            result = session.run(cypher, page_params)

            metadata_list = []
            for record in result:
                meta = serialize_node_properties(record["m"])
                meta["id"] = record["m"].id
                metadata_list.append(meta)

            logger.info(f"成功获取数据模型关联元数据,ID: {model_id_int}, 元数据数量: {total_count}")
            return metadata_list, total_count
    except Exception as e:
        logger.error(f"获取数据模型关联的元数据列表失败: {str(e)}")
        return [], 0
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
def get_businessdomain_node(name_zh):
    """
    Look up a BusinessDomain node that belongs to the "data model" label.

    A node qualifies only when both conditions hold:
      1. its ``name_zh`` equals the given name, and
      2. it has a BELONGS_TO relationship to a DataLabel whose ``name_zh``
         is '数据模型' or whose ``name_en`` is 'data_model'.

    Args:
        name_zh: Chinese name of the business domain node.

    Returns:
        The matching node, or None when nothing matches or the query fails.
    """
    query = """
            MATCH (bd:BusinessDomain)-[:BELONGS_TO]->(label:DataLabel)
            WHERE bd.name_zh = $name_zh 
              AND (label.name_zh = '数据模型' OR label.name_en = 'data_model')
            RETURN bd
            LIMIT 1
            """
    try:
        with connect_graph().session() as session:
            record = session.run(query, name_zh=name_zh).single()

            # Guard clause: no row, or a row without a usable node.
            if not (record and record.get('bd')):
                logger.info(f"未找到BusinessDomain节点: name_zh={name_zh}")
                return None

            logger.info(f"找到已存在的BusinessDomain节点: name_zh={name_zh}")
            return record['bd']

    except Exception as e:
        logger.error(f"查询BusinessDomain节点时发生错误: {str(e)}")
        return None
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def handle_businessdomain_node(data_model, result_list, result, receiver, id_list):
|
|
|
|
|
- """
|
|
|
|
|
- 创建一个BusinessDomain业务域节点,属性和关联关系与DataModel节点一致
|
|
|
|
|
- 额外创建与DataLabel中"数据模型"标签的BELONGS_TO关系
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- data_model: 数据模型名称
|
|
|
|
|
- result_list: 数据模型英文名列表
|
|
|
|
|
- result: 序列化的ID列表
|
|
|
|
|
- receiver: 接收到的请求参数
|
|
|
|
|
- id_list: ID列表(用于处理资源关系)
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- tuple: (node_id, business_domain_node)
|
|
|
|
|
- """
|
|
|
|
|
- try:
|
|
|
|
|
- logger.info(f"开始创建BusinessDomain节点,名称: {data_model}")
|
|
|
|
|
-
|
|
|
|
|
- # 添加数据资源 血缘关系的字段 blood_resource
|
|
|
|
|
- data_model_en = result_list[0] if result_list and len(result_list) > 0 else ""
|
|
|
|
|
-
|
|
|
|
|
- # 准备BusinessDomain节点的属性(与DataModel相同)
|
|
|
|
|
- bd_attributes = {
|
|
|
|
|
- 'name_zh': data_model,
|
|
|
|
|
- 'name_en': data_model_en,
|
|
|
|
|
- 'id_list': result,
|
|
|
|
|
- 'create_time': get_formatted_time(),
|
|
|
|
|
- 'description': receiver.get('description', ''),
|
|
|
|
|
- 'category': receiver.get('category', ''),
|
|
|
|
|
- 'leader': receiver.get('leader', ''),
|
|
|
|
|
- 'origin': receiver.get('origin', ''),
|
|
|
|
|
- 'frequency': receiver.get('frequency', ''),
|
|
|
|
|
- 'organization': receiver.get('organization', ''),
|
|
|
|
|
- 'data_sensitivity': receiver.get('data_sensitivity', ''),
|
|
|
|
|
- 'status': receiver.get('status', '')
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- # 创建BusinessDomain节点
|
|
|
|
|
- # 使用专用函数查找,需要同时满足name_zh和BELONGS_TO关系
|
|
|
|
|
- business_domain_node = get_businessdomain_node(data_model) or create_or_get_node('BusinessDomain', **bd_attributes)
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"BusinessDomain节点创建成功,data: {business_domain_node}")
|
|
|
|
|
-
|
|
|
|
|
- # 获取节点ID
|
|
|
|
|
- node_id = business_domain_node
|
|
|
|
|
- if hasattr(business_domain_node, 'id'):
|
|
|
|
|
- node_id = business_domain_node.id
|
|
|
|
|
- else:
|
|
|
|
|
- # 如果节点没有id属性,尝试通过查询获取
|
|
|
|
|
- query = """
|
|
|
|
|
- MATCH (n:BusinessDomain {name_zh: $name})
|
|
|
|
|
- RETURN id(n) as node_id
|
|
|
|
|
- """
|
|
|
|
|
- with connect_graph().session() as session:
|
|
|
|
|
- result_query = session.run(query, name=data_model)
|
|
|
|
|
- record = result_query.single()
|
|
|
|
|
- if record and "node_id" in record:
|
|
|
|
|
- node_id = record["node_id"]
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"BusinessDomain节点ID: {node_id}")
|
|
|
|
|
-
|
|
|
|
|
- # 1. 处理子节点关系(child关系)
|
|
|
|
|
- child_list = receiver.get('childrenId', [])
|
|
|
|
|
- if child_list:
|
|
|
|
|
- logger.info(f"处理BusinessDomain的child关系,子节点数量: {len(child_list)}")
|
|
|
|
|
- for child_id in child_list:
|
|
|
|
|
- child_node = get_node_by_id_no_label(child_id)
|
|
|
|
|
- if child_node:
|
|
|
|
|
- with connect_graph().session() as session:
|
|
|
|
|
- rel_query = """
|
|
|
|
|
- MATCH (a)-[r:child]->(b)
|
|
|
|
|
- WHERE id(a) = $start_id AND id(b) = $end_id
|
|
|
|
|
- RETURN count(r) > 0 as exists
|
|
|
|
|
- """
|
|
|
|
|
- child_node_id = child_node.id if hasattr(child_node, 'id') else int(child_node)
|
|
|
|
|
- rel_result = session.run(rel_query,
|
|
|
|
|
- start_id=int(node_id),
|
|
|
|
|
- end_id=int(child_node_id)).single()
|
|
|
|
|
-
|
|
|
|
|
- if not (rel_result and rel_result["exists"]):
|
|
|
|
|
- child_id_int = int(child_node_id)
|
|
|
|
|
- session.execute_write(
|
|
|
|
|
- lambda tx: tx.run(
|
|
|
|
|
- "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:child]->(b)",
|
|
|
|
|
- a_id=int(node_id), b_id=child_id_int
|
|
|
|
|
- )
|
|
|
|
|
- )
|
|
|
|
|
- logger.info(f"创建BusinessDomain child关系: {node_id} -> {child_node_id}")
|
|
|
|
|
-
|
|
|
|
|
- # 2. 处理标签关系(LABEL关系)
|
|
|
|
|
- if receiver.get('tag'):
|
|
|
|
|
- logger.info(f"处理BusinessDomain的LABEL关系,标签ID: {receiver['tag']}")
|
|
|
|
|
- tag = get_node_by_id('DataLabel', receiver['tag'])
|
|
|
|
|
- if tag:
|
|
|
|
|
- with connect_graph().session() as session:
|
|
|
|
|
- rel_query = """
|
|
|
|
|
- MATCH (a)-[r:LABEL]->(b)
|
|
|
|
|
- WHERE id(a) = $start_id AND id(b) = $end_id
|
|
|
|
|
- RETURN count(r) > 0 as exists
|
|
|
|
|
- """
|
|
|
|
|
- rel_result = session.run(rel_query,
|
|
|
|
|
- start_id=int(node_id),
|
|
|
|
|
- end_id=int(tag.id)).single()
|
|
|
|
|
-
|
|
|
|
|
- if not (rel_result and rel_result["exists"]):
|
|
|
|
|
- session.execute_write(
|
|
|
|
|
- lambda tx: tx.run(
|
|
|
|
|
- "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:LABEL]->(b)",
|
|
|
|
|
- a_id=int(node_id), b_id=int(tag.id)
|
|
|
|
|
- )
|
|
|
|
|
- )
|
|
|
|
|
- logger.info(f"创建BusinessDomain LABEL关系: {node_id} -> {tag.id}")
|
|
|
|
|
-
|
|
|
|
|
- # 3. 处理数据源关系(COME_FROM关系)
|
|
|
|
|
- data_source = receiver.get('data_source')
|
|
|
|
|
- if data_source:
|
|
|
|
|
- logger.info(f"处理BusinessDomain的COME_FROM关系,数据源: {data_source}")
|
|
|
|
|
- try:
|
|
|
|
|
- data_source_id = None
|
|
|
|
|
- data_source_name_en = None
|
|
|
|
|
-
|
|
|
|
|
- # 获取数据源标识(支持多种格式)
|
|
|
|
|
- if isinstance(data_source, (int, float)) or (isinstance(data_source, str) and data_source.isdigit()):
|
|
|
|
|
- data_source_id = int(data_source)
|
|
|
|
|
- elif isinstance(data_source, dict) and data_source.get('name_en'):
|
|
|
|
|
- data_source_name_en = data_source['name_en']
|
|
|
|
|
- elif isinstance(data_source, str):
|
|
|
|
|
- data_source_name_en = data_source
|
|
|
|
|
-
|
|
|
|
|
- # 创建BusinessDomain与数据源的关系
|
|
|
|
|
- with connect_graph().session() as session:
|
|
|
|
|
- if data_source_id is not None:
|
|
|
|
|
- check_ds_cypher = "MATCH (b:DataSource) WHERE id(b) = $ds_id RETURN b"
|
|
|
|
|
- check_ds_result = session.run(check_ds_cypher, ds_id=data_source_id)
|
|
|
|
|
-
|
|
|
|
|
- if not check_ds_result.single():
|
|
|
|
|
- logger.warning(f"数据源节点不存在: ID={data_source_id},跳过关系创建")
|
|
|
|
|
- else:
|
|
|
|
|
- rel_check_query = """
|
|
|
|
|
- MATCH (a:BusinessDomain)-[r:COME_FROM]->(b:DataSource)
|
|
|
|
|
- WHERE id(a) = $bd_id AND id(b) = $ds_id
|
|
|
|
|
- RETURN count(r) > 0 as exists
|
|
|
|
|
- """
|
|
|
|
|
- rel_check_result = session.run(rel_check_query,
|
|
|
|
|
- bd_id=int(node_id),
|
|
|
|
|
- ds_id=data_source_id).single()
|
|
|
|
|
-
|
|
|
|
|
- if not (rel_check_result and rel_check_result["exists"]):
|
|
|
|
|
- create_rel_cypher = """
|
|
|
|
|
- MATCH (a:BusinessDomain), (b:DataSource)
|
|
|
|
|
- WHERE id(a) = $bd_id AND id(b) = $ds_id
|
|
|
|
|
- CREATE (a)-[r:COME_FROM]->(b)
|
|
|
|
|
- RETURN r
|
|
|
|
|
- """
|
|
|
|
|
- session.run(create_rel_cypher,
|
|
|
|
|
- bd_id=int(node_id),
|
|
|
|
|
- ds_id=data_source_id)
|
|
|
|
|
- logger.info(f"创建BusinessDomain与数据源的COME_FROM关系: bd_id={node_id} -> data_source_id={data_source_id}")
|
|
|
|
|
-
|
|
|
|
|
- elif data_source_name_en:
|
|
|
|
|
- check_ds_cypher = "MATCH (b:DataSource {name_en: $name_en}) RETURN b"
|
|
|
|
|
- check_ds_result = session.run(check_ds_cypher, name_en=data_source_name_en)
|
|
|
|
|
-
|
|
|
|
|
- if not check_ds_result.single():
|
|
|
|
|
- logger.warning(f"数据源节点不存在: name_en={data_source_name_en},跳过关系创建")
|
|
|
|
|
- else:
|
|
|
|
|
- rel_check_query = """
|
|
|
|
|
- MATCH (a:BusinessDomain)-[r:COME_FROM]->(b:DataSource {name_en: $ds_name_en})
|
|
|
|
|
- WHERE id(a) = $bd_id
|
|
|
|
|
- RETURN count(r) > 0 as exists
|
|
|
|
|
- """
|
|
|
|
|
- rel_check_result = session.run(rel_check_query,
|
|
|
|
|
- bd_id=int(node_id),
|
|
|
|
|
- ds_name_en=data_source_name_en).single()
|
|
|
|
|
-
|
|
|
|
|
- if not (rel_check_result and rel_check_result["exists"]):
|
|
|
|
|
- create_rel_cypher = """
|
|
|
|
|
- MATCH (a:BusinessDomain), (b:DataSource {name_en: $ds_name_en})
|
|
|
|
|
- WHERE id(a) = $bd_id
|
|
|
|
|
- CREATE (a)-[r:COME_FROM]->(b)
|
|
|
|
|
- RETURN r
|
|
|
|
|
- """
|
|
|
|
|
- session.run(create_rel_cypher,
|
|
|
|
|
- bd_id=int(node_id),
|
|
|
|
|
- ds_name_en=data_source_name_en)
|
|
|
|
|
- logger.info(f"创建BusinessDomain与数据源的COME_FROM关系: bd_id={node_id} -> name_en={data_source_name_en}")
|
|
|
|
|
- else:
|
|
|
|
|
- logger.warning(f"data_source参数无效,无法识别格式: {data_source}")
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.error(f"创建BusinessDomain与数据源关系时发生错误: {str(e)}")
|
|
|
|
|
-
|
|
|
|
|
- # 4. 处理与id_list中资源和元数据的关系(如果有)
|
|
|
|
|
- if id_list:
|
|
|
|
|
- logger.info(f"处理BusinessDomain与资源/元数据的关系,id_list数量: {len(id_list)}")
|
|
|
|
|
- # 构建meta_id和resouce_id的列表
|
|
|
|
|
- resouce_ids = [record['resource_id'] for record in id_list if 'resource_id' in record]
|
|
|
|
|
- meta_ids = [record['id'] for id_list_item in id_list for record in id_list_item.get('metaData', []) if 'id' in record]
|
|
|
|
|
-
|
|
|
|
|
- # 创建与DataResource的关系
|
|
|
|
|
- if resouce_ids:
|
|
|
|
|
- query = """
|
|
|
|
|
- MATCH (source:BusinessDomain), (target:DataResource)
|
|
|
|
|
- WHERE id(source)=$source_id AND id(target) IN $target_ids
|
|
|
|
|
- MERGE (source)-[:resource]->(target)
|
|
|
|
|
- """
|
|
|
|
|
- with connect_graph().session() as session:
|
|
|
|
|
- session.run(query, source_id=int(node_id), target_ids=resouce_ids)
|
|
|
|
|
- logger.info(f"创建BusinessDomain与DataResource的关系,资源数量: {len(resouce_ids)}")
|
|
|
|
|
-
|
|
|
|
|
- # 处理元数据关系
|
|
|
|
|
- if meta_ids:
|
|
|
|
|
- for item in id_list:
|
|
|
|
|
- for meta_item in item.get('metaData', []):
|
|
|
|
|
- meta_id = meta_item['id']
|
|
|
|
|
- data_standard = meta_item.get('data_standard', '')
|
|
|
|
|
- name_en = meta_item.get('name_en', '')
|
|
|
|
|
- name_zh = meta_item.get('name_zh', '')
|
|
|
|
|
-
|
|
|
|
|
- # 创建meta_node节点
|
|
|
|
|
- meta_params = {
|
|
|
|
|
- 'name_zh': name_zh,
|
|
|
|
|
- 'name_en': name_en,
|
|
|
|
|
- 'standard': data_standard,
|
|
|
|
|
- 'create_time': get_formatted_time()
|
|
|
|
|
- }
|
|
|
|
|
- meta_node = create_or_get_node('DataMeta', **meta_params)
|
|
|
|
|
-
|
|
|
|
|
- # 创建BusinessDomain与DataMeta的关系
|
|
|
|
|
- if meta_node:
|
|
|
|
|
- meta_node_id = meta_node.id if hasattr(meta_node, 'id') else meta_node
|
|
|
|
|
- query = """
|
|
|
|
|
- MATCH (source:BusinessDomain), (target:DataMeta)
|
|
|
|
|
- WHERE id(source) = $source_id AND id(target) = $target_id
|
|
|
|
|
- MERGE (source)-[:INCLUDES]->(target)
|
|
|
|
|
- """
|
|
|
|
|
- with connect_graph().session() as session:
|
|
|
|
|
- session.run(query, source_id=int(node_id), target_id=int(meta_node_id))
|
|
|
|
|
- logger.info(f"创建BusinessDomain与DataMeta的关系,元数据数量: {len(meta_ids)}")
|
|
|
|
|
-
|
|
|
|
|
- # 5. 创建与DataLabel中"数据模型"标签的BELONGS_TO关系
|
|
|
|
|
- logger.info("查找DataLabel中的'数据模型'标签")
|
|
|
|
|
- with connect_graph().session() as session:
|
|
|
|
|
- # 查找名称为"数据模型"的DataLabel节点
|
|
|
|
|
- find_label_query = """
|
|
|
|
|
- MATCH (label:DataLabel)
|
|
|
|
|
- WHERE label.name_zh = '数据模型' OR label.name_en = 'data_model'
|
|
|
|
|
- RETURN id(label) as label_id
|
|
|
|
|
- LIMIT 1
|
|
|
|
|
- """
|
|
|
|
|
- label_result = session.run(find_label_query)
|
|
|
|
|
- label_record = label_result.single()
|
|
|
|
|
-
|
|
|
|
|
- if label_record:
|
|
|
|
|
- label_id = label_record['label_id']
|
|
|
|
|
- logger.info(f"找到'数据模型'标签,ID: {label_id}")
|
|
|
|
|
-
|
|
|
|
|
- # 检查BELONGS_TO关系是否已存在
|
|
|
|
|
- rel_check_query = """
|
|
|
|
|
- MATCH (a:BusinessDomain)-[r:BELONGS_TO]->(b:DataLabel)
|
|
|
|
|
- WHERE id(a) = $bd_id AND id(b) = $label_id
|
|
|
|
|
- RETURN count(r) > 0 as exists
|
|
|
|
|
- """
|
|
|
|
|
- rel_check_result = session.run(rel_check_query,
|
|
|
|
|
- bd_id=int(node_id),
|
|
|
|
|
- label_id=label_id).single()
|
|
|
|
|
-
|
|
|
|
|
- if not (rel_check_result and rel_check_result["exists"]):
|
|
|
|
|
- # 创建BELONGS_TO关系
|
|
|
|
|
- create_rel_query = """
|
|
|
|
|
- MATCH (a:BusinessDomain), (b:DataLabel)
|
|
|
|
|
- WHERE id(a) = $bd_id AND id(b) = $label_id
|
|
|
|
|
- CREATE (a)-[r:BELONGS_TO]->(b)
|
|
|
|
|
- RETURN r
|
|
|
|
|
- """
|
|
|
|
|
- session.run(create_rel_query, bd_id=int(node_id), label_id=label_id)
|
|
|
|
|
- logger.info(f"成功创建BusinessDomain与'数据模型'标签的BELONGS_TO关系: bd_id={node_id} -> label_id={label_id}")
|
|
|
|
|
- else:
|
|
|
|
|
- logger.info(f"BusinessDomain与'数据模型'标签的BELONGS_TO关系已存在")
|
|
|
|
|
- else:
|
|
|
|
|
- logger.warning("未找到名称为'数据模型'的DataLabel节点,跳过BELONGS_TO关系创建")
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"BusinessDomain节点创建完成,ID: {node_id}")
|
|
|
|
|
- return node_id, business_domain_node
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.error(f"创建BusinessDomain节点时发生错误: {str(e)}")
|
|
|
|
|
- import traceback
|
|
|
|
|
- logger.error(f"错误详情: {traceback.format_exc()}")
|
|
|
|
|
- raise
|
|
|