12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117 |
- """
- 数据模型核心业务逻辑模块
- 本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
- - 数据模型的创建、更新、删除
- - 数据模型与数据资源、元数据之间的关系处理
- - 数据模型血缘关系管理
- - 数据模型图谱生成
- - 数据模型层级计算等功能
- """
- import math
- import threading
- from concurrent.futures import ThreadPoolExecutor
- import pandas as pd
- from py2neo import Relationship
- import logging
- import json
- # Configure logger
- logger = logging.getLogger(__name__)
- from app.core.graph.graph_operations import relationship_exists
- from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node
- from app.services.neo4j_driver import neo4j_driver
- from app.core.meta_data import get_formatted_time, handle_id_unstructured
- from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
- from app.core.data_resource.resource import get_node_by_id
- # 根据child关系计算数据模型当前的level自动保存
def calculate_model_level(id):
    """
    Compute a data model's level from its ``child`` relationships and save it.

    The level is the length of the longest outgoing ``child`` path that starts
    at the node (0 when there is no such path). The value is written to the
    node's ``level`` property.

    Args:
        id: Node ID of the data model; converted with ``int()``. ``None`` is
            passed through unchanged, in which case no node matches and
            nothing is updated.

    Returns:
        None
    """
    node_id = int(id) if id is not None else None

    level_query = """
    MATCH (start_node:data_model)
    WHERE id(start_node) = $nodeId
    CALL {
        WITH start_node
        OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
        RETURN length(path) AS level
    }
    WITH coalesce(max(level), 0) AS max_level
    RETURN max_level
    """

    update_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    SET n.level = $level
    RETURN n
    """

    # One session serves both statements; the first result is fully consumed
    # by single() before the update runs.
    with connect_graph().session() as session:
        record = session.run(level_query, nodeId=node_id).single()
        # Look the column up by key. A membership test (`"max_level" in record`)
        # on a driver record checks the record's *values*, not its keys, so it
        # would discard the computed level and always store 0.
        level = record["max_level"] if record is not None else 0
        session.run(update_query, nodeId=node_id, level=level)
- # 处理数据模型血缘关系
def handle_model_relation(resource_ids):
    """
    Look up lineage information for a data resource.

    Finds every other ``data_resource`` that shares at least one ``meta_node``
    (via ``connection`` relationships) with the resource whose node ID is
    ``resource_ids``.

    Args:
        resource_ids: Node ID of the data resource to search from. It is
            compared with ``id(search)`` using ``=``, i.e. as a single value.

    Returns:
        list[dict]: One row per connected resource with the keys
        ``blood_resources`` (its node ID), ``common_nodes`` (shared meta node
        IDs), ``origin_nodes`` (meta node IDs only on the searched resource)
        and ``blood_nodes`` (meta node IDs only on the connected resource).
    """
    lineage_cypher = """
    MATCH (search:data_resource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_resource)
    WHERE id(search) = $resource_Ids
    WITH search, connect, common_node
    MATCH (search)-[:connection]->(search_node:meta_node)
    WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
    MATCH (connect)-[:connection]->(connect_node:meta_node)
    WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
    WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
    // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
    WITH search, connect, common_nodes,
         [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
         [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
    RETURN id(connect) as blood_resources, common_nodes,
           filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
    """
    graph = connect_graph()
    with graph.session() as db_session:
        rows = db_session.run(lineage_cypher, resource_Ids=resource_ids)
        # Materialise inside the session; the result is unusable once it closes.
        return rows.data()
- # 创建一个数据模型节点
def handle_data_model(data_model, result_list, result, receiver):
    """
    Get or create a ``data_model`` node and link it to its children and tag.

    ``receiver`` is mutated: ``id_list``, ``time`` and ``en_name`` are added
    before it is used as the property set for a newly created node.

    Args:
        data_model: Name of the data model; used to look up an existing node.
        result_list: List of English names; the first entry becomes
            ``en_name`` (empty string when the list is empty or None).
        result: Serialized ID list, stored in ``receiver['id_list']``.
        receiver: Request parameters. ``childrenId`` (iterable of node IDs)
            and ``tag`` (ID of a ``data_label`` node) are read when present.

    Returns:
        tuple: ``(node_id, data_model_node)`` where ``data_model_node`` is
        whatever ``get_node`` / ``create_or_get_node`` returned.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    child_merge = (
        "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id "
        "MERGE (a)-[:child]->(b)"
    )
    label_merge = (
        "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id "
        "MERGE (a)-[:label]->(b)"
    )

    def _merge_relationship(query, start_id, end_id):
        # MERGE creates the relationship only when it is missing, which
        # replaces the separate "does it exist?" query followed by CREATE and
        # closes the window in which two callers could both create it.
        with connect_graph().session() as session:
            session.execute_write(
                lambda tx: tx.run(query, a_id=start_id, b_id=end_id).consume()
            )

    try:
        data_model_en = result_list[0] if result_list else ""
        receiver['id_list'] = result
        receiver.update({
            'time': get_formatted_time(),
            'en_name': data_model_en,
        })
        data_model_node = get_node('data_model', name=data_model) or create_or_get_node('data_model', **receiver)
        logger.info(f"通过查询或创建节点获得节点ID111,data_model_node: {data_model_node}")

        # The helpers may hand back a node object or a bare ID; default to the
        # returned value and refine it below.
        node_id = data_model_node
        if hasattr(data_model_node, 'id'):
            logger.info(f"通过节点ID获取节点ID222,data_model_node: {data_model_node}")
            node_id = data_model_node.id
        else:
            logger.info(f"通过查询节点名称获取节点ID333,data_model_node: {data_model_node}")
            # No ``id`` attribute: resolve the ID by node name instead.
            lookup_query = """
            MATCH (n:data_model {name: $name})
            RETURN id(n) as node_id
            """
            with connect_graph().session() as session:
                record = session.run(lookup_query, name=data_model).single()
            logger.info(f"通过查询节点名称获取节点ID444,record: {record}")
            # Index by key; `"node_id" in record` would test the record's
            # values rather than its keys and never match.
            if record is not None:
                logger.info(f"通过查询节点名称获取节点ID555,record: {record}")
                node_id = record["node_id"]

        # Link child nodes. `or []` tolerates an explicit ``childrenId: None``.
        for child_id in receiver.get('childrenId') or []:
            child_node = get_node_by_id_no_label(child_id)
            if child_node:
                _merge_relationship(child_merge, int(node_id), int(child_node.id))

        # Link the data label identified by the ``tag`` parameter.
        if receiver.get('tag'):
            tag = get_node_by_id('data_label', receiver['tag'])
            if tag:
                _merge_relationship(label_merge, int(node_id), int(tag.id))

        return node_id, data_model_node
    except Exception as e:
        # Use the module logger (not the root logger) like the rest of the file.
        logger.error(f"Error in handle_data_model: {str(e)}")
        raise
- # (从数据资源中选取)
def resource_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to metadata and data resources picked from resources.

    Creates ``INCLUDE`` relationships to ``meta_data`` nodes and
    ``DERIVES_FROM`` relationships to ``data_resource`` nodes (both via
    ``MERGE``, so existing relationships are reused).

    Args:
        id_lists: List of dicts, each with a ``resource_id`` and a
            ``metaData`` list whose items carry an ``id``.
        data_model_node_id: Node ID of the data model.

    Returns:
        None

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    include_cypher = """
    MATCH (source:data_model), (target:meta_data)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:INCLUDE]->(target)
    RETURN count(*) as count
    """
    derives_cypher = """
    MATCH (source:data_model), (target:data_resource)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:DERIVES_FROM]->(target)
    RETURN count(*) as count
    """
    try:
        logger.info(f"开始处理数据模型与元数据的关系,数据模型ID: {data_model_node_id}")

        # Two separate passes, resources first, then metadata.
        resource_targets = []
        for entry in id_lists:
            resource_targets.append(entry['resource_id'])
        meta_targets = []
        for entry in id_lists:
            for meta_entry in entry['metaData']:
                meta_targets.append(meta_entry['id'])

        logger.info(f"资源ID列表: {resource_targets}")
        logger.info(f"元数据ID列表: {meta_targets}")

        if meta_targets:
            logger.info("开始创建数据模型与元数据的关系")
            with connect_graph().session() as db_session:
                outcome = db_session.run(
                    include_cypher,
                    source_id=data_model_node_id,
                    target_ids=meta_targets,
                )
                linked = outcome.single()["count"]
                logger.info(f"成功创建 {linked} 个数据模型与元数据的关系")

        if resource_targets:
            logger.info("开始创建数据模型与数据资源的关系")
            with connect_graph().session() as db_session:
                outcome = db_session.run(
                    derives_cypher,
                    source_id=data_model_node_id,
                    target_ids=resource_targets,
                )
                linked = outcome.single()["count"]
                logger.info(f"成功创建 {linked} 个数据模型与数据资源的关系")

    except Exception as e:
        logger.error(f"处理数据模型与元数据的关系时发生错误: {str(e)}")
        raise
- # (从数据模型中选取)
def model_handle_meta_data_model(id_lists, data_model_node_id):
    """
    Link a data model to metadata and models picked from other data models.

    Creates ``component`` relationships to ``meta_node`` nodes and ``use``
    relationships to ``data_model`` nodes (both via ``MERGE``).

    Args:
        id_lists: List of dicts, each with a ``model_id`` and a ``metaData``
            list whose items carry an ``id``.
        data_model_node_id: Node ID of the data model being linked.

    Returns:
        None
    """
    # Collect the target IDs in two passes: models first, then metadata.
    used_model_ids = []
    for entry in id_lists:
        used_model_ids.append(entry['model_id'])
    component_ids = []
    for entry in id_lists:
        for meta_entry in entry['metaData']:
            component_ids.append(meta_entry['id'])

    # Composition: model -> meta_node.
    if component_ids:
        component_cypher = """
        MATCH (source:data_model), (target:meta_node)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:component]->(target)
        """
        with neo4j_driver.get_session() as db_session:
            db_session.run(
                component_cypher,
                source_id=data_model_node_id,
                target_ids=component_ids,
            )

    # Usage: model -> data_model.
    if used_model_ids:
        use_cypher = """
        MATCH (source:data_model), (target:data_model)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:use]->(target)
        """
        with neo4j_driver.get_session() as db_session:
            db_session.run(
                use_cypher,
                source_id=data_model_node_id,
                target_ids=used_model_ids,
            )
- # (从DDL中选取)
def handle_no_meta_data_model(id_lists, receiver, data_model_node):
    """
    Link a DDL-sourced data model to its resources and create its metadata.

    For every metadata item a ``meta_node`` is obtained through
    ``create_or_get_node`` and attached to the model with a ``component``
    relationship; every listed resource is attached with a ``resource``
    relationship. Relationships are created with ``MERGE``.

    Args:
        id_lists: List of dicts, each with a ``resource_id`` and a
            ``metaData`` list. Metadata items must have an ``id`` and may have
            ``data_name``, ``en_name_zh`` and ``data_standard``.
        receiver: Request parameters (not read by this function).
        data_model_node: The data model: an object with an ``id`` attribute,
            a bare integer node ID, or a mapping with a ``name`` used to look
            the node up.

    Returns:
        None
    """
    # Built up front so a malformed payload raises KeyError before any write.
    resource_ids = [record['resource_id'] for record in id_lists]
    meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]

    def _merge_relationship(query, start_id, end_id):
        # MERGE replaces the "check existence, then CREATE" pair.
        with connect_graph().session() as session:
            session.execute_write(
                lambda tx: tx.run(query, a_id=start_id, b_id=end_id).consume()
            )

    # Resolve the model's node ID.
    data_model_node_id = None
    if hasattr(data_model_node, 'id'):
        data_model_node_id = data_model_node.id
    elif isinstance(data_model_node, int):
        # A bare node ID has neither ``id`` nor ``get``.
        data_model_node_id = data_model_node
    else:
        lookup_query = """
        MATCH (n:data_model {name: $name})
        RETURN id(n) as node_id
        """
        with connect_graph().session() as session:
            record = session.run(lookup_query, name=data_model_node.get('name')).single()
            if record is not None:
                data_model_node_id = record["node_id"]

    # Compare with None explicitly: 0 is falsy, and a truthiness test would
    # treat a node whose ID is 0 as "not found".
    if data_model_node_id is None:
        return

    # Resource relationships: model -> data_resource.
    if resource_ids:
        resource_query = """
        MATCH (source:data_model), (target:data_resource)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:resource]->(target)
        """
        with connect_graph().session() as session:
            session.run(resource_query, source_id=data_model_node_id, target_ids=resource_ids)

    if not meta_ids:
        return

    component_merge = (
        "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id "
        "MERGE (a)-[:component]->(b)"
    )

    # Create (or fetch) one meta_node per metadata item and attach it.
    for item in id_lists:
        for meta_item in item['metaData']:
            meta_params = {
                'name': meta_item.get('data_name', ''),
                'cn_name': meta_item.get('en_name_zh', ''),
                'standard': meta_item.get('data_standard', ''),
                'time': get_formatted_time(),
            }
            meta_node = create_or_get_node('meta_node', **meta_params)
            if meta_node is None:
                continue
            # create_or_get_node may return a node object or a bare ID
            # (NOTE(review): confirm against its implementation).
            meta_node_id = meta_node.id if hasattr(meta_node, 'id') else meta_node
            _merge_relationship(component_merge, int(data_model_node_id), int(meta_node_id))
- # 数据模型-详情接口
def handle_id_model(id):
    """
    Fetch the detail view of a data model.

    Args:
        id: Node ID of the data model.

    Returns:
        dict: ``{"data_model": {...}}``. When the node exists the inner dict
        holds ``resource_selected`` plus the node's fields (``id``, ``name``,
        ``en_name``, ``time``, ``describe``, ``category``, ``level``, ``tag``,
        ``leader``, ``origin``, ``blood_resource``, ``frequency``,
        ``organization``, ``data_sensitivity``, ``status``, ``childrenId``);
        missing or null required fields are replaced by ``{}`` (tag), ``[]``
        (childrenId) or ``""``. When no record is returned, a placeholder
        structure with ``None`` values is returned instead.
    """
    detail_cypher = """
    MATCH (n:data_model) WHERE id(n) = $nodeId
    OPTIONAL MATCH (n)-[:INCLUDE]->(meta:meta_data)
    OPTIONAL MATCH (n)-[:DERIVES_FROM]->(resource:data_resource)
    OPTIONAL MATCH (n)-[:label]->(tag:data_label)
    OPTIONAL MATCH (uses:model_use)-[:use]->(n)
    OPTIONAL MATCH (n)-[:has_component]->(component)
    WITH n,
         collect(DISTINCT meta) as meta_nodes,
         collect(DISTINCT resource) as resources,
         collect(DISTINCT component) as components,
         collect(DISTINCT uses) as uses,
         collect(DISTINCT tag) as tags,
         CASE WHEN n.childrenId IS NOT NULL THEN n.childrenId ELSE [] END as children
    RETURN {
        // 基本信息
        id: id(n),
        name: n.name,
        en_name: n.en_name,
        time: n.time,
        describe: n.describe,
        category: n.category,
        level: n.level,
        tag: CASE WHEN size(tags) > 0 AND tags[0] IS NOT NULL THEN {id: id(tags[0]), name: tags[0].name} ELSE null END,
        // 添加其他必需字段
        leader: n.leader,
        origin: n.origin,
        blood_resource: n.blood_resource,
        frequency: n.frequency,
        organization: n.organization,
        data_sensitivity: n.data_sensitivity,
        status: n.status,
        // 子节点列表
        childrenId: children
    } AS result,
    // 资源列表
    [{
        data_resource: [resource IN resources WHERE resource IS NOT NULL | {
            id: id(resource),
            name: resource.name,
            en_name: resource.en_name,
            description: resource.description
        }],
        resource_id: [resource IN resources WHERE resource IS NOT NULL | id(resource)],
        meta_ids: [meta IN meta_nodes WHERE meta IS NOT NULL | {
            id: id(meta),
            name: meta.name,
            en_name: meta.en_name,
            data_type: meta.data_type
        }]
    }] AS resource_selected
    """

    # Fallback value per required field; anything not listed defaults to "".
    special_defaults = {'tag': dict, 'childrenId': list}
    required = (
        'tag', 'leader', 'origin', 'blood_resource',
        'frequency', 'describe', 'organization', 'name', 'en_name',
        'data_sensitivity', 'time', 'category', 'status', 'childrenId',
    )

    with connect_graph().session() as db_session:
        row = db_session.run(detail_cypher, nodeId=id).single()
        logging.info(f"获得查询结果---------->>>{row}")

        if not row:
            # Nothing found: hand back an empty skeleton.
            return {"data_model": {
                "resource_selected": [{"meta_ids": [], "data_resource": None, "resource_id": None}],
                "leader": None, "origin": None, "frequency": None, "childrenId": [],
                "organization": None, "name": None, "en_name": None, "data_sensitivity": None,
                "describe": None, "tag": {}, "time": None, "category": None, "status": None
            }}

        fields = row["result"]
        selected = row["resource_selected"]

        for key in required:
            if fields.get(key) is None:
                factory = special_defaults.get(key)
                fields[key] = factory() if factory is not None else ""

        # resource_selected goes first; node fields follow (and win on clashes).
        detail = {"resource_selected": selected}
        detail.update(fields)
        return {"data_model": detail}
- # 数据模型列表
def model_list(skip_count, page_size, en_name_filter=None, name_filter=None,
               category=None, tag=None, level=None):
    """
    Return one page of data models together with the total match count.

    Args:
        skip_count: Number of rows to skip.
        page_size: Maximum number of rows to return.
        en_name_filter: Substring to find in ``en_name``. It is embedded in a
            ``.*<text>.*`` pattern and matched with ``=~``, so regex
            metacharacters in the text are interpreted as regex.
        name_filter: Same as ``en_name_filter`` but applied to ``name``.
        category: Exact ``category`` value to match.
        tag: Node ID of the label the model must be linked to.
        level: Exact ``level`` value to match.

    Returns:
        tuple: ``(rows, total)`` where ``rows`` is a list of dicts and
        ``total`` the number of distinct matching models. On any error the
        exception is logged and ``([], 0)`` is returned.
    """
    try:
        conditions = []
        params = {}
        if name_filter is not None:
            conditions.append("n.name =~ $name")
            params['name'] = f".*{name_filter}.*"
        if en_name_filter is not None:
            conditions.append("n.en_name =~ $en_name")
            params['en_name'] = f".*{en_name_filter}.*"
        if category is not None:
            conditions.append("n.category = $category")
            params['category'] = category
        if level is not None:
            conditions.append("n.level = $level")
            params['level'] = level
        if tag is not None:
            conditions.append("id(t) = $tag")
            params['tag'] = tag

        # A WHERE placed directly after OPTIONAL MATCH only constrains the
        # optional pattern: rows failing it are kept with ``t`` set to null, so
        # no model would ever be filtered out. Routing the rows through WITH
        # makes the WHERE filter the rows themselves.
        filter_clause = ""
        if conditions:
            filter_clause = "WITH n, t WHERE " + " AND ".join(conditions)

        with connect_graph().session() as session:
            count_query = f"""
            MATCH (n:data_model)
            OPTIONAL MATCH (n)-[:label]->(t)
            {filter_clause}
            RETURN COUNT(DISTINCT n) AS count
            """
            count_record = session.run(count_query, params).single()
            total = count_record['count'] if count_record is not None else 0

            query = f"""
            MATCH (n:data_model)
            OPTIONAL MATCH (n)-[:label]->(t)
            {filter_clause}
            RETURN DISTINCT
                id(n) as id,
                n.name as name,
                n.en_name as en_name,
                n.time as time,
                n.describe as describe,
                n.level as level,
                n.category as category,
                n.status as status,
                n.leader as leader,
                n.origin as origin,
                n.blood_resource as blood_resource,
                n.organization as organization,
                id(t) as tag_id,
                t.name as tag_name
            ORDER BY n.time DESC
            SKIP $skip
            LIMIT $limit
            """
            result = session.run(query, {**params, "skip": skip_count, "limit": page_size})

            data = [
                {
                    "id": record['id'],
                    "name": record['name'],
                    "en_name": record['en_name'],
                    "time": record['time'],
                    "describe": record['describe'],
                    "category": record['category'],
                    "status": record['status'],
                    "leader": record['leader'],
                    "origin": record['origin'],
                    "blood_resource": record['blood_resource'],
                    "organization": record['organization'],
                    "level": record['level'],
                    # A model without a label yields a null tag_id.
                    "tag": (
                        {"id": record['tag_id'], "name": record['tag_name']}
                        if record['tag_id'] is not None else None
                    ),
                }
                for record in result
            ]

        return data, total
    except Exception as e:
        # Best-effort listing: log with traceback and return an empty page.
        logger.exception("Error in model_list: %s", e)
        return [], 0
- # 有血缘关系的数据资源列表
def model_resource_list(skip_count, page_size, name_filter=None, id=None,
                        category=None, time=None):
    """
    Return one page of data resources attached to a data model.

    Resources are the ``data_resource`` nodes reached from the model through
    a ``children`` relationship.

    Args:
        skip_count: Number of rows to skip.
        page_size: Maximum number of rows to return.
        name_filter: Accepted for interface compatibility; not applied.
        id: Node ID of the data model.
        category: Accepted for interface compatibility; not applied.
        time: Accepted for interface compatibility; not applied.

    Returns:
        tuple: ``(rows, total)``. ``total`` counts every attached resource,
        while ``rows`` only contains resources that also have a ``label``
        relationship, so the two can disagree. On any error the exception is
        logged and ``([], 0)`` is returned.
    """
    try:
        base_query = """
        MATCH (n:data_model)
        WHERE id(n) = $nodeId
        MATCH (n)-[:children]->(m:data_resource)
        """

        count_query = base_query + """
        RETURN COUNT(m) as count
        """

        main_query = base_query + """
        MATCH (m)-[:label]->(l)
        WHERE id(n) = $nodeId and labels(m) <> ['meta_node']
        RETURN m.name as name,
               m.en_name as en_name,
               id(m) as id,
               l.name as label,
               m.time as time,
               m.description as description,
               m.category as category
        ORDER BY m.time DESC
        SKIP $skip LIMIT $limit
        """

        with connect_graph().session() as session:
            count_record = session.run(count_query, nodeId=id).single()
            total = count_record['count'] if count_record is not None else 0

            result = session.run(main_query, nodeId=id, skip=skip_count, limit=page_size)
            data = [
                {
                    "name": record['name'],
                    "en_name": record['en_name'],
                    "id": record['id'],
                    "label": record['label'],
                    "time": record['time'],
                    "description": record['description'],
                    "category": record['category'],
                }
                for record in result
            ]

        return data, total
    except Exception as e:
        # Best-effort listing: log with traceback and return an empty page.
        logger.exception("Error in model_resource_list: %s", e)
        return [], 0
- # 数据模型血缘图谱
def model_kinship_graph(nodeid, meta=False):
    """
    Build the lineage graph of a data model.

    The graph contains the model itself, every ``data_resource`` reached via a
    ``children`` relationship and, when ``meta`` is true, every ``meta_node``
    reached via a ``meta`` relationship.

    Args:
        nodeid: Node ID of the data model.
        meta: Whether metadata nodes are included.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``. Nodes have ``id`` (string),
        ``text`` and ``type`` (``model`` / ``resource`` / ``meta``); lines have
        ``from``, ``to`` and ``text``. Both lists are empty when the model
        does not exist.
    """
    start_node_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    RETURN n.name as name, n.en_name as en_name
    """
    # NOTE(review): other functions in this module create DERIVES_FROM /
    # resource / component / INCLUDE relationships, not children / meta.
    # Confirm these relationship types match what is stored.
    resource_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:children]->(resource:data_resource)
    RETURN resource
    """
    # The previous filter `labels(m) <> ['meta_node']` referenced a variable
    # that the query never binds, which makes the statement invalid.
    meta_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:meta]->(meta:meta_node)
    RETURN meta
    """

    with connect_graph().session() as session:
        start_record = session.run(start_node_query, nodeId=nodeid).single()
        if start_record is None:
            return {"nodes": [], "lines": []}

        model_id = str(nodeid)
        nodes = [{"id": model_id, "text": start_record['name'], "type": "model"}]
        lines = []

        # Read each column by key; `'resource' in record` would test the
        # record's values and skip every row.
        for record in session.run(resource_query, nodeId=nodeid):
            resource = record['resource']
            resource_id = str(resource.id)
            nodes.append({
                "id": resource_id,
                "text": resource.get('name', ''),
                "type": "resource",
            })
            lines.append({"from": model_id, "to": resource_id, "text": "resource"})

        if meta:
            for record in session.run(meta_query, nodeId=nodeid):
                # Use the returned node, not the boolean ``meta`` flag.
                meta_node = record['meta']
                meta_id = str(meta_node.id)
                nodes.append({
                    "id": meta_id,
                    "text": meta_node.get('name', ''),
                    "type": "meta",
                })
                lines.append({"from": model_id, "to": meta_id, "text": "component"})

    return {"nodes": nodes, "lines": lines}
- # 数据模型影响图谱
def model_impact_graph(nodeid, meta=False):
    """
    Build the impact graph of a data model.

    The graph contains the model itself, every ``data_resource`` reached via a
    ``children`` relationship and, when ``meta`` is true, every ``meta_node``
    reached via a ``meta`` relationship.

    Args:
        nodeid: Node ID of the data model.
        meta: Whether metadata nodes are included.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``. Nodes have ``id`` (string),
        ``text`` and ``type`` (``model`` / ``resource`` / ``meta``); lines have
        ``from``, ``to`` and ``text``. Both lists are empty when the model
        does not exist.
    """
    start_node_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    RETURN n.name as name, n.en_name as en_name
    """
    # NOTE(review): the traversal is outgoing `children` / `meta`, the same
    # direction a lineage view would use. Confirm this is the intended
    # definition of "impact" and that these relationship types are stored.
    resource_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:children]->(resource:data_resource)
    RETURN resource
    """
    # The previous filter `labels(m) <> ['meta_node']` referenced a variable
    # that the query never binds, which makes the statement invalid.
    meta_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:meta]->(meta:meta_node)
    RETURN meta
    """

    with connect_graph().session() as session:
        start_record = session.run(start_node_query, nodeId=nodeid).single()
        if start_record is None:
            return {"nodes": [], "lines": []}

        model_id = str(nodeid)
        nodes = [{"id": model_id, "text": start_record['name'], "type": "model"}]
        lines = []

        # Read each column by key; `'resource' in record` would test the
        # record's values and skip every row.
        for record in session.run(resource_query, nodeId=nodeid):
            resource = record['resource']
            resource_id = str(resource.id)
            nodes.append({
                "id": resource_id,
                "text": resource.get('name', ''),
                "type": "resource",
            })
            lines.append({"from": model_id, "to": resource_id, "text": "resource"})

        if meta:
            for record in session.run(meta_query, nodeId=nodeid):
                # Use the returned node, not the boolean ``meta`` flag.
                meta_node = record['meta']
                meta_id = str(meta_node.id)
                nodes.append({
                    "id": meta_id,
                    "text": meta_node.get('name', ''),
                    "type": "meta",
                })
                lines.append({"from": model_id, "to": meta_id, "text": "component"})

    return {"nodes": nodes, "lines": lines}
