| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511 |
- """
- 数据模型核心业务逻辑模块
- 本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
- - 数据模型的创建、更新、删除
- - 数据模型与数据资源、元数据之间的关系处理
- - 数据模型血缘关系管理
- - 数据模型图谱生成
- - 数据模型层级计算等功能
- """
- import math
- import threading
- from concurrent.futures import ThreadPoolExecutor
- import pandas as pd
- from py2neo import Relationship
- import logging
- import json
- # Configure logger
- logger = logging.getLogger(__name__)
- from app.core.graph.graph_operations import relationship_exists
- from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node
- from app.services.neo4j_driver import neo4j_driver
- from app.core.meta_data import get_formatted_time, handle_id_unstructured
- from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
- from app.core.data_resource.resource import get_node_by_id, serialize_node_properties
# Compute a data model's level from its `child` relationships and persist it.
def calculate_model_level(id):
    """
    Compute the level of a data model from its `child` chain and save it.

    The level is the length of the longest outgoing `child` path starting at
    the node; it is 0 when the node has no outgoing `child` relationship or
    when no DataModel node matches. The value is written to the node's
    `level` property.

    Args:
        id: Node ID of the data model. Converted with int() unless it is None.

    Returns:
        None
    """
    node_id = int(id) if id is not None else None

    level_query = """
    MATCH (start_node:DataModel)
    WHERE id(start_node) = $nodeId
    CALL {
        WITH start_node
        OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
        RETURN length(path) AS level
    }
    WITH coalesce(max(level), 0) AS max_level
    RETURN max_level
    """

    update_query = """
    MATCH (n:DataModel)
    WHERE id(n) = $nodeId
    SET n.level = $level
    RETURN n
    """

    # One session is enough: the first result is fully consumed by single()
    # before the update statement is sent.
    with connect_graph().session() as session:
        record = session.run(level_query, nodeId=node_id).single()
        # Read the column by key. The previous `"max_level" in record` guard
        # was a membership test on the record itself; NOTE(review): assuming
        # connect_graph() yields the official neo4j driver, Record is a tuple
        # subclass, so that test compared against the *values* and the level
        # always fell back to 0.
        level = record["max_level"] if record is not None else 0
        session.run(update_query, nodeId=node_id, level=level)
# Resolve lineage between data resources that share metadata nodes.
def handle_model_relation(resource_ids):
    """
    Look up the lineage of a data resource through shared metadata nodes.

    For the DataResource whose node id equals `resource_ids`, the query finds
    every other DataResource connected to at least one common `meta_node` and
    returns, per connected resource, the common metadata ids plus the metadata
    ids that belong only to either side.

    Args:
        resource_ids: Node ID of the data resource to start from (it is
            compared with `id(search)` in the query, i.e. a single id).

    Returns:
        list[dict]: One dict per connected resource with the keys
        `blood_resources`, `common_nodes`, `origin_nodes` and `blood_nodes`.
    """
    lineage_cypher = """
    MATCH (search:DataResource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:DataResource)
    WHERE id(search) = $resource_Ids
    WITH search, connect, common_node
    MATCH (search)-[:connection]->(search_node:meta_node)
    WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
    MATCH (connect)-[:connection]->(connect_node:meta_node)
    WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
    WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
    // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
    WITH search, connect, common_nodes,
        [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
        [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
    RETURN id(connect) as blood_resources, common_nodes,
        filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
    """
    graph = connect_graph()
    with graph.session() as session:
        rows = session.run(lineage_cypher, resource_Ids=resource_ids)
        # Materialise inside the session; the result cannot be read after it closes.
        return rows.data()
def _merge_relationship(session, start_id, end_id, rel_type):
    """Create (start)-[:rel_type]->(end) between two node ids unless it already exists."""
    # Relationship types cannot be Cypher parameters; `rel_type` is only ever
    # a literal supplied by handle_data_model, never request data.
    query = (
        "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id "
        f"MERGE (a)-[:{rel_type}]->(b)"
    )
    session.run(query, a_id=int(start_id), b_id=int(end_id)).consume()


def _link_data_source(model_id, data_source):
    """Create the COME_FROM relationship from a data model to its DataSource node."""
    # Accepted formats: a node id (number or digit string), a dict carrying
    # `name_en`, or a plain `name_en` string.
    predicate = None
    if isinstance(data_source, (int, float)) or (isinstance(data_source, str) and data_source.isdigit()):
        ds_key = int(data_source)
        predicate = "id(b) = $ds_key"
        missing_desc = f"ID={ds_key}"
        link_desc = f"data_source_id={ds_key}"
        logger.info(f"data_source 为节点ID: {ds_key}")
    elif isinstance(data_source, dict) and data_source.get('name_en'):
        ds_key = data_source['name_en']
        predicate = "b.name_en = $ds_key"
        missing_desc = f"name_en={ds_key}"
        link_desc = f"name_en={ds_key}"
        logger.info(f"data_source 为字典,提取name_en: {ds_key}")
    elif isinstance(data_source, str):
        ds_key = data_source
        predicate = "b.name_en = $ds_key"
        missing_desc = f"name_en={ds_key}"
        link_desc = f"name_en={ds_key}"
        logger.info(f"data_source 为字符串name_en: {ds_key}")

    if predicate is None:
        logger.warning(f"data_source参数无效,无法识别格式: {data_source}")
        return

    with connect_graph().session() as session:
        # The data source must exist before anything is linked to it.
        found = session.run(
            f"MATCH (b:DataSource) WHERE {predicate} RETURN b",
            ds_key=ds_key,
        ).single()
        if not found:
            logger.warning(f"数据源节点不存在: {missing_desc},跳过关系创建")
            return

        exists_query = f"""
        MATCH (a:DataModel)-[r:COME_FROM]->(b:DataSource)
        WHERE id(a) = $model_id AND {predicate}
        RETURN count(r) > 0 as exists
        """
        exists_record = session.run(exists_query, model_id=int(model_id), ds_key=ds_key).single()
        if exists_record and exists_record["exists"]:
            logger.info(f"数据模型与数据源的COME_FROM关系已存在: model_id={model_id} -> {link_desc}")
            return

        create_query = f"""
        MATCH (a:DataModel), (b:DataSource)
        WHERE id(a) = $model_id AND {predicate}
        CREATE (a)-[r:COME_FROM]->(b)
        RETURN r
        """
        session.run(create_query, model_id=int(model_id), ds_key=ds_key)
        logger.info(f"已创建数据模型与数据源的COME_FROM关系: model_id={model_id} -> {link_desc}")


