12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047 |
- """
- 数据模型核心业务逻辑模块
- 本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
- - 数据模型的创建、更新、删除
- - 数据模型与数据资源、元数据之间的关系处理
- - 数据模型血缘关系管理
- - 数据模型图谱生成
- - 数据模型层级计算等功能
- """
- import math
- import threading
- from concurrent.futures import ThreadPoolExecutor
- import pandas as pd
- from py2neo import Relationship
- import logging
- import json
- # Configure logger
- logger = logging.getLogger(__name__)
- from app.core.graph.graph_operations import relationship_exists
- from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node
- from app.services.neo4j_driver import neo4j_driver
- from app.core.meta_data import get_formatted_time, handle_id_unstructured
- from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
- from app.core.data_resource.resource import get_node_by_id
- # 根据child关系计算数据模型当前的level自动保存
def calculate_model_level(id):
    """Compute a data model's level from its ``child`` paths and store it.

    The level is the length of the longest outgoing ``child`` path starting at
    the node, or 0 when no such path exists. The value is written to the
    node's ``level`` property.

    Args:
        id: Node ID of the data model; converted with ``int()``. ``None`` is
            accepted and results in a no-op.

    Returns:
        None
    """
    if id is None:
        # No node can match a null ID, so skip both round trips.
        return
    node_id = int(id)

    level_query = """
    MATCH (start_node:data_model)
    WHERE id(start_node) = $nodeId
    CALL {
        WITH start_node
        OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
        RETURN length(path) AS level
    }
    WITH coalesce(max(level), 0) AS max_level
    RETURN max_level
    """

    update_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    SET n.level = $level
    RETURN n
    """

    with connect_graph().session() as session:
        record = session.run(level_query, nodeId=node_id).single()
        # Read the column directly: a membership test such as
        # `"max_level" in record` compares against the record's values
        # (tuple semantics), not its column names, so it never matched and
        # the level was always stored as 0.
        level = record["max_level"] if record else 0
        if level is None:
            level = 0

        session.run(update_query, nodeId=node_id, level=level).consume()
- # 处理数据模型血缘关系
def handle_model_relation(resource_ids):
    """Query the lineage between one data resource and its neighbours.

    For the ``data_resource`` whose node ID equals ``resource_ids``, finds
    every other ``data_resource`` sharing at least one ``meta_node`` through
    ``connection`` relationships.

    Args:
        resource_ids: Node ID bound to the ``$resource_Ids`` query parameter.

    Returns:
        list[dict]: One dict per related resource with the keys
        ``blood_resources``, ``common_nodes``, ``origin_nodes`` and
        ``blood_nodes``.
    """
    lineage_cql = """
    MATCH (search:data_resource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_resource)
    WHERE id(search) = $resource_Ids
    WITH search, connect, common_node
    MATCH (search)-[:connection]->(search_node:meta_node)
    WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
    MATCH (connect)-[:connection]->(connect_node:meta_node)
    WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
    WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
    // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
    WITH search, connect, common_nodes,
    [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
    [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
    RETURN id(connect) as blood_resources, common_nodes,
    filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
    """
    graph = connect_graph()
    with graph.session() as db_session:
        rows = db_session.run(lineage_cql, resource_Ids=resource_ids)
        # Materialise the rows before the session closes.
        return rows.data()
- # 创建一个数据模型节点
def handle_data_model(data_model, result_list, result, receiver):
    """Create (or fetch) a data model node and wire up its child/tag links.

    ``receiver`` is updated in place with ``id_list``, ``time`` and
    ``en_name`` before it is used as the property set for a new node.

    Args:
        data_model: Name of the data model; used to look up an existing node.
        result_list: Candidate English names; the first entry becomes
            ``en_name`` (empty string when the list is empty or None).
        result: Value stored under ``receiver['id_list']``.
        receiver: Request parameters. ``childrenId`` (iterable of node IDs)
            and ``tag`` (node ID of a ``data_label``) are read when present.

    Returns:
        tuple: ``(node_id, data_model_node)``. ``node_id`` is None when the
        node exposes no ``id`` attribute and the lookup by name finds nothing.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    def _link(cypher, start_id, end_id):
        # Run one relationship-creating statement in its own write transaction.
        with connect_graph().session() as session:
            session.execute_write(
                lambda tx: tx.run(cypher, a_id=start_id, b_id=end_id).consume()
            )

    try:
        data_model_en = result_list[0] if result_list else ""
        receiver['id_list'] = result
        receiver.update({
            'time': get_formatted_time(),
            'en_name': data_model_en,
        })
        data_model_node = (
            get_node('data_model', name=data_model)
            or create_or_get_node('data_model', **receiver)
        )

        # Resolve the node ID once, up front, so the relationship statements
        # below use the same fallback as the returned value.
        node_id = getattr(data_model_node, 'id', None)
        if node_id is None:
            lookup_query = """
            MATCH (n:data_model {name: $name})
            RETURN id(n) as node_id
            """
            with connect_graph().session() as session:
                record = session.run(lookup_query, name=data_model).single()
                # Index by key; `"node_id" in record` would test the values.
                if record:
                    node_id = record["node_id"]

        # `childrenId` may be present but None; treat that as "no children".
        for child_id in receiver.get('childrenId') or []:
            child_node = get_node_by_id_no_label(child_id)
            if child_node and not relationship_exists(data_model_node, 'child', child_node):
                _link(
                    "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:child]->(b)",
                    node_id,
                    child_node.id,
                )

        # Attach the data label identified by the `tag` parameter.
        if receiver.get('tag'):
            tag = get_node_by_id('data_label', receiver['tag'])
            if tag and not relationship_exists(data_model_node, 'label', tag):
                _link(
                    "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:label]->(b)",
                    node_id,
                    tag.id,
                )

        return node_id, data_model_node
    except Exception as e:
        logger.error(f"Error in handle_data_model: {str(e)}")
        raise
- # (从数据资源中选取)
def resource_handle_meta_data_model(id_lists, data_model_node_id):
    """Link a data model to metadata and resources picked from data resources.

    Merges ``INCLUDE`` relationships to every referenced ``meta_node`` and
    ``DERIVES_FROM`` relationships to every referenced ``data_resource``.

    Args:
        id_lists: Iterable of dicts, each with a ``resource_id`` key and a
            ``metaData`` list of dicts carrying an ``id`` key.
        data_model_node_id: Node ID of the data model (relationship source).

    Returns:
        None

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    include_cql = """
    MATCH (source:data_model), (target:meta_node)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:INCLUDE]->(target)
    RETURN count(*) as count
    """
    derives_cql = """
    MATCH (source:data_model), (target:data_resource)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:DERIVES_FROM]->(target)
    RETURN count(*) as count
    """

    def _merge_links(cql, target_ids):
        # Execute one MERGE statement and return the number of matched pairs.
        with connect_graph().session() as db_session:
            outcome = db_session.run(cql, source_id=data_model_node_id, target_ids=target_ids)
            return outcome.single()["count"]

    try:
        logger.info(f"开始处理数据模型与元数据的关系,数据模型ID: {data_model_node_id}")

        # Collect resource IDs first, then the flattened metadata IDs.
        resource_id_list = []
        for entry in id_lists:
            resource_id_list.append(entry['resource_id'])
        meta_id_list = []
        for entry in id_lists:
            for meta_entry in entry['metaData']:
                meta_id_list.append(meta_entry['id'])

        logger.info(f"资源ID列表: {resource_id_list}")
        logger.info(f"元数据ID列表: {meta_id_list}")

        if meta_id_list:
            logger.info("开始创建数据模型与元数据的关系")
            linked = _merge_links(include_cql, meta_id_list)
            logger.info(f"成功创建 {linked} 个数据模型与元数据的关系")

        if resource_id_list:
            logger.info("开始创建数据模型与数据资源的关系")
            linked = _merge_links(derives_cql, resource_id_list)
            logger.info(f"成功创建 {linked} 个数据模型与数据资源的关系")

    except Exception as e:
        logger.error(f"处理数据模型与元数据的关系时发生错误: {str(e)}")
        raise
- # (从数据模型中选取)
def model_handle_meta_data_model(id_lists, data_model_node_id):
    """Link a data model to metadata and models picked from other data models.

    Merges ``component`` relationships to every referenced ``meta_node`` and
    ``use`` relationships to every referenced ``data_model``.

    Args:
        id_lists: Iterable of dicts, each with a ``model_id`` key and a
            ``metaData`` list of dicts carrying an ``id`` key.
        data_model_node_id: Node ID of the data model (relationship source).

    Returns:
        None
    """
    component_cql = """
    MATCH (source:data_model), (target:meta_node)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:component]->(target)
    """
    use_cql = """
    MATCH (source:data_model), (target:data_model)
    WHERE id(source)=$source_id AND id(target) IN $target_ids
    MERGE (source)-[:use]->(target)
    """

    # Model IDs are gathered before metadata IDs, as in the request layout.
    source_model_ids = [entry['model_id'] for entry in id_lists]
    meta_id_list = [
        meta_entry['id']
        for entry in id_lists
        for meta_entry in entry['metaData']
    ]

    # Metadata links are written first, then model-to-model links.
    for target_ids, cql in ((meta_id_list, component_cql), (source_model_ids, use_cql)):
        if not target_ids:
            continue
        with neo4j_driver.get_session() as db_session:
            db_session.run(cql, source_id=data_model_node_id, target_ids=target_ids)
- # (从DDL中选取)
def handle_no_meta_data_model(id_lists, receiver, data_model_node):
    """Attach resources and freshly created metadata to a DDL-sourced model.

    Merges a ``resource`` relationship from the data model to every listed
    ``data_resource``, then creates (or fetches) one ``meta_node`` per
    metadata entry and links it to the model with ``component``.

    Args:
        id_lists: Iterable of dicts, each with a ``resource_id`` key and a
            ``metaData`` list of dicts. ``data_name``, ``en_name_zh`` and
            ``data_standard`` are read from each metadata dict and default
            to an empty string.
        receiver: Request parameters; accepted for interface compatibility
            and not read here.
        data_model_node: The data model node.

    Returns:
        None. Returns early without writing anything when the data model's
        node ID cannot be determined.
    """
    resource_ids = [record['resource_id'] for record in id_lists]

    # Prefer the node's own `id`; fall back to a lookup by name.
    data_model_node_id = getattr(data_model_node, 'id', None)
    if data_model_node_id is None:
        lookup_query = """
        MATCH (n:data_model {name: $name})
        RETURN id(n) as node_id
        """
        with connect_graph().session() as session:
            record = session.run(lookup_query, name=data_model_node.get('name')).single()
            if record:
                data_model_node_id = record["node_id"]

    # Compare against None explicitly: 0 is a valid node ID and must not be
    # treated as "missing".
    if data_model_node_id is None:
        return

    if resource_ids:
        resource_query = """
        MATCH (source:data_model), (target:data_resource)
        WHERE id(source)=$source_id AND id(target) IN $target_ids
        MERGE (source)-[:resource]->(target)
        """
        with connect_graph().session() as session:
            session.run(resource_query, source_id=data_model_node_id, target_ids=resource_ids)

    # The earlier per-ID node lookups were dropped: their results were
    # collected into a list that nothing ever read.
    for item in id_lists:
        for meta_item in item['metaData']:
            meta_params = {
                'name': meta_item.get('data_name', ''),
                'cn_name': meta_item.get('en_name_zh', ''),
                'standard': meta_item.get('data_standard', ''),
                'time': get_formatted_time(),
            }
            meta_node = create_or_get_node('meta_node', **meta_params)

            if meta_node and not relationship_exists(data_model_node, 'component', meta_node):
                meta_node_id = meta_node.id
                with connect_graph().session() as session:
                    # Use the resolved ID so the fallback lookup above is
                    # honoured here as well.
                    session.execute_write(
                        lambda tx, target_id=meta_node_id: tx.run(
                            "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:component]->(b)",
                            a_id=data_model_node_id, b_id=target_id
                        ).consume()
                    )
- # 数据模型-详情接口
def handle_id_model(id):
    """Fetch the detail view of one data model.

    Args:
        id: Node ID of the data model.

    Returns:
        dict: ``{"data_model": {...}}``. When the node is found the inner
        dict holds ``resource_selected`` plus the node's basic fields, with
        missing/null required fields replaced by ``{}`` (``tag``), ``[]``
        (``childrenId``) or ``""`` (everything else). When it is not found a
        fixed placeholder structure with None values is returned.
    """
    detail_cql = """
    MATCH (n:data_model) WHERE id(n) = $nodeId
    OPTIONAL MATCH (n)-[:connection]->(meta:meta_node)
    OPTIONAL MATCH (n)<-[:belongs_to]-(resource:data_resource)
    OPTIONAL MATCH (n)-[:label]->(tag:data_label)
    OPTIONAL MATCH (uses:model_use)-[:use]->(n)
    OPTIONAL MATCH (n)-[:has_component]->(component)
    WITH n,
    collect(DISTINCT meta) as meta_nodes,
    collect(DISTINCT resource) as resources,
    collect(DISTINCT component) as components,
    collect(DISTINCT uses) as uses,
    collect(DISTINCT tag) as tags,
    CASE WHEN n.childrenId IS NOT NULL THEN n.childrenId ELSE [] END as children
    RETURN {
    // 基本信息
    id: id(n),
    name: n.name,
    en_name: n.en_name,
    time: n.time,
    description: n.description,
    describe: n.description, // 使用description作为describe字段
    category: n.category,
    level: n.level,
    tag: CASE WHEN size(tags) > 0 AND tags[0] IS NOT NULL THEN {id: id(tags[0]), name: tags[0].name} ELSE null END,
    // 添加其他必需字段
    leader: n.leader,
    origin: n.origin,
    blood_resource: n.blood_resource,
    frequency: n.frequency,
    organization: n.organization,
    data_sensitivity: n.data_sensitivity,
    status: n.status,
    // 子节点列表
    childrenId: children
    } AS result,
    // 资源列表
    [{
    data_resource: [resource IN resources WHERE resource IS NOT NULL | {
    id: id(resource),
    name: resource.name,
    en_name: resource.en_name,
    description: resource.description
    }],
    resource_id: [resource IN resources WHERE resource IS NOT NULL | id(resource)],
    meta_ids: [meta IN meta_nodes WHERE meta IS NOT NULL | {
    id: id(meta),
    name: meta.name,
    en_name: meta.en_name,
    data_type: meta.data_type
    }]
    }] AS resource_selected
    """

    with connect_graph().session() as db_session:
        row = db_session.run(detail_cql, nodeId=id).single()
        logging.info(f"获得查询结果---------->>>{row}")

    # Guard clause: nothing matched, hand back the empty skeleton.
    if not row:
        return {"data_model": {
            "resource_selected": [{"meta_ids": [], "data_resource": None, "resource_id": None}],
            "leader": None, "origin": None, "frequency": None, "childrenId": [],
            "organization": None, "name": None, "en_name": None, "data_sensitivity": None,
            "describe": None, "tag": {}, "time": None, "category": None, "status": None
        }}

    details = row["result"]
    selected = row["resource_selected"]

    # Fields that must never be null in the response, in fill order.
    mandatory = ('tag', 'description', 'leader', 'origin', 'blood_resource',
                 'frequency', 'describe', 'organization', 'name', 'en_name',
                 'data_sensitivity', 'time', 'category', 'status', 'childrenId')
    # Container-typed fields get a fresh empty container; the rest get "".
    empty_factories = {'tag': dict, 'childrenId': list}

    for key in mandatory:
        if details.get(key) is None:
            details[key] = empty_factories.get(key, str)()

    payload = {"resource_selected": selected}
    payload.update(details)
    return {"data_model": payload}
- # 数据模型列表
def model_list(skip_count, page_size, en_name_filter=None, name_filter=None,
               category=None, tag=None, level=None):
    """Return one page of data models plus the total number of matches.

    Args:
        skip_count: Number of rows to skip (converted with ``int()``).
        page_size: Maximum number of rows to return (converted with ``int()``).
        en_name_filter: Substring that ``en_name`` must contain.
        name_filter: Substring that ``name`` must contain.
        category: Exact ``category`` value to match.
        tag: Node ID of the label the model must be attached to.
        level: Exact ``level`` value to match.

    Returns:
        tuple: ``(rows, total)`` where ``rows`` is a list of dicts with the
        keys ``id``, ``name``, ``en_name``, ``time``, ``description``,
        ``level`` and ``tag``. On any error the failure is logged and
        ``([], 0)`` is returned.
    """
    try:
        conditions = []
        params = {}

        # Substring filters use CONTAINS so characters that are special in a
        # regular expression (e.g. "(" or "+") are matched literally instead
        # of breaking the query.
        if name_filter is not None:
            conditions.append("n.name CONTAINS $name")
            params['name'] = str(name_filter)
        if en_name_filter is not None:
            conditions.append("n.en_name CONTAINS $en_name")
            params['en_name'] = str(en_name_filter)
        if category is not None:
            conditions.append("n.category = $category")
            params['category'] = category
        if level is not None:
            conditions.append("n.level = $level")
            params['level'] = level
        if tag is not None:
            conditions.append("id(t) = $tag")
            params['tag'] = tag

        where_str = f"WHERE {' AND '.join(conditions)}" if conditions else ""

        # The `WITH n, t` is essential: a WHERE placed directly after
        # OPTIONAL MATCH only constrains the optional pattern, so rows for
        # non-matching models would still be returned (with t = null).
        match_clause = f"""
        MATCH (n:data_model)
        OPTIONAL MATCH (n)-[:label]->(t)
        WITH n, t
        {where_str}
        """

        count_query = match_clause + """
        RETURN COUNT(DISTINCT n) AS count
        """
        data_query = match_clause + """
        RETURN DISTINCT
            id(n) as id,
            n.name as name,
            n.en_name as en_name,
            n.time as time,
            n.description as description,
            n.level as level,
            id(t) as tag_id,
            t.name as tag_name
        ORDER BY time DESC
        SKIP $skip
        LIMIT $limit
        """

        with connect_graph().session() as session:
            count_record = session.run(count_query, params).single()
            total = count_record['count'] if count_record else 0

            page_params = dict(params, skip=int(skip_count), limit=int(page_size))
            data = [
                {
                    "id": record['id'],
                    "name": record['name'],
                    "en_name": record['en_name'],
                    "time": record['time'],
                    "description": record['description'],
                    "level": record['level'],
                    "tag": (
                        {"id": record['tag_id'], "name": record['tag_name']}
                        if record['tag_id'] is not None else None
                    ),
                }
                for record in session.run(data_query, page_params)
            ]

        return data, total
    except Exception as e:
        # Best-effort listing: report through the module logger (with the
        # traceback) and fall back to an empty page.
        logger.exception(f"Error in model_list: {str(e)}")
        return [], 0
- # 有血缘关系的数据资源列表
def model_resource_list(skip_count, page_size, name_filter=None, id=None,
                        category=None, time=None):
    """Return one page of the data resources attached to a data model.

    Resources are reached through ``children`` relationships from the model
    and must carry at least one ``label`` relationship; each (resource,
    label) pair yields one row.

    Args:
        skip_count: Number of rows to skip (converted with ``int()``).
        page_size: Maximum number of rows to return (converted with ``int()``).
        name_filter: Substring that the resource ``name`` must contain.
        id: Node ID of the data model.
        category: Exact resource ``category`` to match.
        time: Accepted for interface compatibility; currently not applied.

    Returns:
        tuple: ``(rows, total)`` where ``rows`` is a list of dicts with the
        keys ``name``, ``en_name``, ``id``, ``label``, ``time``,
        ``description`` and ``category``, and ``total`` counts the rows the
        same query would return without paging. On any error the failure is
        logged and ``([], 0)`` is returned.
    """
    try:
        conditions = []
        params = {"nodeId": id}
        if name_filter is not None:
            conditions.append("m.name CONTAINS $name")
            params["name"] = str(name_filter)
        if category is not None:
            conditions.append("m.category = $category")
            params["category"] = category
        # NOTE(review): `time` is still ignored - the intended comparison
        # (exact, prefix or range) cannot be determined from this module.

        where_str = f"WHERE {' AND '.join(conditions)}" if conditions else ""

        # One shared MATCH for both queries so the total always agrees with
        # the rows that paging can actually reach.
        base_query = f"""
        MATCH (n:data_model)
        WHERE id(n) = $nodeId
        MATCH (n)-[:children]->(m:data_resource)
        MATCH (m)-[:label]->(l)
        {where_str}
        """

        count_query = base_query + """
        RETURN COUNT(m) as count
        """
        main_query = base_query + """
        RETURN m.name as name,
               m.en_name as en_name,
               id(m) as id,
               l.name as label,
               m.time as time,
               m.description as description,
               m.category as category
        ORDER BY m.time DESC
        SKIP $skip LIMIT $limit
        """

        with connect_graph().session() as session:
            count_record = session.run(count_query, params).single()
            total = count_record['count'] if count_record else 0

            page_params = dict(params, skip=int(skip_count), limit=int(page_size))
            data = [
                {
                    "name": record['name'],
                    "en_name": record['en_name'],
                    "id": record['id'],
                    "label": record['label'],
                    "time": record['time'],
                    "description": record['description'],
                    "category": record['category'],
                }
                for record in session.run(main_query, page_params)
            ]

        return data, total
    except Exception as e:
        # Best-effort listing: report through the module logger (with the
        # traceback) and fall back to an empty page.
        logger.exception(f"Error in model_resource_list: {str(e)}")
        return [], 0
- # 数据模型血缘图谱
def _build_model_graph(nodeid, meta=False):
    """Collect the nodes and lines shared by all data-model graph views.

    Args:
        nodeid: Node ID of the data model at the centre of the graph.
        meta: When true, ``meta_node`` neighbours reached through ``meta``
            relationships are added as well.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``. Both lists are empty when
        no ``data_model`` with the given ID exists.
    """
    start_node_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    RETURN n.name as name, n.en_name as en_name
    """
    resource_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:children]->(resource:data_resource)
    RETURN resource
    """
    # The former `labels(m) <> ['meta_node']` predicate referenced a variable
    # that the query never binds, which made the statement invalid.
    meta_query = """
    MATCH (n:data_model)
    WHERE id(n) = $nodeId
    MATCH p = (n)-[:meta]->(meta:meta_node)
    RETURN meta
    """

    model_id = str(nodeid)

    with connect_graph().session() as session:
        start_record = session.run(start_node_query, nodeId=nodeid).single()
        if not start_record:
            return {"nodes": [], "lines": []}

        nodes = [{"id": model_id, "text": start_record['name'], "type": "model"}]
        lines = []

        # Every row carries the `resource` column; index it directly rather
        # than testing `'resource' in record`, which checks record values.
        for record in session.run(resource_query, nodeId=nodeid):
            resource = record['resource']
            resource_id = str(resource.id)
            nodes.append({
                "id": resource_id,
                "text": resource.get('name', ''),
                "type": "resource",
            })
            lines.append({"from": model_id, "to": resource_id, "text": "resource"})

        if meta:
            for record in session.run(meta_query, nodeId=nodeid):
                # Read from the fetched node, not from the boolean `meta`
                # flag that shares its name with the query column.
                meta_record = record['meta']
                meta_id = str(meta_record.id)
                nodes.append({
                    "id": meta_id,
                    "text": meta_record.get('name', ''),
                    "type": "meta",
                })
                lines.append({"from": model_id, "to": meta_id, "text": "component"})

    return {"nodes": nodes, "lines": lines}


# Data model lineage graph
def model_kinship_graph(nodeid, meta=False):
    """Build the lineage graph of a data model.

    Args:
        nodeid: Node ID of the data model.
        meta: Whether to include metadata nodes.

    Returns:
        dict: Graph data with ``nodes`` and ``lines`` lists.
    """
    # NOTE(review): the lineage, impact and full views currently run the
    # same queries; confirm whether they are meant to differ.
    return _build_model_graph(nodeid, meta)


# Data model impact graph
def model_impact_graph(nodeid, meta=False):
    """Build the impact graph of a data model.

    Args:
        nodeid: Node ID of the data model.
        meta: Whether to include metadata nodes.

    Returns:
        dict: Graph data with ``nodes`` and ``lines`` lists.
    """
    return _build_model_graph(nodeid, meta)


# Data model full graph
def model_all_graph(nodeid, meta=False):
    """Build the full relationship graph of a data model.

    Args:
        nodeid: Node ID of the data model.
        meta: Whether to include metadata nodes.

    Returns:
        dict: Graph data with ``nodes`` and ``lines`` lists.
    """
    return _build_model_graph(nodeid, meta)
- # 更新数据模型
def data_model_edit(receiver):
    """Update a data model's basic properties and, optionally, its label.

    Only the properties actually present in ``receiver`` are written, so a
    partial request no longer clears the fields it leaves out. A key that is
    present with a None value still clears that property.

    Args:
        receiver: Request parameters. ``id`` identifies the data model;
            ``name``, ``en_name``, ``category`` and ``description`` are the
            editable properties; ``tag`` is the node ID of the
            ``data_label`` that should replace the current label(s).

    Returns:
        dict: ``{"message": "数据模型更新成功"}``.
    """
    node_id = receiver.get('id')
    tag = receiver.get('tag')

    editable_keys = ('name', 'en_name', 'category', 'description')
    props = {key: receiver[key] for key in editable_keys if key in receiver}

    if props:
        update_query = """
        MATCH (n:data_model) WHERE id(n) = $id
        SET n += $props
        RETURN n
        """
        with connect_graph().session() as session:
            session.run(update_query, id=node_id, props=props).consume()

    if tag:
        # Resolve both endpoints before touching existing relationships, so
        # an unknown tag ID cannot leave the model without any label.
        tag_node = get_node_by_id('data_label', tag)
        model_node = get_node_by_id_no_label(node_id) if tag_node else None

        if tag_node and model_node:
            model_node_id = model_node.id
            tag_node_id = tag_node.id

            def _replace_label(tx):
                # Drop the old labels and attach the new one atomically.
                tx.run(
                    """
                    MATCH (n:data_model)-[r:label]->() WHERE id(n) = $id
                    DELETE r
                    """,
                    id=node_id,
                ).consume()
                tx.run(
                    "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:label]->(b)",
                    a_id=model_node_id, b_id=tag_node_id,
                ).consume()

            with connect_graph().session() as session:
                session.execute_write(_replace_label)
        else:
            logger.warning(f"data_model_edit: label not replaced, tag={tag}, model id={node_id}")

    return {"message": "数据模型更新成功"}
|