- # 数据模型全部图谱
def model_all_graph(nodeid, meta=False):
    """
    Build the complete relationship graph of a data model.

    The graph contains the model itself, every ``data_resource`` reached via a
    ``children`` relationship and, when ``meta`` is true, every ``meta_node``
    reached via a ``meta`` relationship.

    Args:
        nodeid: Node ID of the data model.
        meta: Whether metadata nodes are included.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``. Nodes have ``id`` (string),
        ``text`` and ``type`` (``model`` / ``resource`` / ``meta``); lines have
        ``from``, ``to`` and ``text``. Both lists are empty when the model
        does not exist.
    """
    start_node_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    RETURN n.name as name, n.en_name as en_name
    """
    # NOTE(review): confirm `children` / `meta` are the relationship types
    # actually stored; other functions in this module create differently
    # named relationships.
    resource_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:children]->(resource:data_resource)
    RETURN resource
    """
    # The previous filter `labels(m) <> ['meta_node']` referenced a variable
    # that the query never binds, which makes the statement invalid.
    meta_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:meta]->(meta:meta_node)
    RETURN meta
    """

    with connect_graph().session() as session:
        start_record = session.run(start_node_query, nodeId=nodeid).single()
        if start_record is None:
            return {"nodes": [], "lines": []}

        model_id = str(nodeid)
        nodes = [{"id": model_id, "text": start_record['name'], "type": "model"}]
        lines = []

        # Read each column by key; `'resource' in record` would test the
        # record's values and skip every row.
        for record in session.run(resource_query, nodeId=nodeid):
            resource = record['resource']
            resource_id = str(resource.id)
            nodes.append({
                "id": resource_id,
                "text": resource.get('name', ''),
                "type": "resource",
            })
            lines.append({"from": model_id, "to": resource_id, "text": "resource"})

        if meta:
            for record in session.run(meta_query, nodeId=nodeid):
                # Use the returned node, not the boolean ``meta`` flag.
                meta_node = record['meta']
                meta_id = str(meta_node.id)
                nodes.append({
                    "id": meta_id,
                    "text": meta_node.get('name', ''),
                    "type": "meta",
                })
                lines.append({"from": model_id, "to": meta_id, "text": "component"})

    return {"nodes": nodes, "lines": lines}
- # 更新数据模型
def data_model_edit(receiver):
    """
    Update a data model's basic properties and, optionally, its label.

    ``name``, ``en_name``, ``category`` and ``describe`` are always written
    (missing keys are written as null). When ``tag`` is given and both the
    label node and the model node can be found, the model's existing ``label``
    relationships are replaced by a single relationship to that label.

    Args:
        receiver: Request parameters; reads ``id``, ``name``, ``en_name``,
            ``category``, ``describe`` and ``tag``.

    Returns:
        dict: ``{"message": "数据模型更新成功"}``.
    """
    node_id = receiver.get('id')
    tag = receiver.get('tag')

    update_query = """
    MATCH (n:data_model) WHERE id(n) = $id
    SET n.name = $name, n.en_name = $en_name, n.category = $category, n.describe = $describe
    RETURN n
    """
    with connect_graph().session() as session:
        session.run(
            update_query,
            id=node_id,
            name=receiver.get('name'),
            en_name=receiver.get('en_name'),
            category=receiver.get('category'),
            describe=receiver.get('describe'),
        ).consume()

    if tag:
        # Validate both ends BEFORE touching existing relationships, so an
        # unknown tag ID no longer strips the model of its current label.
        tag_node = get_node_by_id('data_label', tag)
        model_node = get_node_by_id_no_label(node_id) if tag_node else None
        if tag_node and model_node:
            model_id = int(model_node.id if hasattr(model_node, 'id') else model_node)
            tag_id = int(tag_node.id if hasattr(tag_node, 'id') else tag_node)

            delete_query = """
            MATCH (n:data_model)-[r:label]->() WHERE id(n) = $id
            DELETE r
            """
            merge_query = (
                "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id "
                "MERGE (a)-[:label]->(b)"
            )

            def _replace_label(tx):
                # Delete and re-link in one transaction: either both happen
                # or neither does.
                tx.run(delete_query, id=model_id).consume()
                tx.run(merge_query, a_id=model_id, b_id=tag_id).consume()

            with connect_graph().session() as session:
                session.execute_write(_replace_label)

    return {"message": "数据模型更新成功"}
|