# Create (or fetch) a data model node and wire up its relationships.
def handle_data_model(data_model, result_list, result, receiver):
    """
    Create a data model node, or reuse the one with the same Chinese name.

    After the node is obtained, the function links it to its children
    (`child`), to its tag (`LABEL`) and to its data source (`COME_FROM`).
    A failure while linking the data source is logged and does not abort
    the call; any other failure is logged and re-raised.

    Args:
        data_model: Name of the data model (looked up as `name_zh`).
        result_list: List of English names; the first entry becomes `name_en`.
        result: Serialized ID list, stored on the node as `id_list`.
        receiver: Request parameters. Mutated in place: `id_list`,
            `create_time` and `name_en` are added before node creation.
            Optional keys read here: `childrenId`, `tag`, `data_source`.

    Returns:
        tuple: (id, data_model_node)

    Raises:
        Exception: Whatever the graph helpers raise, after being logged.
    """
    try:
        data_model_en = result_list[0] if result_list else ""
        receiver['id_list'] = result
        receiver.update({
            'create_time': get_formatted_time(),
            'name_en': data_model_en,
        })

        data_model_node = get_node('DataModel', name_zh=data_model) or create_or_get_node('DataModel', **receiver)
        logger.info(f"通过查询或创建节点获得节点ID111,data_model_node: {data_model_node}")

        # The helpers may hand back either a node object or a bare id;
        # fall back to the returned value itself when no lookup succeeds.
        node_id = data_model_node
        if hasattr(data_model_node, 'id'):
            logger.info(f"通过节点ID获取节点ID222,data_model_node: {data_model_node}")
            node_id = data_model_node.id
        else:
            logger.info(f"通过查询节点名称获取节点ID333,data_model_node: {data_model_node}")
            # Match on name_zh, the same property get_node() was queried with
            # above (the old query matched on `name`).
            id_query = """
            MATCH (n:DataModel {name_zh: $name_zh})
            RETURN id(n) as node_id
            """
            with connect_graph().session() as session:
                record = session.run(id_query, name_zh=data_model).single()
                logger.info(f"通过查询节点名称获取节点ID444,record: {record}")
                # Index by key; `"node_id" in record` tested the values.
                if record is not None:
                    logger.info(f"通过查询节点名称获取节点ID555,record: {record}")
                    node_id = record["node_id"]

        # Parent -> child relationships. `or []` tolerates an explicit None.
        child_ids = receiver.get('childrenId') or []
        if child_ids:
            with connect_graph().session() as session:
                for child_id in child_ids:
                    child_node = get_node_by_id_no_label(child_id)
                    if child_node:
                        _merge_relationship(session, node_id, child_node.id, "child")

        # Link to the data label given by its id.
        if receiver.get('tag'):
            tag = get_node_by_id('DataLabel', receiver['tag'])
            if tag:
                with connect_graph().session() as session:
                    _merge_relationship(session, node_id, tag.id, "LABEL")

        # Data source relationship (COME_FROM); best effort by design.
        data_source = receiver.get('data_source')
        if data_source:
            try:
                _link_data_source(node_id, data_source)
            except Exception as e:
                # A data source failure must not interrupt the main flow.
                logger.error(f"处理数据源关系失败(不中断主流程): {str(e)}")

        return node_id, data_model_node
    except Exception as e:
        logger.error(f"Error in handle_data_model: {str(e)}")
        raise
# (Selected from data resources)
def resource_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to the metadata picked from data resources.

    Every metadata id found under `metaData` in `id_lists` is connected to
    the data model with an `INCLUDES` relationship (MERGE, so repeated calls
    do not duplicate relationships). Resource ids are collected and logged
    only; the DataModel -> DataResource relationship (DERIVED_FROM) is
    intentionally not created here and is left to data-flow creation
    (change by mxl, 2025-06-27).

    Args:
        id_lists: List of dicts, each with `resource_id` and a `metaData`
            list whose items carry an `id`.
        data_model_node_id: Node ID of the data model.

    Returns:
        None

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        logger.info(f"开始处理数据模型与元数据的关系,数据模型ID: {data_model_node_id}")

        # Gather resource ids first, then flatten the metadata ids.
        resouce_ids = []
        for entry in id_lists:
            resouce_ids.append(entry['resource_id'])

        meta_ids = []
        for entry in id_lists:
            for meta in entry['metaData']:
                meta_ids.append(meta['id'])

        logger.info(f"资源ID列表: {resouce_ids}")
        logger.info(f"元数据ID列表: {meta_ids}")

        # Nothing to link when no metadata was selected.
        if not meta_ids:
            return

        logger.info("开始创建数据模型与元数据的关系")
        includes_cypher = """
        MATCH (source:DataModel), (target:DataMeta)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:INCLUDES]->(target)
        RETURN count(*) as count
        """
        with connect_graph().session() as session:
            outcome = session.run(
                includes_cypher,
                source_id=data_model_node_id,
                target_ids=meta_ids,
            )
            linked = outcome.single()["count"]
            logger.info(f"成功创建 {linked} 个数据模型与元数据的关系")

    except Exception as e:
        logger.error(f"处理数据模型与元数据的关系时发生错误: {str(e)}")
        raise
