model.py 36 KB


  1. """
  2. 数据模型核心业务逻辑模块
  3. 本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
  4. - 数据模型的创建、更新、删除
  5. - 数据模型与数据资源、元数据之间的关系处理
  6. - 数据模型血缘关系管理
  7. - 数据模型图谱生成
  8. - 数据模型层级计算等功能
  9. """
  10. import math
  11. import threading
  12. from concurrent.futures import ThreadPoolExecutor
  13. import pandas as pd
  14. from py2neo import Relationship
  15. import logging
  16. import json
  17. from app.core.graph.graph_operations import relationship_exists
  18. from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node
  19. from app.services.neo4j_driver import neo4j_driver
  20. from app.core.meta_data import get_formatted_time, handle_id_unstructured
  21. from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
  22. from app.core.data_resource.resource import get_node_by_id
  23. # 根据child关系计算数据模型当前的level自动保存
  24. def calculate_model_level(id):
  25. """
  26. 根据child关系计算数据模型当前的level并自动保存
  27. Args:
  28. id: 数据模型的节点ID(字符串)
  29. Returns:
  30. None
  31. """
  32. # 确保id是字符串类型
  33. node_id = str(id) if id is not None else None
  34. cql = """
  35. MATCH (start_node:data_model)
  36. WHERE elementId(start_node) = $nodeId
  37. CALL {
  38. WITH start_node
  39. OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
  40. RETURN length(path) AS level
  41. }
  42. WITH coalesce(max(level), 0) AS max_level
  43. RETURN max_level
  44. """
  45. with connect_graph().session() as session:
  46. result = session.run(cql, nodeId=node_id)
  47. record = result.single()
  48. data = record["max_level"] if record and "max_level" in record else 0
  49. # 更新level属性
  50. update_query = """
  51. MATCH (n:data_model)
  52. WHERE elementId(n) = $nodeId
  53. SET n.level = $level
  54. RETURN n
  55. """
  56. session.run(update_query, nodeId=node_id, level=data)
  57. # 处理数据模型血缘关系
  58. def handle_model_relation(resource_ids):
  59. """
  60. 处理数据模型血缘关系
  61. Args:
  62. resource_ids: 数据资源ID
  63. Returns:
  64. 血缘关系数据
  65. """
  66. query = """
  67. MATCH (search:data_resource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:data_resource)
  68. WHERE id(search) = $resource_Ids
  69. WITH search, connect, common_node
  70. MATCH (search)-[:connection]->(search_node:meta_node)
  71. WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
  72. MATCH (connect)-[:connection]->(connect_node:meta_node)
  73. WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
  74. WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
  75. // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
  76. WITH search, connect, common_nodes,
  77. [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
  78. [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
  79. RETURN id(connect) as blood_resources, common_nodes,
  80. filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
  81. """
  82. with connect_graph().session() as session:
  83. result = session.run(query, resource_Ids=resource_ids)
  84. return result.data()
  85. # 创建一个数据模型节点
  86. def handle_data_model(data_model, result_list, result, receiver):
  87. """
  88. 创建一个数据模型节点
  89. Args:
  90. data_model: 数据模型名称
  91. result_list: 数据模型英文名列表
  92. result: 序列化的ID列表
  93. receiver: 接收到的请求参数
  94. Returns:
  95. tuple: (id, data_model_node)
  96. """
  97. # 添加数据资源 血缘关系的字段 blood_resource
  98. data_model_en = result_list[0] if result_list and len(result_list) > 0 else ""
  99. receiver['id_list'] = result
  100. add_attribute = {
  101. 'time': get_formatted_time(),
  102. 'en_name': data_model_en
  103. }
  104. receiver.update(add_attribute)
  105. data_model_node = get_node('data_model', name=data_model) or create_or_get_node('data_model', **receiver)
  106. child_list = receiver.get('childrenId', [])
  107. for child_id in child_list:
  108. child_node = get_node_by_id_no_label(child_id)
  109. if child_node and not relationship_exists(data_model_node, 'child', child_node):
  110. with connect_graph().session() as session:
  111. relationship = Relationship(data_model_node, 'child', child_node)
  112. session.execute_write(
  113. lambda tx: tx.run(
  114. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:child]->(b)",
  115. a_id=data_model_node.id, b_id=child_node.id
  116. )
  117. )
  118. # 根据传入参数id,和数据标签建立关系
  119. if receiver.get('tag'):
  120. # 使用 Cypher 查询通过 id 查找节点
  121. tag = get_node_by_id('data_label', receiver['tag'])
  122. if tag and not relationship_exists(data_model_node, 'label', tag):
  123. with connect_graph().session() as session:
  124. session.execute_write(
  125. lambda tx: tx.run(
  126. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:label]->(b)",
  127. a_id=data_model_node.id, b_id=tag.id
  128. )
  129. )
  130. # 获取节点ID
  131. node_id = None
  132. if hasattr(data_model_node, 'id'):
  133. node_id = data_model_node.id
  134. else:
  135. # 如果节点没有id属性,尝试通过查询获取
  136. query = """
  137. MATCH (n:data_model {name: $name})
  138. RETURN id(n) as node_id
  139. """
  140. with connect_graph().session() as session:
  141. result = session.run(query, name=data_model)
  142. record = result.single()
  143. if record and "node_id" in record:
  144. node_id = record["node_id"]
  145. return node_id, data_model_node
  146. # (从数据资源中选取)
  147. def resource_handle_meta_data_model(id_lists, data_model_node_id):
  148. """
  149. 处理从数据资源中选取的数据模型与元数据的关系
  150. Args:
  151. id_lists: ID列表
  152. data_model_node_id: 数据模型节点ID
  153. Returns:
  154. None
  155. """
  156. # 构建meta_id和resouce_id的列表
  157. resouce_ids = [record['resource_id'] for record in id_lists]
  158. meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]
  159. metaData = [record['data_standard'] for id_list in id_lists for record in id_list['metaData']]
  160. # 创建与meta_node的关系 组成关系
  161. if meta_ids:
  162. query = """
  163. MATCH (source:data_model), (target:meta_node)
  164. WHERE id(source)=$source_id AND id(target) IN $target_ids
  165. MERGE (source)-[:component]->(target)
  166. """
  167. with neo4j_driver.get_session() as session:
  168. session.run(query, source_id=data_model_node_id, target_ids=meta_ids)
  169. # 创建与data_resource的关系 资源关系
  170. if resouce_ids:
  171. query = """
  172. MATCH (source:data_model), (target:data_resource)
  173. WHERE id(source)=$source_id AND id(target) IN $target_ids
  174. MERGE (source)-[:resource]->(target)
  175. """
  176. with neo4j_driver.get_session() as session:
  177. session.run(query, source_id=data_model_node_id, target_ids=resouce_ids)
  178. # (从数据模型中选取)
  179. def model_handle_meta_data_model(id_lists, data_model_node_id):
  180. """
  181. 处理从数据模型中选取的数据模型与元数据的关系
  182. Args:
  183. id_lists: ID列表
  184. data_model_node_id: 数据模型节点ID
  185. Returns:
  186. None
  187. """
  188. # 构建meta_id和model_id的列表
  189. model_ids = [record['model_id'] for record in id_lists]
  190. meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]
  191. # 创建与meta_node的关系 组成关系
  192. if meta_ids:
  193. query = """
  194. MATCH (source:data_model), (target:meta_node)
  195. WHERE id(source)=$source_id AND id(target) IN $target_ids
  196. MERGE (source)-[:component]->(target)
  197. """
  198. with neo4j_driver.get_session() as session:
  199. session.run(query, source_id=data_model_node_id, target_ids=meta_ids)
  200. # 创建与data_model的关系 模型关系
  201. if model_ids:
  202. query = """
  203. MATCH (source:data_model), (target:data_model)
  204. WHERE id(source)=$source_id AND id(target) IN $target_ids
  205. MERGE (source)-[:use]->(target)
  206. """
  207. with neo4j_driver.get_session() as session:
  208. session.run(query, source_id=data_model_node_id, target_ids=model_ids)
  209. # (从DDL中选取)
  210. def handle_no_meta_data_model(id_lists, receiver, data_model_node):
  211. """
  212. 处理从DDL中选取的没有元数据的数据模型
  213. Args:
  214. id_lists: ID列表
  215. receiver: 接收到的请求参数
  216. data_model_node: 数据模型节点
  217. Returns:
  218. None
  219. """
  220. # 构建meta_id和resouce_id的列表
  221. resouce_ids = [record['resource_id'] for record in id_lists]
  222. meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]
  223. # 获取数据模型节点ID
  224. data_model_node_id = None
  225. if hasattr(data_model_node, 'id'):
  226. data_model_node_id = data_model_node.id
  227. else:
  228. # 如果节点没有id属性,尝试通过查询获取
  229. query = """
  230. MATCH (n:data_model {name: $name})
  231. RETURN id(n) as node_id
  232. """
  233. with connect_graph().session() as session:
  234. result = session.run(query, name=data_model_node.get('name'))
  235. record = result.single()
  236. if record:
  237. data_model_node_id = record["node_id"]
  238. if not data_model_node_id:
  239. return
  240. # 创建与data_resource的关系 资源关系
  241. if resouce_ids:
  242. query = """
  243. MATCH (source:data_model), (target:data_resource)
  244. WHERE id(source)=$source_id AND id(target) IN $target_ids
  245. MERGE (source)-[:resource]->(target)
  246. """
  247. with connect_graph().session() as session:
  248. session.run(query, source_id=data_model_node_id, target_ids=resouce_ids)
  249. if meta_ids:
  250. meta_node_list = []
  251. for id in meta_ids:
  252. query = """
  253. MATCH (n)
  254. WHERE id(n) = $node_id
  255. RETURN n
  256. """
  257. with connect_graph().session() as session:
  258. result = session.run(query, node_id=id)
  259. if result:
  260. record = result.data()
  261. if record:
  262. meta_node_list.append(record[0]['n'])
  263. # 提取接收到的数据并创建meta_node节点
  264. meta_node = None
  265. resource_ids = []
  266. for item in id_lists:
  267. resource_id = item['resource_id']
  268. resource_ids.append(resource_id)
  269. for meta_item in item['metaData']:
  270. meta_id = meta_item['id']
  271. data_standard = meta_item.get('data_standard', '')
  272. en_name_zh = meta_item.get('en_name_zh', '')
  273. data_name = meta_item.get('data_name', '')
  274. # 使用传递的参数创建meta_node节点
  275. meta_params = {
  276. 'name': data_name,
  277. 'cn_name': en_name_zh,
  278. 'standard': data_standard,
  279. 'time': get_formatted_time()
  280. }
  281. # 创建meta_node节点
  282. meta_node = create_or_get_node('meta_node', **meta_params)
  283. # 创建与data_model的关系
  284. if meta_node and not relationship_exists(data_model_node, 'component', meta_node):
  285. with connect_graph().session() as session:
  286. session.execute_write(
  287. lambda tx: tx.run(
  288. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:component]->(b)",
  289. a_id=data_model_node.id, b_id=meta_node.id
  290. )
  291. )
  292. # 定义查询数据模型详情的Cypher查询
  293. def type_cql_query():
  294. """
  295. 构建获取数据模型详情的Cypher查询
  296. Returns:
  297. 查询语句
  298. """
  299. query = """
  300. MATCH (n:data_model)
  301. WHERE elementId(n) = $nodeId
  302. // 获取元数据节点, 数据模型
  303. WITH n
  304. OPTIONAL MATCH (n)-[:connection]->(a:meta_node)
  305. // 获取数据标准
  306. OPTIONAL MATCH (n)-[:clean_model]-(d:data_standard)-[:clean_model]->(a)
  307. // 获取数据标签
  308. OPTIONAL MATCH (n)-[:label]->(la:data_label)
  309. // 获取子节点关系
  310. OPTIONAL MATCH (n)-[:child]->(child_node:data_model)
  311. // 收集元数据信息,注意ma变量未定义,需要修复
  312. OPTIONAL MATCH (a)-[:master_data]->(ma:master_data)
  313. WITH n, a, d, la, collect(DISTINCT {id: elementId(child_node), name: child_node.name}) AS childrenId, ma
  314. // 收集元数据信息并排序
  315. WITH n, collect(DISTINCT {
  316. id: elementId(a),
  317. name: COALESCE(a.name, ""),
  318. en_name: COALESCE(a.en_name, ""),
  319. data_type: COALESCE(a.data_type, ""),
  320. master_data: CASE
  321. WHEN ma IS NOT NULL THEN {id: elementId(ma), name: COALESCE(ma.name, "")}
  322. ELSE NULL
  323. END,
  324. data_standard: CASE
  325. WHEN d IS NOT NULL THEN {id: elementId(d), name: COALESCE(d.name, "")}
  326. ELSE NULL
  327. END
  328. }) AS meta_ids,
  329. properties(n) AS properties,
  330. CASE
  331. WHEN la IS NOT NULL THEN {id: elementId(la), name: COALESCE(la.name, "")}
  332. ELSE NULL
  333. END AS tag,
  334. childrenId
  335. // 对 meta_ids 进行排序
  336. UNWIND meta_ids AS meta_id
  337. WITH n, tag, properties, childrenId, meta_id
  338. ORDER BY meta_id.id
  339. WITH n, tag, properties, childrenId, collect(meta_id) AS sorted_meta_ids
  340. // 构建结果集
  341. WITH [{data_resource: null, resource_id: null, meta_ids: sorted_meta_ids}] AS resources,
  342. elementId(n) as nodeid, tag, properties, n, childrenId
  343. UNWIND resources as resource
  344. WITH nodeid, collect(resource) as results, tag, properties, n, childrenId
  345. // 合并结果集
  346. RETURN results, tag, properties, childrenId
  347. """
  348. return query
  349. # 数据模型编辑接口
  350. def handle_id_model(id):
  351. """
  352. 获取数据模型详情
  353. Args:
  354. id: 数据模型ID (字符串)
  355. Returns:
  356. 数据模型详情
  357. """
  358. # 获取数据模型的名称,元数据名称,对应选中的数据资源名称
  359. query = type_cql_query()
  360. # 确保id参数为字符串类型并进行日志输出
  361. node_id = str(id) if id is not None else None
  362. print(f"Querying data model with elementId: {node_id}")
  363. with connect_graph().session() as session:
  364. try:
  365. result = session.run(query, nodeId=node_id)
  366. data_ = result.data()
  367. print(f"Query result: {data_}")
  368. if not data_:
  369. print(f"No data found for elementId: {node_id}")
  370. return {"data_model": {}}
  371. res_list = []
  372. properties = {}
  373. for record in data_:
  374. if 'results' in record:
  375. res_list = record['results']
  376. if 'properties' in record:
  377. properties = record['properties']
  378. if 'tag' in record and record['tag'] is not None:
  379. properties['tag'] = record['tag']
  380. if 'childrenId' in record and record['childrenId'] is not None:
  381. properties['childrenId'] = record['childrenId']
  382. # 处理id值,确保是字符串格式
  383. if 'id' in properties and properties['id'] is not None:
  384. properties['id'] = str(properties['id'])
  385. # 处理tag中的id
  386. if 'tag' in properties and properties['tag'] is not None and 'id' in properties['tag']:
  387. properties['tag']['id'] = str(properties['tag']['id'])
  388. # 处理childrenId列表中的id
  389. if 'childrenId' in properties and properties['childrenId']:
  390. for child in properties['childrenId']:
  391. if 'id' in child:
  392. child['id'] = str(child['id'])
  393. properties.pop('id_list', None)
  394. if 'tag' not in properties:
  395. properties['tag'] = None
  396. if 'describe' not in properties:
  397. properties['describe'] = None
  398. # 处理结果中的id值为字符串
  399. if res_list:
  400. for res in res_list:
  401. if 'resource_id' in res and res['resource_id'] is not None:
  402. res['resource_id'] = str(res['resource_id'])
  403. if 'meta_ids' in res:
  404. for meta in res['meta_ids']:
  405. if 'id' in meta:
  406. meta['id'] = str(meta['id'])
  407. if 'data_standard' in meta and meta['data_standard'] and 'id' in meta['data_standard']:
  408. meta['data_standard']['id'] = str(meta['data_standard']['id'])
  409. if 'master_data' in meta and meta['master_data'] and 'id' in meta['master_data']:
  410. meta['master_data']['id'] = str(meta['master_data']['id'])
  411. res_dict = {"resource_selected": res_list}
  412. merged_dict = {**res_dict, **properties}
  413. response_data = {"data_model": merged_dict}
  414. return response_data
  415. except Exception as e:
  416. print(f"Error in handle_id_model: {str(e)}")
  417. import traceback
  418. traceback.print_exc()
  419. return {"data_model": {}}
  420. # 数据模型详情
  421. '''
  422. def handle_id_model(model_id):
  423. """
  424. 获取数据模型详情
  425. Args:
  426. model_id: 数据模型ID
  427. Returns:
  428. 数据模型详情
  429. """
  430. model_detail_query = """
  431. MATCH (n:data_model) WHERE id(n) = $model_id
  432. RETURN n
  433. """
  434. with connect_graph().session() as session:
  435. model_detail_result = session.run(model_detail_query, model_id=model_id).data()
  436. if not model_detail_result:
  437. return None
  438. model_detail = model_detail_result[0]['n']
  439. model_info = dict(model_detail)
  440. model_info['id'] = model_id
  441. # 获取data_model节点连接的resource节点
  442. resource_query = """
  443. MATCH (n:data_model)-[:resource]->(r:data_resource) WHERE id(n) = $model_id
  444. RETURN r
  445. """
  446. resource_result = session.run(resource_query, model_id=model_id).data()
  447. resources = []
  448. for item in resource_result:
  449. if 'r' in item and hasattr(item['r'], 'id'):
  450. resource = dict(item['r'])
  451. resource['id'] = item['r'].id
  452. resources.append(resource)
  453. model_info['resources'] = resources
  454. # 获取data_model节点连接的component节点
  455. component_query = """
  456. MATCH (n:data_model)-[:component]->(m:meta_node) WHERE id(n) = $model_id
  457. RETURN m
  458. """
  459. component_result = session.run(component_query, model_id=model_id).data()
  460. components = []
  461. for item in component_result:
  462. if 'm' in item and hasattr(item['m'], 'id'):
  463. component = dict(item['m'])
  464. component['id'] = item['m'].id
  465. components.append(component)
  466. model_info['components'] = components
  467. # 获取data_model节点连接的use节点
  468. use_query = """
  469. MATCH (n:data_model)-[:use]->(u:data_model) WHERE id(n) = $model_id
  470. RETURN u
  471. """
  472. use_result = session.run(use_query, model_id=model_id).data()
  473. uses = []
  474. for item in use_result:
  475. if 'u' in item and hasattr(item['u'], 'id'):
  476. use = dict(item['u'])
  477. use['id'] = item['u'].id
  478. uses.append(use)
  479. model_info['uses'] = uses
  480. # 获取data_model节点连接的标签
  481. tag_query = """
  482. MATCH (n:data_model)-[:label]->(t:data_label) WHERE id(n) = $model_id
  483. RETURN t
  484. """
  485. tag_result = session.run(tag_query, model_id=model_id).data()
  486. if tag_result and 't' in tag_result[0] and hasattr(tag_result[0]['t'], 'id'):
  487. tag = dict(tag_result[0]['t'])
  488. tag['id'] = tag_result[0]['t'].id
  489. model_info['tag'] = tag
  490. return model_info
  491. '''
  492. # 数据模型列表
  493. def model_list(skip_count, page_size, en_name_filter=None, name_filter=None,
  494. category=None, tag=None, level=None):
  495. """
  496. 获取数据模型列表
  497. Args:
  498. skip_count: 跳过的记录数量
  499. page_size: 每页记录数量
  500. en_name_filter: 英文名称过滤条件
  501. name_filter: 名称过滤条件
  502. category: 分类过滤条件
  503. tag: 标签过滤条件
  504. level: 级别过滤条件
  505. Returns:
  506. tuple: (数据列表, 总记录数)
  507. """
  508. # 构建查询条件
  509. params = {}
  510. match_clause = "MATCH (n:data_model)"
  511. where_clause = []
  512. if tag:
  513. match_clause = "MATCH (n:data_model)-[:label]->(t:data_label)"
  514. where_clause.append("elementId(t) = $tag")
  515. params['tag'] = str(tag)
  516. if name_filter:
  517. where_clause.append("n.name =~ $name_filter")
  518. params['name_filter'] = f"(?i).*{name_filter}.*"
  519. if en_name_filter:
  520. where_clause.append("n.en_name =~ $en_name_filter")
  521. params['en_name_filter'] = f"(?i).*{en_name_filter}.*"
  522. if category:
  523. where_clause.append("n.category = $category")
  524. params['category'] = category
  525. if level:
  526. where_clause.append("n.level = $level")
  527. params['level'] = level
  528. # 转换为字符串形式
  529. where_str = " AND ".join(where_clause)
  530. if where_str:
  531. where_str = "WHERE " + where_str
  532. # 获取数据总数
  533. count_query = f"""
  534. {match_clause}
  535. {where_str}
  536. RETURN count(n) as count
  537. """
  538. # 使用正确的session方式执行查询
  539. driver = connect_graph()
  540. if not driver:
  541. return [], 0
  542. with driver.session() as session:
  543. count_result = session.run(count_query, **params)
  544. count = count_result.single()["count"]
  545. # 获取分页数据
  546. params['skip'] = skip_count
  547. params['limit'] = page_size
  548. data_query = f"""
  549. {match_clause}
  550. {where_str}
  551. OPTIONAL MATCH (n)-[:label]->(t:data_label)
  552. WITH n, t
  553. OPTIONAL MATCH (n)-[:component]->(m:meta_node)
  554. RETURN
  555. elementId(n) as id,
  556. n.name as name,
  557. n.en_name as en_name,
  558. n.category as category,
  559. n.description as description,
  560. n.time as time,
  561. n.level as level,
  562. t.name as tag_name,
  563. elementId(t) as tag_id,
  564. count(m) as component_count
  565. ORDER BY n.time DESC
  566. SKIP $skip
  567. LIMIT $limit
  568. """
  569. result = session.run(data_query, **params)
  570. data = result.data()
  571. # 确保结果中的ID是字符串格式
  572. for item in data:
  573. if 'id' in item:
  574. item['id'] = str(item['id'])
  575. if 'tag_id' in item and item['tag_id'] is not None:
  576. item['tag_id'] = str(item['tag_id'])
  577. return data, count
  578. # 有血缘关系的数据资源列表
  579. def model_resource_list(skip_count, page_size, name_filter=None, id=None,
  580. category=None, time=None):
  581. """
  582. 获取有血缘关系的数据资源列表
  583. Args:
  584. skip_count: 跳过的记录数量
  585. page_size: 每页记录数量
  586. name_filter: 名称过滤条件
  587. id: 数据资源ID
  588. category: 分类过滤条件
  589. time: 时间过滤条件
  590. Returns:
  591. tuple: (数据列表, 总记录数)
  592. """
  593. # 构建查询条件
  594. params = {'id': id}
  595. where_clause = []
  596. if name_filter:
  597. where_clause.append("n.name =~ $name_filter")
  598. params['name_filter'] = f"(?i).*{name_filter}.*"
  599. if category:
  600. where_clause.append("n.category = $category")
  601. params['category'] = category
  602. if time:
  603. where_clause.append("n.time >= $time")
  604. params['time'] = time
  605. # 转换为字符串形式
  606. where_str = " AND ".join(where_clause)
  607. if where_str:
  608. where_str = "WHERE " + where_str
  609. # 获取数据总数
  610. count_query = f"""
  611. MATCH (search:data_resource) WHERE id(search) = $id
  612. MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource)
  613. {where_str}
  614. RETURN count(DISTINCT n) as count
  615. """
  616. # 使用正确的session方式执行查询
  617. driver = connect_graph()
  618. if not driver:
  619. return [], 0
  620. with driver.session() as session:
  621. count_result = session.run(count_query, **params)
  622. count = count_result.single()["count"]
  623. # 获取分页数据
  624. params['skip'] = skip_count
  625. params['limit'] = page_size
  626. data_query = f"""
  627. MATCH (search:data_resource) WHERE id(search) = $id
  628. MATCH (search)-[:connection]->(mn:meta_node)<-[:connection]-(n:data_resource)
  629. {where_str}
  630. WITH DISTINCT n, mn
  631. RETURN
  632. id(n) as id,
  633. n.name as name,
  634. n.en_name as en_name,
  635. n.category as category,
  636. n.description as description,
  637. n.time as time,
  638. collect({{id: id(mn), name: mn.name}}) as common_meta
  639. ORDER BY n.time DESC
  640. SKIP $skip
  641. LIMIT $limit
  642. """
  643. result = session.run(data_query, **params)
  644. data = result.data()
  645. return data, count
  646. # 数据模型血缘图谱
  647. def model_kinship_graph(nodeid, meta=False):
  648. """
  649. 获取数据模型血缘图谱
  650. Args:
  651. nodeid: 节点ID(字符串)
  652. meta: 是否返回元数据
  653. Returns:
  654. 图谱数据
  655. """
  656. # 确保nodeid是字符串类型
  657. node_id = str(nodeid) if nodeid is not None else None
  658. if meta:
  659. query = """
  660. MATCH p = (n:data_model)-[r:component|resource*..3]-(m)
  661. WHERE elementId(n) = $nodeId
  662. WITH p, relationships(p) as rels
  663. RETURN p
  664. limit 300
  665. """
  666. else:
  667. query = """
  668. MATCH p = (n:data_model)-[r:resource*..3]-(m)
  669. WHERE elementId(n) = $nodeId and labels(m) <> ['meta_node']
  670. WITH p, relationships(p) as rels
  671. RETURN p
  672. limit 300
  673. """
  674. # 使用正确的session方式执行查询
  675. driver = connect_graph()
  676. if not driver:
  677. return {"nodes": [], "edges": []}
  678. with driver.session() as session:
  679. result = session.run(query, nodeId=node_id)
  680. nodes = set()
  681. relationships = set()
  682. nodes_by_id = {}
  683. for record in result:
  684. path = record["p"]
  685. for node in path.nodes:
  686. node_id_str = str(node.element_id) if hasattr(node, 'element_id') else str(node.id)
  687. if node_id_str not in nodes:
  688. node_type = list(node.labels)[0].split('_')[1]
  689. node_data = {
  690. "id": node_id_str,
  691. "text": node.get("name", ""),
  692. "type": node_type
  693. }
  694. nodes.add(node_id_str)
  695. nodes_by_id[node_id_str] = node_data
  696. for rel in path.relationships:
  697. start_id = str(rel.start_node.element_id) if hasattr(rel.start_node, 'element_id') else str(rel.start_node.id)
  698. end_id = str(rel.end_node.element_id) if hasattr(rel.end_node, 'element_id') else str(rel.end_node.id)
  699. relationship_id = f"{start_id}-{end_id}"
  700. if relationship_id not in relationships:
  701. relationship_data = {
  702. "from": start_id,
  703. "to": end_id,
  704. "text": type(rel).__name__
  705. }
  706. relationships.add(relationship_id)
  707. # 转换为所需格式
  708. return {
  709. "nodes": list(nodes_by_id.values()),
  710. "edges": [{"from": rel.split("-")[0], "to": rel.split("-")[1], "text": ""}
  711. for rel in relationships]
  712. }
  713. # 数据模型影响图谱
  714. def model_impact_graph(nodeid, meta=False):
  715. """
  716. 获取数据模型影响图谱
  717. Args:
  718. nodeid: 节点ID(字符串)
  719. meta: 是否返回元数据
  720. Returns:
  721. 图谱数据
  722. """
  723. # 确保nodeid是字符串类型
  724. node_id = str(nodeid) if nodeid is not None else None
  725. if meta:
  726. query = """
  727. MATCH p = (n:data_model)-[r:use*..3]-(m)
  728. WHERE elementId(n) = $nodeId
  729. WITH p, relationships(p) as rels
  730. RETURN p
  731. limit 300
  732. """
  733. else:
  734. query = """
  735. MATCH p = (n:data_model)-[r:use*..3]-(m)
  736. WHERE elementId(n) = $nodeId
  737. WITH p, relationships(p) as rels
  738. RETURN p
  739. limit 300
  740. """
  741. # 使用正确的session方式执行查询
  742. driver = connect_graph()
  743. if not driver:
  744. return {"nodes": [], "edges": []}
  745. with driver.session() as session:
  746. result = session.run(query, nodeId=node_id)
  747. nodes = set()
  748. relationships = set()
  749. nodes_by_id = {}
  750. for record in result:
  751. path = record["p"]
  752. for node in path.nodes:
  753. node_id_str = str(node.element_id) if hasattr(node, 'element_id') else str(node.id)
  754. if node_id_str not in nodes:
  755. node_type = list(node.labels)[0].split('_')[1]
  756. node_data = {
  757. "id": node_id_str,
  758. "text": node.get("name", ""),
  759. "type": node_type
  760. }
  761. nodes.add(node_id_str)
  762. nodes_by_id[node_id_str] = node_data
  763. for rel in path.relationships:
  764. start_id = str(rel.start_node.element_id) if hasattr(rel.start_node, 'element_id') else str(rel.start_node.id)
  765. end_id = str(rel.end_node.element_id) if hasattr(rel.end_node, 'element_id') else str(rel.end_node.id)
  766. relationship_id = f"{start_id}-{end_id}"
  767. if relationship_id not in relationships:
  768. relationship_data = {
  769. "from": start_id,
  770. "to": end_id,
  771. "text": type(rel).__name__
  772. }
  773. relationships.add(relationship_id)
  774. # 转换为所需格式
  775. return {
  776. "nodes": list(nodes_by_id.values()),
  777. "edges": [{"from": rel.split("-")[0], "to": rel.split("-")[1], "text": ""}
  778. for rel in relationships]
  779. }
  780. # 数据模型全部图谱
  781. def model_all_graph(nodeid, meta=False):
  782. """
  783. 获取数据模型全部图谱
  784. Args:
  785. nodeid: 节点ID(字符串)
  786. meta: 是否返回元数据
  787. Returns:
  788. 图谱数据
  789. """
  790. # 确保nodeid是字符串类型
  791. node_id = str(nodeid) if nodeid is not None else None
  792. if meta:
  793. query = """
  794. MATCH p = (n:data_model)-[r*..3]-(m)
  795. WHERE elementId(n) = $nodeId
  796. WITH p, relationships(p) as rels
  797. RETURN p
  798. limit 300
  799. """
  800. else:
  801. query = """
  802. MATCH p = (n:data_model)-[r*..3]-(m)
  803. WHERE elementId(n) = $nodeId and labels(m) <> ['meta_node']
  804. WITH p, relationships(p) as rels
  805. RETURN p
  806. limit 300
  807. """
  808. # 使用正确的session方式执行查询
  809. driver = connect_graph()
  810. if not driver:
  811. return {"nodes": [], "edges": []}
  812. with driver.session() as session:
  813. result = session.run(query, nodeId=node_id)
  814. nodes = set()
  815. relationships = set()
  816. nodes_by_id = {}
  817. for record in result:
  818. path = record["p"]
  819. for node in path.nodes:
  820. node_id_str = str(node.element_id) if hasattr(node, 'element_id') else str(node.id)
  821. if node_id_str not in nodes:
  822. node_type = list(node.labels)[0].split('_')[1]
  823. node_data = {
  824. "id": node_id_str,
  825. "text": node.get("name", ""),
  826. "type": node_type
  827. }
  828. nodes.add(node_id_str)
  829. nodes_by_id[node_id_str] = node_data
  830. for rel in path.relationships:
  831. start_id = str(rel.start_node.element_id) if hasattr(rel.start_node, 'element_id') else str(rel.start_node.id)
  832. end_id = str(rel.end_node.element_id) if hasattr(rel.end_node, 'element_id') else str(rel.end_node.id)
  833. relationship_id = f"{start_id}-{end_id}"
  834. if relationship_id not in relationships:
  835. relationship_data = {
  836. "from": start_id,
  837. "to": end_id,
  838. "text": type(rel).__name__
  839. }
  840. relationships.add(relationship_id)
  841. # 转换为所需格式
  842. return {
  843. "nodes": list(nodes_by_id.values()),
  844. "edges": [{"from": rel.split("-")[0], "to": rel.split("-")[1], "text": ""}
  845. for rel in relationships]
  846. }
  847. # 更新数据模型
  848. def data_model_edit(receiver):
  849. """
  850. 更新数据模型
  851. Args:
  852. receiver: 接收到的请求参数
  853. Returns:
  854. 更新结果
  855. """
  856. id = receiver.get('id')
  857. name = receiver.get('name')
  858. en_name = receiver.get('en_name')
  859. category = receiver.get('category')
  860. description = receiver.get('description')
  861. tag = receiver.get('tag')
  862. # 更新数据模型节点
  863. query = """
  864. MATCH (n:data_model) WHERE id(n) = $id
  865. SET n.name = $name, n.en_name = $en_name, n.category = $category, n.description = $description
  866. RETURN n
  867. """
  868. with connect_graph().session() as session:
  869. result = session.run(query, id=id, name=name, en_name=en_name,
  870. category=category, description=description).data()
  871. # 处理标签关系
  872. if tag:
  873. # 先删除所有标签关系
  874. delete_query = """
  875. MATCH (n:data_model)-[r:label]->() WHERE id(n) = $id
  876. DELETE r
  877. """
  878. with connect_graph().session() as session:
  879. session.run(delete_query, id=id)
  880. # 再创建新的标签关系
  881. tag_node = get_node_by_id('data_label', tag)
  882. if tag_node:
  883. model_node = get_node_by_id_no_label(id)
  884. if model_node and not relationship_exists(model_node, 'label', tag_node):
  885. with connect_graph().session() as session:
  886. session.execute_write(
  887. lambda tx: tx.run(
  888. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:label]->(b)",
  889. a_id=model_node.id, b_id=tag_node.id
  890. )
  891. )
  892. return {"message": "数据模型更新成功"}