# (Selected from data models)
def model_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to metadata and models picked from other data models.

    Metadata ids are attached with a `component` relationship and the source
    model ids with a `use` relationship; both use MERGE.

    Args:
        id_lists: List of dicts, each with `model_id` and a `metaData` list
            whose items carry an `id`.
        data_model_node_id: Node ID of the data model being built.

    Returns:
        None
    """
    model_ids = []
    for entry in id_lists:
        model_ids.append(entry['model_id'])

    meta_ids = []
    for entry in id_lists:
        for meta in entry['metaData']:
            meta_ids.append(meta['id'])

    def run_merge(cypher, target_ids):
        # Each statement runs in its own session, as before.
        with neo4j_driver.get_session() as session:
            session.run(cypher, source_id=data_model_node_id, target_ids=target_ids)

    # Composition relationship to the metadata nodes.
    if meta_ids:
        run_merge(
            """
            MATCH (source:DataModel), (target:DataMeta)
            WHERE id(source)=$source_id AND id(target) IN $target_ids
            MERGE (source)-[:component]->(target)
            """,
            meta_ids,
        )

    # Usage relationship to the data models the metadata came from.
    if model_ids:
        run_merge(
            """
            MATCH (source:DataModel), (target:DataModel)
            WHERE id(source)=$source_id AND id(target) IN $target_ids
            MERGE (source)-[:use]->(target)
            """,
            model_ids,
        )
# (Selected from DDL)
def handle_no_meta_data_model(id_lists, receiver, data_model_node):
    """
    Handle a data model built from DDL, whose metadata does not exist yet.

    The data model is linked to the listed data resources (`resource`), and
    for every metadata item a DataMeta node is created or fetched via
    create_or_get_node() and linked to the data model (`component`).

    Args:
        id_lists: List of dicts, each with `resource_id` and a `metaData`
            list. Metadata items may carry `name_zh`, `name_en` and
            `data_standard`.
        receiver: Request parameters (not used by this function; kept for
            interface compatibility).
        data_model_node: The data model node. Either an object exposing
            `.id`, or a mapping with `name_zh` used to look the id up.

    Returns:
        None
    """
    resouce_ids = [record['resource_id'] for record in id_lists]

    # Resolve the data model node id.
    if hasattr(data_model_node, 'id'):
        data_model_node_id = data_model_node.id
    else:
        data_model_node_id = None
        lookup_query = """
        MATCH (n:DataModel {name_zh: $name_zh})
        RETURN id(n) as node_id
        """
        with connect_graph().session() as session:
            record = session.run(lookup_query, name_zh=data_model_node.get('name_zh')).single()
            if record:
                data_model_node_id = record["node_id"]

    # Compare with None explicitly: 0 is a valid Neo4j node id and must not
    # be treated as "not found".
    if data_model_node_id is None:
        return

    with connect_graph().session() as session:
        # Resource relationship to the DataResource nodes.
        if resouce_ids:
            resource_query = """
            MATCH (source:DataModel), (target:DataResource)
            WHERE id(source)=$source_id AND id(target) IN $target_ids
            MERGE (source)-[:resource]->(target)
            """
            session.run(resource_query, source_id=data_model_node_id, target_ids=resouce_ids).consume()

        # MERGE keeps the relationship unique without a separate existence check.
        component_query = (
            "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id "
            "MERGE (a)-[:component]->(b)"
        )
        for item in id_lists:
            for meta_item in item['metaData']:
                meta_node = create_or_get_node(
                    'DataMeta',
                    name_zh=meta_item.get('name_zh', ''),
                    name_en=meta_item.get('name_en', ''),
                    standard=meta_item.get('data_standard', ''),
                    create_time=get_formatted_time(),
                )
                # NOTE(review): the truthiness test is kept as-is because the
                # failure value of create_or_get_node() is not visible here;
                # the result is assumed to be int()-convertible (a node id).
                if not meta_node:
                    continue
                session.run(
                    component_query,
                    a_id=int(data_model_node_id),
                    b_id=int(meta_node),
                ).consume()
# Data model - detail endpoint
def handle_id_model(id):
    """
    Fetch the details of a data model.

    Args:
        id: Node ID of the data model. Anything int() accepts (e.g. "12")
            is converted; a value that cannot be converted yields the same
            empty structure as a node that does not exist.

    Returns:
        dict: `{"data_model": {...}}`. When the node exists, the inner dict
        holds `resource_selected` plus the fields `id`, `name_zh`, `name_en`,
        `create_time`, `describe`, `category`, `level`, `tag`, `leader`,
        `origin`, `blood_resource`, `frequency`, `organization`,
        `data_sensitivity`, `status` and `childrenId`; missing values
        default to "" (`tag` to {} and `childrenId` to []). When the node is
        not found, a skeleton with None values is returned instead.
    """
    empty_response = {"data_model": {
        "resource_selected": [{"meta_ids": [], "data_resource": None, "resource_id": None}],
        "leader": None, "origin": None, "frequency": None, "childrenId": [],
        "organization": None, "name_zh": None, "name_en": None, "data_sensitivity": None,
        "describe": None, "tag": {}, "create_time": None, "category": None, "status": None
    }}

    # id(n) is an integer in Cypher; a string id would silently match nothing.
    try:
        node_id = int(id)
    except (ValueError, TypeError):
        logger.error(f"节点ID不是有效的整数: {id}")
        return empty_response

    cql = """
    MATCH (n:DataModel) WHERE id(n) = $nodeId
    OPTIONAL MATCH (n)-[:INCLUDES]->(meta:DataMeta)
    OPTIONAL MATCH (n)-[:DERIVED_FROM]->(resource:DataResource)
    OPTIONAL MATCH (n)-[:LABEL]->(tag:DataLabel)
    OPTIONAL MATCH (uses:model_use)-[:use]->(n)
    OPTIONAL MATCH (n)-[:has_component]->(component)
    WITH n,
        collect(DISTINCT meta) as meta_nodes,
        collect(DISTINCT resource) as resources,
        collect(DISTINCT component) as components,
        collect(DISTINCT uses) as uses,
        collect(DISTINCT tag) as tags,
        CASE WHEN n.childrenId IS NOT NULL THEN n.childrenId ELSE [] END as children
    RETURN {
        // 基本信息
        id: id(n),
        name_zh: n.name_zh,
        name_en: n.name_en,
        create_time: n.create_time,
        describe: n.describe,
        category: n.category,
        level: n.level,
        tag: CASE WHEN size(tags) > 0 AND tags[0] IS NOT NULL THEN {id: id(tags[0]), name: tags[0].name} ELSE null END,
        // 添加其他必需字段
        leader: n.leader,
        origin: n.origin,
        blood_resource: n.blood_resource,
        frequency: n.frequency,
        organization: n.organization,
        data_sensitivity: n.data_sensitivity,
        status: n.status,
        // 子节点列表
        childrenId: children
    } AS result,
    // 资源列表
    [{
        data_resource: [resource IN resources WHERE resource IS NOT NULL | {
            id: id(resource),
            name_zh: resource.name_zh,
            name_en: resource.name_en,
            description: resource.description
        }],
        resource_id: [resource IN resources WHERE resource IS NOT NULL | id(resource)],
        meta_ids: [meta IN meta_nodes WHERE meta IS NOT NULL | {
            id: id(meta),
            name_zh: meta.name_zh,
            name_en: meta.name_en,
            data_type: meta.data_type
        }]
    }] AS resource_selected
    """

    with connect_graph().session() as session:
        record = session.run(cql, nodeId=node_id).single()
        logger.info(f"获得查询结果---------->>>{record}")

        if not record:
            # No such data model: return the empty skeleton.
            return empty_response

        properties = record["result"]
        resource_selected = record["resource_selected"]

        # Give every required field a non-null default.
        required_fields = ['tag', 'leader', 'origin', 'blood_resource',
                           'frequency', 'describe', 'organization', 'name_zh', 'name_en',
                           'data_sensitivity', 'create_time', 'category', 'status', 'childrenId']
        defaults = {'tag': {}, 'childrenId': []}
        for field in required_fields:
            if properties.get(field) is None:
                # Fresh container per call site; "" for plain scalar fields.
                properties[field] = type(defaults[field])() if field in defaults else ""

        return {"data_model": {"resource_selected": resource_selected, **properties}}
# Data model list
def model_list(skip_count, page_size, name_en_filter=None, name_zh_filter=None,
               category=None, tag=None, level=None):
    """
    Return one page of data models, newest first.

    Args:
        skip_count: Number of rows to skip.
        page_size: Maximum number of rows to return.
        name_en_filter: Substring filter on `name_en`. The value is embedded
            in a Cypher regular expression (`=~ ".*<value>.*"`), so regex
            metacharacters in it are interpreted as such.
        name_zh_filter: Substring filter on `name_zh` (same regex semantics).
        category: Exact match on `category`.
        tag: Node ID of a label; only models with a LABEL relationship to it
            are returned. Must be convertible to int.
        level: Exact match on `level`.

    Returns:
        tuple: (list of data model dicts, total count). `([], 0)` is
        returned when `tag` is not an integer or when any error occurs.
    """
    try:
        # WHERE conditions that apply to the DataModel node (and the tag).
        conditions = []
        params = {}

        if name_zh_filter is not None:
            conditions.append("n.name_zh =~ $name_zh")
            params['name_zh'] = f".*{name_zh_filter}.*"

        if name_en_filter is not None:
            conditions.append("n.name_en =~ $name_en")
            params['name_en'] = f".*{name_en_filter}.*"

        if category is not None:
            conditions.append("n.category = $category")
            params['category'] = category

        if level is not None:
            conditions.append("n.level = $level")
            params['level'] = level

        if tag is not None:
            try:
                params['tag'] = int(tag)
            except (ValueError, TypeError):
                logger.warning(f"Invalid tag parameter: {tag}, expected integer")
                return [], 0
            # Filtering by tag requires the LABEL relationship to exist.
            match_clause = "MATCH (n:DataModel)-[:LABEL]->(t)"
            conditions.append("id(t) = $tag")
        else:
            match_clause = "MATCH (n:DataModel)"

        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""

        columns = """
                id(n) as id,
                n.name_zh as name_zh,
                n.name_en as name_en,
                n.create_time as create_time,
                n.describe as describe,
                n.level as level,
                n.category as category,
                n.status as status,
                n.leader as leader,
                n.origin as origin,
                n.blood_resource as blood_resource,
                n.organization as organization,
                id(t) as tag_id,
                t.name_zh as tag_name
        """

        count_query = f"""
            {match_clause}
            {where_clause}
            RETURN COUNT(DISTINCT n) AS count
        """

        if tag is not None:
            # After RETURN DISTINCT only projected columns are in scope, so
            # sort by the `create_time` alias. The previous `ORDER BY time`
            # referenced an undefined variable and failed every tag query.
            query = f"""
            {match_clause}
            {where_clause}
            RETURN DISTINCT {columns}
            ORDER BY create_time DESC
            SKIP $skip
            LIMIT $limit
            """
        else:
            # Filter the models first, then optionally attach their label,
            # so that the filter is not applied to a cartesian product.
            query = f"""
            MATCH (n:DataModel)
            {where_clause}
            WITH n
            OPTIONAL MATCH (n)-[:LABEL]->(t)
            RETURN {columns}
            ORDER BY n.create_time DESC
            SKIP $skip
            LIMIT $limit
            """

        with connect_graph().session() as session:
            logger.debug(f"Count query: {count_query}")
            logger.debug(f"Query parameters: {params}")

            count_record = session.run(count_query, **params).single()
            total = count_record['count'] if count_record else 0

            logger.debug(f"Main query: {query}")
            result = session.run(query, skip=skip_count, limit=page_size, **params)

            data = [
                {
                    "id": record['id'],
                    "name_zh": record['name_zh'],
                    "name_en": record['name_en'],
                    "create_time": record['create_time'],
                    "describe": record['describe'],
                    "category": record['category'],
                    "status": record['status'],
                    "leader": record['leader'],
                    "origin": record['origin'],
                    "blood_resource": record['blood_resource'],
                    "organization": record['organization'],
                    "level": record['level'],
                    "tag": {"id": record['tag_id'], "name_zh": record['tag_name']} if record['tag_id'] is not None else None
                }
                for record in result
            ]

            logger.info(f"Query returned {len(data)} items out of {total} total")
            return data, total

    except Exception as e:
        # logger.exception records the traceback through the logging system
        # instead of printing it to stderr.
        logger.exception(f"Error in model_list: {str(e)}")
        return [], 0
# Data resources that have a lineage relationship with a data model
def model_resource_list(skip_count, page_size, name_zh_filter=None, id=None,
                        category=None, create_time=None):
    """
    Return one page of data resources attached to a data model.

    Resources are the DataResource nodes reached from the model through a
    `children` relationship. The page query additionally requires each
    resource to have a LABEL relationship, while the total counts all
    attached resources, so `total` can exceed the number of listable rows.

    Args:
        skip_count: Number of rows to skip.
        page_size: Maximum number of rows to return.
        name_zh_filter: Accepted for interface compatibility; NOTE(review):
            currently not applied to the query.
        id: Node ID of the data model (converted with int()).
        category: Accepted but currently not applied to the query.
        create_time: Accepted but currently not applied to the query.

    Returns:
        tuple: (list of data resource dicts, total count). `([], 0)` is
        returned when `id` is not an integer or when any error occurs.
    """
    try:
        # id(n) is an integer in Cypher; coerce so string ids still match.
        try:
            node_id = int(id)
        except (ValueError, TypeError):
            logger.error(f"节点ID不是有效的整数: {id}")
            return [], 0

        base_query = """
            MATCH (n:DataModel)
            WHERE id(n) = $nodeId
            MATCH (n)-[:children]->(m:DataResource)
        """

        count_query = base_query + """
            RETURN COUNT(m) as count
        """

        main_query = base_query + """
            MATCH (m)-[:LABEL]->(l)
            WHERE id(n) = $nodeId and labels(m) <> ['DataMeta']
            RETURN m.name_zh as name_zh,
                m.name_en as name_en,
                id(m) as id,
                l.name_zh as label,
                m.create_time as create_time,
                m.description as description,
                m.category as category
            ORDER BY m.create_time DESC
            SKIP $skip LIMIT $limit
        """

        with connect_graph().session() as session:
            count_record = session.run(count_query, nodeId=node_id).single()
            total = count_record['count'] if count_record else 0

            result = session.run(main_query, nodeId=node_id, skip=skip_count, limit=page_size)
            data = [
                {
                    "name_zh": record['name_zh'],
                    "name_en": record['name_en'],
                    "id": record['id'],
                    "label": record['label'],
                    "create_time": record['create_time'],
                    "description": record['description'],
                    "category": record['category']
                }
                for record in result
            ]

            return data, total
    except Exception as e:
        # Report through the module logger (with traceback), like the rest
        # of the file, rather than print() / stderr.
        logger.exception(f"Error in model_resource_list: {str(e)}")
        return [], 0
# Data model lineage graph
def model_kinship_graph(nodeid, meta=False):
    """
    Build the lineage graph of a data model.

    Starting at the given DataModel node, DERIVED_FROM relationships are
    followed outwards (zero or more hops) to DataResource / DataModel nodes.
    Every node and relationship on those paths is reported once.

    Args:
        nodeid: Node ID of the data model; must be convertible to int.
        meta: When true, the DataMeta nodes the model INCLUDES are added too.

    Returns:
        dict: `{"nodes": [...], "lines": [...]}`. Both lists are empty when
        the id is invalid, the node does not exist, or an error occurs.
    """
    try:
        with connect_graph().session() as session:
            # The id has to be an integer for the id() comparison.
            try:
                nodeid_int = int(nodeid)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {nodeid}")
                return {"nodes": [], "lines": []}

            # Make sure the starting model exists.
            start_node_query = """
                MATCH (n:DataModel)
                WHERE id(n) = $nodeId
                RETURN n
            """
            if not session.run(start_node_query, nodeId=nodeid_int).single():
                logger.error(f"未找到ID为{nodeid_int}的DataModel节点")
                return {"nodes": [], "lines": []}

            # Follow DERIVED_FROM recursively from the start node.
            cypher = """
                MATCH (start:DataModel)
                WHERE id(start) = $nodeId
                MATCH path = (start)-[:DERIVED_FROM*0..]->(target)
                WHERE target:DataResource OR target:DataModel
                RETURN path
            """
            path_rows = session.run(cypher, nodeId=nodeid_int)

            # Keyed by integer id so each node / relationship appears once.
            nodes = {}
            lines = {}

            def remember_node(graph_node):
                """Store the serialized node; return True when it was new."""
                key = int(graph_node.id)
                if key in nodes:
                    return False
                entry = serialize_node_properties(graph_node)
                entry["id"] = str(key)
                entry["node_type"] = list(graph_node.labels)[0] if graph_node.labels else ""
                nodes[key] = entry
                return True

            for row in path_rows:
                path = row['path']
                logger.debug(f"处理路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")

                for node in path.nodes:
                    if remember_node(node):
                        logger.debug(f"添加节点: ID={int(node.id)}, 标签={list(node.labels)}")

                for rel in path.relationships:
                    rel_id = int(rel.id)
                    if rel_id in lines:
                        continue
                    lines[rel_id] = {
                        "id": str(rel_id),
                        "from": str(int(rel.start_node.id)),
                        "to": str(int(rel.end_node.id)),
                        "text": rel.type
                    }
                    logger.debug(f"添加关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}")

            # Optionally add the metadata the model INCLUDES.
            if meta:
                meta_cypher = """
                    MATCH (start:DataModel)-[r:INCLUDES]->(meta:DataMeta)
                    WHERE id(start) = $nodeId
                    RETURN start, r, meta
                """
                for meta_row in session.run(meta_cypher, nodeId=nodeid_int):
                    rel = meta_row['r']
                    meta_node = meta_row['meta']

                    meta_node_id = int(meta_node.id)
                    remember_node(meta_node)

                    rel_id = int(rel.id)
                    if rel_id not in lines:
                        # The line always starts at the requested model.
                        lines[rel_id] = {
                            "id": str(rel_id),
                            "from": str(nodeid_int),
                            "to": str(meta_node_id),
                            "text": rel.type
                        }

            logger.info(f"成功获取血缘关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }

    except Exception as e:
        logger.error(f"获取数据模型血缘关系图谱失败: {str(e)}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return {"nodes": [], "lines": []}
- # 数据模型影响图谱
def model_impact_graph(nodeid, meta=False):
    """
    Build the impact graph of a data model.

    Follows DERIVED_FROM relationships of any depth (zero hops included) that
    end at the given DataModel node and start at a DataResource or DataModel
    node, collecting every node and relationship on the matched paths.

    Args:
        nodeid: ID of the DataModel node; must be convertible to int.
        meta: When truthy, the DataMeta nodes linked to the model through
            INCLUDES relationships are added to the graph as well.

    Returns:
        dict: {"nodes": [...], "lines": [...]}. Both lists are empty when the
        ID is not an integer, the DataModel node is not found, or any
        exception is raised while querying.
    """
    try:
        with connect_graph().session() as session:
            # The node ID has to be an integer to be compared with id(n).
            try:
                nodeid_int = int(nodeid)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {nodeid}")
                return {"nodes": [], "lines": []}

            # Make sure the requested DataModel node exists before traversing.
            exists_query = """
            MATCH (n:DataModel)
            WHERE id(n) = $nodeId
            RETURN n
            """
            model_record = session.run(exists_query, nodeId=nodeid_int).single()
            if not model_record:
                logger.error(f"未找到ID为{nodeid_int}的DataModel节点")
                return {"nodes": [], "lines": []}

            # Variable-length traversal of DERIVED_FROM ending at the model.
            impact_query = """
            MATCH (target:DataModel)
            WHERE id(target) = $nodeId
            MATCH path = (source)-[:DERIVED_FROM*0..]->(target)
            WHERE source:DataResource OR source:DataModel
            RETURN path
            """
            path_records = session.run(impact_query, nodeId=nodeid_int)

            # Keyed by integer ID so nodes/relationships shared by several
            # paths are only emitted once.
            nodes = {}
            lines = {}

            for path_record in path_records:
                path = path_record['path']
                logger.debug(f"处理影响路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")

                for graph_node in path.nodes:
                    node_key = int(graph_node.id)
                    if node_key in nodes:
                        continue
                    entry = serialize_node_properties(graph_node)
                    entry["id"] = str(node_key)
                    entry["node_type"] = next(iter(graph_node.labels), "")
                    nodes[node_key] = entry
                    logger.debug(f"添加影响节点: ID={node_key}, 标签={list(graph_node.labels)}")

                for relationship in path.relationships:
                    rel_key = int(relationship.id)
                    if rel_key in lines:
                        continue
                    from_id = int(relationship.start_node.id)
                    to_id = int(relationship.end_node.id)
                    lines[rel_key] = {
                        "id": str(rel_key),
                        "from": str(from_id),
                        "to": str(to_id),
                        "text": relationship.type
                    }
                    logger.debug(f"添加影响关系: ID={rel_key}, 类型={relationship.type}, 从{from_id}到{to_id}")

            # Optionally attach the model's DataMeta nodes (INCLUDES edges).
            if meta:
                includes_query = """
                MATCH (target:DataModel)-[r:INCLUDES]->(meta:DataMeta)
                WHERE id(target) = $nodeId
                RETURN target, r, meta
                """
                for includes_record in session.run(includes_query, nodeId=nodeid_int):
                    includes_rel = includes_record['r']
                    meta_node = includes_record['meta']

                    meta_key = int(meta_node.id)
                    if meta_key not in nodes:
                        meta_entry = serialize_node_properties(meta_node)
                        meta_entry["id"] = str(meta_key)
                        meta_entry["node_type"] = next(iter(meta_node.labels), "")
                        nodes[meta_key] = meta_entry

                    includes_key = int(includes_rel.id)
                    if includes_key not in lines:
                        lines[includes_key] = {
                            "id": str(includes_key),
                            "from": str(nodeid_int),
                            "to": str(meta_key),
                            "text": includes_rel.type
                        }

            logger.info(f"成功获取影响关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }

    except Exception as e:
        logger.error(f"获取数据模型影响关系图谱失败: {str(e)}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return {"nodes": [], "lines": []}
- # 数据模型全部图谱
def model_all_graph(nodeid, meta=False):
    """
    Build the complete relationship graph of a data model.

    Calls model_kinship_graph and model_impact_graph for the same node and
    merges their results, de-duplicating nodes and lines by their 'id' value.

    Args:
        nodeid: ID of the node, passed unchanged to both graph builders.
        meta: Whether metadata should be included; passed to both builders.

    Returns:
        dict: {"nodes": [...], "lines": [...]} with the merged content, or
        the same structure with empty lists if an exception is raised.
    """
    try:
        # Lineage graph first, impact graph second: when both contain the
        # same ID, the impact entry replaces the lineage entry.
        partial_graphs = (
            model_kinship_graph(nodeid, meta),
            model_impact_graph(nodeid, meta),
        )

        merged_nodes = {}
        merged_lines = {}
        sections = (("nodes", merged_nodes), ("lines", merged_lines))

        for graph in partial_graphs:
            if not graph:
                continue
            for section_name, bucket in sections:
                if section_name not in graph:
                    continue
                for item in graph[section_name]:
                    item_id = item.get('id')
                    # Entries without a usable ID cannot be de-duplicated
                    # and are dropped.
                    if item_id:
                        bucket[item_id] = item

        combined = {
            "nodes": list(merged_nodes.values()),
            "lines": list(merged_lines.values())
        }

        logger.info(f"成功获取完整关系图谱,ID: {nodeid}, 节点数: {len(merged_nodes)}, 关系数: {len(merged_lines)}")
        return combined

    except Exception as e:
        logger.error(f"获取数据模型完整关系图谱失败: {str(e)}")
        return {"nodes": [], "lines": []}
- # 更新数据模型
def data_model_edit(receiver):
    """
    Update a DataModel node and rebuild its LABEL and INCLUDES relationships.

    Steps performed, each in its own session:
      1. Overwrite the node's scalar properties with the values from
         ``receiver`` (missing keys are written as None). ``create_time`` is
         set to the value returned by get_formatted_time().
      2. If ``tag`` is truthy, delete every outgoing LABEL relationship of
         the model and link it to the DataLabel node identified by ``tag``.
         When ``tag`` is falsy the existing LABEL relationships are kept.
      3. Delete every INCLUDES relationship to DataMeta nodes and recreate
         one per entry of ``metaData`` whose ``id`` refers to an existing
         DataMeta node.

    Args:
        receiver: Mapping with the request parameters. Keys read: id,
            name_zh, name_en, category, describe, tag, frequency, leader,
            organization, status, metaData (list of mappings with an 'id').

    Returns:
        dict: {"message": "数据模型更新成功"}.

    Raises:
        Exceptions raised by the graph driver or the lookup helpers are not
        caught here and propagate to the caller.
    """
    # NOTE(review): the ID is passed to Cypher as received; id(n) = $id only
    # matches when it is an integer — confirm callers send an int.
    model_id = receiver.get('id')
    tag = receiver.get('tag')
    meta_data = receiver.get('metaData', [])

    raw_status = receiver.get('status')
    # NOTE(review): bool() of a non-empty string such as "false" is True —
    # confirm that callers send a real boolean or 0/1.
    status = bool(raw_status) if raw_status is not None else None

    # Update the scalar properties of the DataModel node.
    update_query = """
    MATCH (n:DataModel) WHERE id(n) = $id
    SET n.name_zh = $name_zh,
        n.name_en = $name_en,
        n.category = $category,
        n.describe = $describe,
        n.frequency = $frequency,
        n.leader = $leader,
        n.organization = $organization,
        n.status = $status,
        n.create_time = $create_time
    RETURN n
    """
    with connect_graph().session() as session:
        # .data() drains the result so that query errors surface here.
        session.run(update_query,
                    id=model_id,
                    name_zh=receiver.get('name_zh'),
                    name_en=receiver.get('name_en'),
                    category=receiver.get('category'),
                    describe=receiver.get('describe'),
                    frequency=receiver.get('frequency'),
                    leader=receiver.get('leader'),
                    organization=receiver.get('organization'),
                    status=status,
                    create_time=get_formatted_time()).data()

    # Replace the label relationship.
    if tag:
        delete_label_query = """
        MATCH (n:DataModel)-[r:LABEL]->() WHERE id(n) = $id
        DELETE r
        """
        with connect_graph().session() as session:
            session.run(delete_label_query, id=model_id)

        tag_node = get_node_by_id('DataLabel', tag)
        model_node = get_node_by_id_no_label(model_id) if tag_node else None
        if tag_node and model_node:
            # The helpers may return either a node object or a bare ID.
            start_id = int(model_node.id if hasattr(model_node, 'id') else model_node)
            end_id = int(tag_node.id if hasattr(tag_node, 'id') else tag_node)

            def _link_label(tx):
                # MERGE creates the relationship only when it is missing, so
                # no separate existence check is needed and no duplicate can
                # be created.
                tx.run(
                    "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id MERGE (a)-[:LABEL]->(b)",
                    a_id=start_id, b_id=end_id
                ).consume()

            with connect_graph().session() as session:
                session.execute_write(_link_label)

    # Rebuild the INCLUDES relationships to DataMeta nodes.
    with connect_graph().session() as session:
        delete_meta_query = """
        MATCH (n:DataModel)-[r:INCLUDES]->(m:DataMeta)
        WHERE id(n) = $id
        DELETE r
        """
        session.run(delete_meta_query, id=model_id)
        logger.info(f"已删除DataModel({model_id})的所有DataMeta关系")

        if not meta_data:
            logger.info(f"meta_data为空,不需要重新创建INCLUDES关系,DataModel({model_id})将不关联任何DataMeta节点")
            return {"message": "数据模型更新成功"}

        check_meta_query = """
        MATCH (m:DataMeta)
        WHERE id(m) = $meta_id
        RETURN m
        """
        # MERGE instead of CREATE: a payload that lists the same metadata ID
        # twice must not produce duplicate INCLUDES relationships.
        link_meta_query = """
        MATCH (n:DataModel), (m:DataMeta)
        WHERE id(n) = $model_id AND id(m) = $meta_id
        MERGE (n)-[:INCLUDES]->(m)
        RETURN n, m
        """

        for meta_item in meta_data:
            raw_meta_id = meta_item.get('id')
            if not raw_meta_id:
                continue

            try:
                meta_id = int(raw_meta_id)
            except (ValueError, TypeError) as e:
                logger.error(f"无效的meta_id: {raw_meta_id}, 错误: {str(e)}")
                continue

            # Only link metadata nodes that really exist.
            if session.run(check_meta_query, meta_id=meta_id).single():
                session.run(link_meta_query, model_id=model_id, meta_id=meta_id)
                logger.info(f"成功创建INCLUDES关系: DataModel({model_id}) -> DataMeta({meta_id})")
            else:
                logger.warning(f"DataMeta节点不存在,ID: {meta_id}")

    return {"message": "数据模型更新成功"}
def model_community(tag=None):
    """
    Return all DataModel nodes together with their DERIVED_FROM relationships.

    Args:
        tag: Optional label node ID. When given, only DataModel nodes that
            have a LABEL relationship to that node are used as anchors for
            the DERIVED_FROM traversal (in both directions).

    Returns:
        dict: {"nodes": [...], "lines": [...]} in the same format as
        model_kinship_graph. Both lists are empty when ``tag`` cannot be
        converted to int or when an exception is raised.
    """
    try:
        with connect_graph().session() as session:
            if tag is None:
                # No label filter: every DataModel node is an anchor.
                community_query = """
                MATCH (dm:DataModel)
                WITH dm
                MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel)
                RETURN path
                UNION
                MATCH (dm:DataModel)
                WITH dm
                MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm)
                RETURN path
                """
                path_records = session.run(community_query)
            else:
                # The label ID has to be an integer to be compared with id(t).
                try:
                    tag_id = int(tag)
                except (ValueError, TypeError):
                    logger.warning(f"Invalid tag parameter: {tag}, expected integer")
                    return {"nodes": [], "lines": []}

                # Anchors are the DataModel nodes carrying the given label.
                community_query = """
                MATCH (dm:DataModel)-[:LABEL]->(t)
                WHERE id(t) = $tag_id
                WITH dm
                MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel)
                RETURN path
                UNION
                MATCH (dm:DataModel)-[:LABEL]->(t)
                WHERE id(t) = $tag_id
                WITH dm
                MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm)
                RETURN path
                """
                path_records = session.run(community_query, tag_id=tag_id)

            # Keyed by integer ID so elements shared by several paths are
            # only emitted once.
            nodes = {}
            lines = {}

            for path_record in path_records:
                path = path_record['path']
                logger.debug(f"处理社区路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")

                for graph_node in path.nodes:
                    node_key = int(graph_node.id)
                    if node_key in nodes:
                        continue
                    entry = serialize_node_properties(graph_node)
                    entry["id"] = str(node_key)
                    entry["node_type"] = next(iter(graph_node.labels), "")
                    nodes[node_key] = entry
                    logger.debug(f"添加社区节点: ID={node_key}, 标签={list(graph_node.labels)}")

                for relationship in path.relationships:
                    rel_key = int(relationship.id)
                    if rel_key in lines:
                        continue
                    from_id = int(relationship.start_node.id)
                    to_id = int(relationship.end_node.id)
                    lines[rel_key] = {
                        "id": str(rel_key),
                        "from": str(from_id),
                        "to": str(to_id),
                        "text": relationship.type
                    }
                    logger.debug(f"添加社区关系: ID={rel_key}, 类型={relationship.type}, 从{from_id}到{to_id}")

            logger.info(f"成功获取数据模型社区图谱,标签ID: {tag}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }

    except Exception as e:
        logger.error(f"获取数据模型社区图谱失败: {str(e)}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        return {"nodes": [], "lines": []}
def model_search_list(model_id, page, page_size, name_en_filter=None,
                      name_zh_filter=None, category_filter=None, tag_filter=None):
    """
    Return one page of the metadata nodes included in a data model.

    Matches nodes labelled DataMeta or Metadata that the DataModel points to
    through an INCLUDES relationship, optionally narrowed by the filters.
    All filter values and the paging offsets are sent as query parameters,
    never interpolated into the Cypher text.

    Args:
        model_id: ID of the DataModel node; must be convertible to int.
        page: 1-based page number.
        page_size: Number of entries per page.
        name_en_filter: Substring that m.name_en must contain.
        name_zh_filter: Substring that m.name_zh must contain.
        category_filter: Exact value required for m.category.
        tag_filter: name_zh of a Tag node the metadata must be linked to
            through HAS_TAG.

    Returns:
        tuple: (metadata_list, total_count). ``metadata_list`` holds the
        serialized node properties plus an 'id' entry; ``total_count`` is the
        number of matches across all pages. ([], 0) is returned when the
        model ID is invalid or any exception is raised.
    """
    try:
        with connect_graph().session() as session:
            # The model ID has to be an integer to be compared with id(n).
            try:
                model_id_int = int(model_id)
            except (ValueError, TypeError):
                logger.error(f"模型ID不是有效的整数: {model_id}")
                return [], 0

            params = {"model_id": model_id_int}
            conditions = [
                "id(n) = $model_id",
                "(m:DataMeta OR m:Metadata)",
            ]

            # User-supplied filters are bound as parameters so that quotes or
            # Cypher fragments in the input cannot alter the query.
            if name_en_filter:
                conditions.append("m.name_en CONTAINS $name_en_filter")
                params["name_en_filter"] = str(name_en_filter)

            if name_zh_filter:
                conditions.append("m.name_zh CONTAINS $name_zh_filter")
                params["name_zh_filter"] = str(name_zh_filter)

            if category_filter:
                conditions.append("m.category = $category_filter")
                params["category_filter"] = str(category_filter)

            match_clause = (
                "MATCH (n:DataModel)-[:INCLUDES]->(m)\n"
                "WHERE " + " AND ".join(conditions)
            )

            # Filtering by tag needs an additional MATCH on the tag node.
            if tag_filter:
                match_clause += (
                    "\nMATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name_zh = $tag_filter"
                )
                params["tag_filter"] = tag_filter

            # Total number of matches, independent of paging.
            count_cypher = f"{match_clause}\nRETURN count(m) as count"
            total_count = session.run(count_cypher, params).single()["count"]

            # Paging: SKIP/LIMIT require integers; a page below 1 is treated
            # as the first page instead of producing a negative offset.
            limit = int(page_size)
            skip = max((int(page) - 1) * limit, 0)
            page_cypher = (
                f"{match_clause}\n"
                "RETURN m\n"
                "ORDER BY m.name_zh\n"
                "SKIP $skip LIMIT $limit"
            )
            page_records = session.run(
                page_cypher, {**params, "skip": skip, "limit": limit}
            )

            # Serialize every node and expose its graph ID.
            metadata_list = []
            for record in page_records:
                meta_node = record["m"]
                meta = serialize_node_properties(meta_node)
                meta["id"] = meta_node.id
                metadata_list.append(meta)

            logger.info(f"成功获取数据模型关联元数据,ID: {model_id_int}, 元数据数量: {total_count}")
            return metadata_list, total_count
    except Exception as e:
        logger.error(f"获取数据模型关联的元数据列表失败: {str(e)}")
        return [], 0
|