model.py 80 KB


  1. """
  2. 数据模型核心业务逻辑模块
  3. 本模块包含了数据模型相关的所有核心业务逻辑函数,包括:
  4. - 数据模型的创建、更新、删除
  5. - 数据模型与数据资源、元数据之间的关系处理
  6. - 数据模型血缘关系管理
  7. - 数据模型图谱生成
  8. - 数据模型层级计算等功能
  9. """
  10. import math
  11. import threading
  12. from concurrent.futures import ThreadPoolExecutor
  13. import pandas as pd
  14. from py2neo import Relationship
  15. import logging
  16. import json
  17. # Configure logger
  18. logger = logging.getLogger(__name__)
  19. from app.core.graph.graph_operations import relationship_exists
  20. from app.core.graph.graph_operations import connect_graph,create_or_get_node,get_node
  21. from app.services.neo4j_driver import neo4j_driver
  22. from app.core.meta_data import get_formatted_time, handle_id_unstructured
  23. from app.core.common import delete_relationships, update_or_create_node, get_node_by_id_no_label
  24. from app.core.data_resource.resource import get_node_by_id, serialize_node_properties
  25. # 根据child关系计算数据模型当前的level自动保存
  26. def calculate_model_level(id):
  27. """
  28. 根据child关系计算数据模型当前的level并自动保存
  29. Args:
  30. id: 数据模型的节点ID(整数)
  31. Returns:
  32. None
  33. """
  34. # 确保id是整数类型
  35. node_id = int(id) if id is not None else None
  36. cql = """
  37. MATCH (start_node:DataModel)
  38. WHERE id(start_node) = $nodeId
  39. CALL {
  40. WITH start_node
  41. OPTIONAL MATCH path = (start_node)-[:child*]->(end_node)
  42. RETURN length(path) AS level
  43. }
  44. WITH coalesce(max(level), 0) AS max_level
  45. RETURN max_level
  46. """
  47. with connect_graph().session() as session:
  48. result = session.run(cql, nodeId=node_id)
  49. record = result.single()
  50. data = record["max_level"] if record and "max_level" in record else 0
  51. # 更新level属性
  52. update_query = """
  53. MATCH (n:DataModel)
  54. WHERE id(n) = $nodeId
  55. SET n.level = $level
  56. RETURN n
  57. """
  58. with connect_graph().session() as session:
  59. session.run(update_query, nodeId=node_id, level=data)
  60. # 处理数据模型血缘关系
  61. def handle_model_relation(resource_ids):
  62. """
  63. 处理数据模型血缘关系
  64. Args:
  65. resource_ids: 数据资源ID
  66. Returns:
  67. 血缘关系数据
  68. """
  69. query = """
  70. MATCH (search:DataResource)-[:connection]->(common_node:meta_node)<-[:connection]-(connect:DataResource)
  71. WHERE id(search) = $resource_Ids
  72. WITH search, connect, common_node
  73. MATCH (search)-[:connection]->(search_node:meta_node)
  74. WITH search, connect, common_node, collect(DISTINCT id(search_node)) AS search_nodes
  75. MATCH (connect)-[:connection]->(connect_node:meta_node)
  76. WITH search, connect, common_node, search_nodes, collect(DISTINCT id(connect_node)) AS connect_nodes
  77. WITH search, connect, search_nodes, connect_nodes, collect(DISTINCT id(common_node)) AS common_nodes
  78. // 剔除 search_nodes 和 connect_nodes 中包含在 common_nodes 中的内容
  79. WITH search, connect, common_nodes,
  80. [node IN search_nodes WHERE NOT node IN common_nodes] AS filtered_search_nodes,
  81. [node IN connect_nodes WHERE NOT node IN common_nodes] AS filtered_connect_nodes
  82. RETURN id(connect) as blood_resources, common_nodes,
  83. filtered_search_nodes as origin_nodes, filtered_connect_nodes as blood_nodes
  84. """
  85. with connect_graph().session() as session:
  86. result = session.run(query, resource_Ids=resource_ids)
  87. return result.data()
  88. # 创建一个数据模型节点
  89. def handle_data_model(data_model, result_list, result, receiver):
  90. """
  91. 创建一个数据模型节点
  92. Args:
  93. data_model: 数据模型名称
  94. result_list: 数据模型英文名列表
  95. result: 序列化的ID列表
  96. receiver: 接收到的请求参数
  97. Returns:
  98. tuple: (id, data_model_node)
  99. """
  100. try:
  101. # 添加数据资源 血缘关系的字段 blood_resource
  102. data_model_en = result_list[0] if result_list and len(result_list) > 0 else ""
  103. receiver['id_list'] = result
  104. add_attribute = {
  105. 'create_time': get_formatted_time(),
  106. 'name_en': data_model_en
  107. }
  108. receiver.update(add_attribute)
  109. data_model_node = get_node('DataModel', name_zh=data_model) or create_or_get_node('DataModel', **receiver)
  110. logger.info(f"通过查询或创建节点获得节点ID111,data_model_node: {data_model_node}")
  111. # 获取节点ID,确保我们能安全地访问节点ID
  112. node_id = data_model_node
  113. if hasattr(data_model_node, 'id'):
  114. logger.info(f"通过节点ID获取节点ID222,data_model_node: {data_model_node}")
  115. node_id = data_model_node.id
  116. else:
  117. logger.info(f"通过查询节点名称获取节点ID333,data_model_node: {data_model_node}")
  118. # 如果节点没有id属性,尝试通过查询获取
  119. query = """
  120. MATCH (n:DataModel {name: $name})
  121. RETURN id(n) as node_id
  122. """
  123. with connect_graph().session() as session:
  124. result = session.run(query, name=data_model)
  125. record = result.single()
  126. logger.info(f"通过查询节点名称获取节点ID444,record: {record}")
  127. if record and "node_id" in record:
  128. logger.info(f"通过查询节点名称获取节点ID555,record: {record}")
  129. node_id = record["node_id"]
  130. # 安全地处理子节点关系
  131. child_list = receiver.get('childrenId', [])
  132. for child_id in child_list:
  133. child_node = get_node_by_id_no_label(child_id)
  134. if child_node:
  135. # 直接使用Cypher查询检查关系是否存在
  136. with connect_graph().session() as session:
  137. rel_query = """
  138. MATCH (a)-[r:child]->(b)
  139. WHERE id(a) = $start_id AND id(b) = $end_id
  140. RETURN count(r) > 0 as exists
  141. """
  142. rel_result = session.run(rel_query,
  143. start_id=int(node_id),
  144. end_id=int(child_node.id)).single()
  145. # 如果关系不存在,则创建关系
  146. if not (rel_result and rel_result["exists"]):
  147. child_node_id = child_node.id if child_node else None
  148. if child_node_id is not None:
  149. # 将变量转换为确定的int类型以避免类型检查问题
  150. child_id_int = int(child_node_id)
  151. session.execute_write(
  152. lambda tx: tx.run(
  153. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:child]->(b)",
  154. a_id=int(node_id), b_id=child_id_int
  155. )
  156. )
  157. # 根据传入参数id,和数据标签建立关系
  158. if receiver.get('tag'):
  159. tag = get_node_by_id('DataLabel', receiver['tag'])
  160. if tag:
  161. # 直接使用Cypher查询检查关系是否存在
  162. with connect_graph().session() as session:
  163. rel_query = """
  164. MATCH (a)-[r:LABEL]->(b)
  165. WHERE id(a) = $start_id AND id(b) = $end_id
  166. RETURN count(r) > 0 as exists
  167. """
  168. rel_result = session.run(rel_query,
  169. start_id=int(node_id),
  170. end_id=int(tag.id)).single()
  171. # 如果关系不存在,则创建关系
  172. if not (rel_result and rel_result["exists"]):
  173. session.execute_write(
  174. lambda tx: tx.run(
  175. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:LABEL]->(b)",
  176. a_id=int(node_id), b_id=int(tag.id)
  177. )
  178. )
  179. # 处理数据源关系 - 创建COME_FROM关系
  180. data_source = receiver.get('data_source')
  181. if data_source:
  182. try:
  183. # 获取数据源的标识(支持多种格式)
  184. data_source_id = None
  185. data_source_name_en = None
  186. # 1. 如果是数字(节点ID)
  187. if isinstance(data_source, (int, float)) or (isinstance(data_source, str) and data_source.isdigit()):
  188. data_source_id = int(data_source)
  189. logger.info(f"data_source 为节点ID: {data_source_id}")
  190. # 2. 如果是字典且包含name_en
  191. elif isinstance(data_source, dict) and data_source.get('name_en'):
  192. data_source_name_en = data_source['name_en']
  193. logger.info(f"data_source 为字典,提取name_en: {data_source_name_en}")
  194. # 3. 如果是字符串(name_en)
  195. elif isinstance(data_source, str):
  196. data_source_name_en = data_source
  197. logger.info(f"data_source 为字符串name_en: {data_source_name_en}")
  198. # 创建数据模型与数据源的关系
  199. with connect_graph().session() as session:
  200. if data_source_id is not None:
  201. # 使用节点ID创建关系
  202. # 首先检查数据源节点是否存在
  203. check_ds_cypher = "MATCH (b:DataSource) WHERE id(b) = $ds_id RETURN b"
  204. check_ds_result = session.run(check_ds_cypher, ds_id=data_source_id)
  205. if not check_ds_result.single():
  206. logger.warning(f"数据源节点不存在: ID={data_source_id},跳过关系创建")
  207. else:
  208. # 检查关系是否已存在
  209. rel_check_query = """
  210. MATCH (a:DataModel)-[r:COME_FROM]->(b:DataSource)
  211. WHERE id(a) = $model_id AND id(b) = $ds_id
  212. RETURN count(r) > 0 as exists
  213. """
  214. rel_check_result = session.run(rel_check_query,
  215. model_id=int(node_id),
  216. ds_id=data_source_id).single()
  217. # 如果关系不存在,则创建COME_FROM关系
  218. if not (rel_check_result and rel_check_result["exists"]):
  219. create_rel_cypher = """
  220. MATCH (a:DataModel), (b:DataSource)
  221. WHERE id(a) = $model_id AND id(b) = $ds_id
  222. CREATE (a)-[r:COME_FROM]->(b)
  223. RETURN r
  224. """
  225. session.run(create_rel_cypher,
  226. model_id=int(node_id),
  227. ds_id=data_source_id)
  228. logger.info(f"已创建数据模型与数据源的COME_FROM关系: model_id={node_id} -> data_source_id={data_source_id}")
  229. else:
  230. logger.info(f"数据模型与数据源的COME_FROM关系已存在: model_id={node_id} -> data_source_id={data_source_id}")
  231. elif data_source_name_en:
  232. # 使用name_en创建关系(兼容旧方式)
  233. # 首先检查数据源节点是否存在
  234. check_ds_cypher = "MATCH (b:DataSource) WHERE b.name_en = $ds_name_en RETURN b"
  235. check_ds_result = session.run(check_ds_cypher, ds_name_en=data_source_name_en)
  236. if not check_ds_result.single():
  237. logger.warning(f"数据源节点不存在: name_en={data_source_name_en},跳过关系创建")
  238. else:
  239. # 检查关系是否已存在
  240. rel_check_query = """
  241. MATCH (a:DataModel)-[r:COME_FROM]->(b:DataSource)
  242. WHERE id(a) = $model_id AND b.name_en = $ds_name_en
  243. RETURN count(r) > 0 as exists
  244. """
  245. rel_check_result = session.run(rel_check_query,
  246. model_id=int(node_id),
  247. ds_name_en=data_source_name_en).single()
  248. # 如果关系不存在,则创建COME_FROM关系
  249. if not (rel_check_result and rel_check_result["exists"]):
  250. create_rel_cypher = """
  251. MATCH (a:DataModel), (b:DataSource)
  252. WHERE id(a) = $model_id AND b.name_en = $ds_name_en
  253. CREATE (a)-[r:COME_FROM]->(b)
  254. RETURN r
  255. """
  256. session.run(create_rel_cypher,
  257. model_id=int(node_id),
  258. ds_name_en=data_source_name_en)
  259. logger.info(f"已创建数据模型与数据源的COME_FROM关系: model_id={node_id} -> name_en={data_source_name_en}")
  260. else:
  261. logger.info(f"数据模型与数据源的COME_FROM关系已存在: model_id={node_id} -> name_en={data_source_name_en}")
  262. else:
  263. logger.warning(f"data_source参数无效,无法识别格式: {data_source}")
  264. except Exception as e:
  265. # 数据源关系创建失败不应该中断主流程
  266. logger.error(f"处理数据源关系失败(不中断主流程): {str(e)}")
  267. # 不再抛出异常,允许主流程继续
  268. return node_id, data_model_node
  269. except Exception as e:
  270. logging.error(f"Error in handle_data_model: {str(e)}")
  271. raise
  272. # (从数据资源中选取)
  273. def resource_handle_meta_data_model(id_lists, data_model_node_id):
  274. """
  275. 处理从数据资源中选取的数据模型与元数据的关系
  276. Args:
  277. id_lists: ID列表
  278. data_model_node_id: 数据模型节点ID
  279. Returns:
  280. None
  281. """
  282. try:
  283. logger.info(f"开始处理数据模型与元数据的关系,数据模型ID: {data_model_node_id}")
  284. # 构建meta_id和resouce_id的列表
  285. resouce_ids = [record['resource_id'] for record in id_lists]
  286. meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]
  287. logger.info(f"资源ID列表: {resouce_ids}")
  288. logger.info(f"元数据ID列表: {meta_ids}")
  289. # 创建与meta_node的关系 组成关系
  290. if meta_ids:
  291. logger.info("开始创建数据模型与元数据的关系")
  292. query = """
  293. MATCH (source:DataModel), (target:DataMeta)
  294. WHERE id(source)=$source_id AND id(target) IN $target_ids
  295. MERGE (source)-[:INCLUDES]->(target)
  296. RETURN count(*) as count
  297. """
  298. with connect_graph().session() as session:
  299. result = session.run(query, source_id=data_model_node_id, target_ids=meta_ids)
  300. result_record = result.single()
  301. count = result_record["count"] if result_record else 0
  302. logger.info(f"成功创建 {count} 个数据模型与元数据的关系")
  303. # 创建与DataResource的关系 资源关系
  304. # 不在创建Modle时创建资源关系,将资源关系创建放在数据流程创建时处理
  305. # 关系名称为DERIVED_FROM
  306. # commented by mxl 2025-06-27
  307. #
  308. # if resouce_ids:
  309. # logger.info("开始创建数据模型与数据资源的关系")
  310. # query = """
  311. # MATCH (source:DataModel), (target:DataResource)
  312. # WHERE id(source)=$source_id AND id(target) IN $target_ids
  313. # MERGE (source)-[:DERIVED_FROM]->(target)
  314. # RETURN count(*) as count
  315. # """
  316. # with connect_graph().session() as session:
  317. # result = session.run(query, source_id=data_model_node_id, target_ids=resouce_ids)
  318. # count = result.single()["count"]
  319. # logger.info(f"成功创建 {count} 个数据模型与数据资源的关系")
  320. except Exception as e:
  321. logger.error(f"处理数据模型与元数据的关系时发生错误: {str(e)}")
  322. raise
  323. # (从数据模型中选取)
  324. def model_handle_meta_data_model(id_lists, data_model_node_id):
  325. """
  326. 处理从数据模型中选取的数据模型与元数据的关系
  327. Args:
  328. id_lists: ID列表
  329. data_model_node_id: 数据模型节点ID
  330. Returns:
  331. None
  332. """
  333. # 构建meta_id和model_id的列表
  334. model_ids = [record['model_id'] for record in id_lists]
  335. meta_ids = [record['id'] for id_list in id_lists for record in id_list['metaData']]
  336. # 创建与meta_node的关系 组成关系
  337. if meta_ids:
  338. query = """
  339. MATCH (source:DataModel), (target:DataMeta)
  340. WHERE id(source)=$source_id AND id(target) IN $target_ids
  341. MERGE (source)-[:component]->(target)
  342. """
  343. with neo4j_driver.get_session() as session:
  344. session.run(query, source_id=data_model_node_id, target_ids=meta_ids)
  345. # 创建与data_model的关系 模型关系
  346. if model_ids:
  347. query = """
  348. MATCH (source:DataModel), (target:DataModel)
  349. WHERE id(source)=$source_id AND id(target) IN $target_ids
  350. MERGE (source)-[:use]->(target)
  351. """
  352. with neo4j_driver.get_session() as session:
  353. session.run(query, source_id=data_model_node_id, target_ids=model_ids)
  354. # (从DDL中选取)
  355. def handle_no_meta_data_model(id_lists, receiver, data_model_node):
  356. """
  357. 处理从DDL中选取的没有元数据的数据模型
  358. Args:
  359. id_lists: ID列表(可以为空)
  360. receiver: 接收到的请求参数
  361. data_model_node: 数据模型节点
  362. Returns:
  363. None
  364. """
  365. # DDL新增时,id_lists可能为空,提前返回
  366. if not id_lists:
  367. logger.info("id_lists为空,跳过资源关系处理")
  368. return
  369. # 构建meta_id和resouce_id的列表
  370. resouce_ids = [record['resource_id'] for record in id_lists if 'resource_id' in record]
  371. meta_ids = [record['id'] for id_list in id_lists for record in id_list.get('metaData', []) if 'id' in record]
  372. # 获取数据模型节点ID
  373. data_model_node_id = None
  374. if hasattr(data_model_node, 'id'):
  375. # data_model_node 是节点对象
  376. data_model_node_id = data_model_node.id
  377. elif isinstance(data_model_node, int):
  378. # data_model_node 直接就是整数ID
  379. data_model_node_id = data_model_node
  380. elif isinstance(data_model_node, dict):
  381. # data_model_node 是字典,尝试通过name_zh查询
  382. query = """
  383. MATCH (n:DataModel {name_zh: $name_zh})
  384. RETURN id(n) as node_id
  385. """
  386. with connect_graph().session() as session:
  387. result = session.run(query, name_zh=data_model_node.get('name_zh'))
  388. record = result.single()
  389. if record:
  390. data_model_node_id = record["node_id"]
  391. else:
  392. # 未知类型,记录警告
  393. logger.warning(f"data_model_node类型未知: {type(data_model_node)}, 值: {data_model_node}")
  394. if not data_model_node_id:
  395. return
  396. # 创建与DataResource的关系 资源关系
  397. if resouce_ids:
  398. query = """
  399. MATCH (source:DataModel), (target:DataResource)
  400. WHERE id(source)=$source_id AND id(target) IN $target_ids
  401. MERGE (source)-[:resource]->(target)
  402. """
  403. with connect_graph().session() as session:
  404. session.run(query, source_id=data_model_node_id, target_ids=resouce_ids)
  405. if meta_ids:
  406. meta_node_list = []
  407. for id in meta_ids:
  408. query = """
  409. MATCH (n)
  410. WHERE id(n) = $node_id
  411. RETURN n
  412. """
  413. with connect_graph().session() as session:
  414. result = session.run(query, node_id=id)
  415. # 必须在 session 作用域内处理结果
  416. if result:
  417. record = result.data()
  418. if record:
  419. meta_node_list.append(record[0]['n'])
  420. # 提取接收到的数据并创建meta_node节点
  421. meta_node = None
  422. resource_ids = []
  423. for item in id_lists:
  424. resource_id = item.get('resource_id')
  425. if resource_id:
  426. resource_ids.append(resource_id)
  427. for meta_item in item.get('metaData', []):
  428. meta_id = meta_item['id']
  429. data_standard = meta_item.get('data_standard', '')
  430. name_en = meta_item.get('name_en', '')
  431. name_zh = meta_item.get('name_zh', '')
  432. # 使用传递的参数创建meta_node节点
  433. meta_params = {
  434. 'name_zh': name_zh,
  435. 'name_en': name_en,
  436. 'standard': data_standard,
  437. 'create_time': get_formatted_time()
  438. }
  439. # 创建meta_node节点
  440. meta_node = create_or_get_node('DataMeta', **meta_params)
  441. # 获取数据模型节点ID
  442. dm_id = data_model_node_id if data_model_node_id is not None else data_model_node
  443. # 确保dm_id是整数类型
  444. if isinstance(dm_id, int):
  445. dm_id_int = dm_id
  446. elif isinstance(dm_id, dict):
  447. dict_dm_id = dm_id.get('id')
  448. dm_id_int = int(dict_dm_id) if dict_dm_id is not None else None
  449. elif hasattr(dm_id, 'id'):
  450. dm_id_int = int(dm_id.id)
  451. else:
  452. try:
  453. dm_id_int = int(dm_id)
  454. except (ValueError, TypeError):
  455. dm_id_int = None
  456. if meta_node and dm_id_int is not None:
  457. # 确保meta_node_id是整数类型
  458. if isinstance(meta_node, int):
  459. meta_node_id_int = meta_node
  460. elif isinstance(meta_node, dict):
  461. dict_id = meta_node.get('id')
  462. meta_node_id_int = int(dict_id) if dict_id is not None else None
  463. elif hasattr(meta_node, 'id'):
  464. meta_node_id_int = int(meta_node.id)
  465. else:
  466. try:
  467. meta_node_id_int = int(meta_node)
  468. except (ValueError, TypeError):
  469. meta_node_id_int = None
  470. if meta_node_id_int is not None:
  471. # 直接使用Cypher查询检查关系是否存在
  472. with connect_graph().session() as session:
  473. rel_query = """
  474. MATCH (a)-[r:INCLUDES]->(b)
  475. WHERE id(a) = $start_id AND id(b) = $end_id
  476. RETURN count(r) > 0 as exists
  477. """
  478. rel_result = session.run(rel_query,
  479. start_id=dm_id_int,
  480. end_id=meta_node_id_int).single()
  481. # 如果关系不存在,则创建INCLUDES关系
  482. if not (rel_result and rel_result["exists"]):
  483. session.execute_write(
  484. lambda tx: tx.run(
  485. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:INCLUDES]->(b)",
  486. a_id=dm_id_int, b_id=meta_node_id_int
  487. )
  488. )
  489. # 数据模型-详情接口
  490. def handle_id_model(id):
  491. """
  492. 获取数据模型详情
  493. Args:
  494. id: 数据模型的节点ID
  495. Returns:
  496. dict: 包含数据模型详情的字典,格式为:
  497. {"data_model": {
  498. "resource_selected": [...],
  499. "leader": ...,
  500. "origin": ...,
  501. "frequency": ...,
  502. "childrenId": [...],
  503. "organization": ...,
  504. "name_zh": ...,
  505. "name_en": ...,
  506. "data_sensitivity": ...,
  507. "describe": ...,
  508. "tag": ...,
  509. "create_time": ...,
  510. "category": ...,
  511. "status": ...
  512. }}
  513. """
  514. node_id = id
  515. cql = """
  516. MATCH (n:DataModel) WHERE id(n) = $nodeId
  517. OPTIONAL MATCH (n)-[:INCLUDES]->(meta:DataMeta)
  518. OPTIONAL MATCH (n)-[:DERIVED_FROM]->(resource:DataResource)
  519. OPTIONAL MATCH (n)-[:LABEL]->(tag:DataLabel)
  520. OPTIONAL MATCH (uses:model_use)-[:use]->(n)
  521. OPTIONAL MATCH (n)-[:has_component]->(component)
  522. WITH n,
  523. collect(DISTINCT meta) as meta_nodes,
  524. collect(DISTINCT resource) as resources,
  525. collect(DISTINCT component) as components,
  526. collect(DISTINCT uses) as uses,
  527. collect(DISTINCT tag) as tags,
  528. CASE WHEN n.childrenId IS NOT NULL THEN n.childrenId ELSE [] END as children
  529. RETURN {
  530. // 基本信息
  531. id: id(n),
  532. name_zh: n.name_zh,
  533. name_en: n.name_en,
  534. create_time: n.create_time,
  535. describe: n.describe,
  536. category: n.category,
  537. level: n.level,
  538. tag: CASE WHEN size(tags) > 0 AND tags[0] IS NOT NULL THEN {id: id(tags[0]), name: tags[0].name} ELSE null END,
  539. // 添加其他必需字段
  540. leader: n.leader,
  541. origin: n.origin,
  542. blood_resource: n.blood_resource,
  543. frequency: n.frequency,
  544. organization: n.organization,
  545. data_sensitivity: n.data_sensitivity,
  546. status: n.status,
  547. // 子节点列表
  548. childrenId: children
  549. } AS result,
  550. // 资源列表
  551. [{
  552. data_resource: [resource IN resources WHERE resource IS NOT NULL | {
  553. id: id(resource),
  554. name_zh: resource.name_zh,
  555. name_en: resource.name_en,
  556. description: resource.description
  557. }],
  558. resource_id: [resource IN resources WHERE resource IS NOT NULL | id(resource)],
  559. meta_ids: [meta IN meta_nodes WHERE meta IS NOT NULL | {
  560. id: id(meta),
  561. name_zh: meta.name_zh,
  562. name_en: meta.name_en,
  563. data_type: meta.data_type
  564. }]
  565. }] AS resource_selected
  566. """
  567. with connect_graph().session() as session:
  568. result = session.run(cql, nodeId=node_id)
  569. # 处理查询结果
  570. record = result.single()
  571. logging.info(f"获得查询结果---------->>>{record}")
  572. if record:
  573. # 获取基本属性和资源选择列表
  574. properties = record["result"]
  575. resource_selected = record["resource_selected"]
  576. # 确保所有必需字段都有默认值,避免空值
  577. required_fields = ['tag', 'leader', 'origin', 'blood_resource',
  578. 'frequency', 'describe', 'organization', 'name_zh', 'name_en',
  579. 'data_sensitivity', 'create_time', 'category', 'status', 'childrenId']
  580. for field in required_fields:
  581. if field not in properties or properties[field] is None:
  582. if field == 'tag':
  583. properties[field] = {}
  584. elif field == 'childrenId':
  585. properties[field] = []
  586. else:
  587. properties[field] = ""
  588. # 构建最终返回格式
  589. final_data = {
  590. "resource_selected": resource_selected,
  591. **properties
  592. }
  593. return {"data_model": final_data}
  594. else:
  595. # 如果没有查询到结果,返回空的结构
  596. return {"data_model": {
  597. "resource_selected": [{"meta_ids": [], "data_resource": None, "resource_id": None}],
  598. "leader": None, "origin": None, "frequency": None, "childrenId": [],
  599. "organization": None, "name_zh": None, "name_en": None, "data_sensitivity": None,
  600. "describe": None, "tag": {}, "create_time": None, "category": None, "status": None
  601. }}
  602. # 数据模型列表
  603. def model_list(skip_count, page_size, name_en_filter=None, name_zh_filter=None,
  604. category=None, tag=None, level=None):
  605. """
  606. 获取数据模型列表
  607. Args:
  608. skip_count: 跳过的数量
  609. page_size: 页面大小
  610. name_en_filter: 英文名称过滤条件
  611. name_zh_filter: 名称过滤条件
  612. category: 类别过滤条件
  613. tag: 标签过滤条件
  614. level: 层级过滤条件
  615. Returns:
  616. tuple: (数据模型列表, 总数量)
  617. """
  618. try:
  619. # 构建where子句 - 只针对DataModel节点的过滤条件
  620. datamodel_where_clause = []
  621. params = {}
  622. if name_zh_filter is not None:
  623. datamodel_where_clause.append("n.name_zh =~ $name_zh")
  624. params['name_zh'] = f".*{name_zh_filter}.*"
  625. if name_en_filter is not None:
  626. datamodel_where_clause.append("n.name_en =~ $name_en")
  627. params['name_en'] = f".*{name_en_filter}.*"
  628. if category is not None:
  629. datamodel_where_clause.append("n.category = $category")
  630. params['category'] = category
  631. if level is not None:
  632. datamodel_where_clause.append("n.level = $level")
  633. params['level'] = level
  634. # 处理标签查询
  635. if tag is not None:
  636. # 确保tag参数是整数类型
  637. try:
  638. tag_id = int(tag)
  639. params['tag'] = tag_id
  640. except (ValueError, TypeError):
  641. logger.warning(f"Invalid tag parameter: {tag}, expected integer")
  642. return [], 0
  643. # 有标签查询条件时,需要确保标签关系存在
  644. match_clause = "MATCH (n:DataModel)-[:LABEL]->(t)"
  645. datamodel_where_clause.append("id(t) = $tag")
  646. else:
  647. # 没有标签查询条件时,先匹配DataModel,然后可选连接标签
  648. match_clause = "MATCH (n:DataModel)"
  649. # 构建DataModel节点的WHERE子句
  650. datamodel_where_str = " AND ".join(datamodel_where_clause)
  651. if datamodel_where_str:
  652. datamodel_where_str = f"WHERE {datamodel_where_str}"
  653. # 构建查询
  654. with connect_graph().session() as session:
  655. # 计算总数量
  656. if tag is not None:
  657. # 有标签查询时,直接使用标签连接
  658. count_query = f"""
  659. {match_clause}
  660. {datamodel_where_str}
  661. RETURN COUNT(DISTINCT n) AS count
  662. """
  663. else:
  664. # 无标签查询时,只计算DataModel节点
  665. count_query = f"""
  666. MATCH (n:DataModel)
  667. {datamodel_where_str}
  668. RETURN COUNT(n) AS count
  669. """
  670. logger.debug(f"Count query: {count_query}")
  671. logger.debug(f"Query parameters: {params}")
  672. count_result = session.run(count_query, **params)
  673. count_record = count_result.single()
  674. total = count_record['count'] if count_record else 0
  675. # 查询数据 - 修复OPTIONAL MATCH的笛卡尔积问题
  676. if tag is not None:
  677. # 有标签查询时,直接使用标签连接
  678. query = f"""
  679. {match_clause}
  680. {datamodel_where_str}
  681. RETURN DISTINCT
  682. id(n) as id,
  683. n.name_zh as name_zh,
  684. n.name_en as name_en,
  685. n.create_time as create_time,
  686. n.describe as describe,
  687. n.level as level,
  688. n.category as category,
  689. n.status as status,
  690. n.leader as leader,
  691. n.origin as origin,
  692. n.blood_resource as blood_resource,
  693. n.organization as organization,
  694. id(t) as tag_id,
  695. t.name_zh as tag_name
  696. ORDER BY time DESC
  697. SKIP $skip
  698. LIMIT $limit
  699. """
  700. else:
  701. # 无标签查询时,先过滤DataModel节点,然后可选连接标签
  702. query = f"""
  703. MATCH (n:DataModel)
  704. {datamodel_where_str}
  705. WITH n
  706. OPTIONAL MATCH (n)-[:LABEL]->(t)
  707. RETURN
  708. id(n) as id,
  709. n.name_zh as name_zh,
  710. n.name_en as name_en,
  711. n.create_time as create_time,
  712. n.describe as describe,
  713. n.level as level,
  714. n.category as category,
  715. n.status as status,
  716. n.leader as leader,
  717. n.origin as origin,
  718. n.blood_resource as blood_resource,
  719. n.organization as organization,
  720. id(t) as tag_id,
  721. t.name_zh as tag_name
  722. ORDER BY n.create_time DESC
  723. SKIP $skip
  724. LIMIT $limit
  725. """
  726. logger.debug(f"Main query: {query}")
  727. result = session.run(query, skip=skip_count, limit=page_size, **params)
  728. # 处理结果
  729. data = []
  730. for record in result:
  731. item = {
  732. "id": record['id'],
  733. "name_zh": record['name_zh'],
  734. "name_en": record['name_en'],
  735. "create_time": record['create_time'],
  736. "describe": record['describe'],
  737. "category": record['category'],
  738. "status": record['status'],
  739. "leader": record['leader'],
  740. "origin": record['origin'],
  741. "blood_resource": record['blood_resource'],
  742. "organization": record['organization'],
  743. "level": record['level'],
  744. "tag": {"id": record['tag_id'], "name_zh": record['tag_name']} if record['tag_id'] is not None else None
  745. }
  746. data.append(item)
  747. logger.info(f"Query returned {len(data)} items out of {total} total")
  748. return data, total
  749. except Exception as e:
  750. logger.error(f"Error in model_list: {str(e)}")
  751. import traceback
  752. traceback.print_exc()
  753. return [], 0
  754. # 有血缘关系的数据资源列表
  755. def model_resource_list(skip_count, page_size, name_zh_filter=None, id=None,
  756. category=None, create_time=None):
  757. """
  758. 获取数据模型相关的数据资源列表
  759. Args:
  760. skip_count: 跳过的数量
  761. page_size: 页面大小
  762. name_zh_filter: 名称过滤条件
  763. id: 数据模型ID
  764. category: 类别过滤条件
  765. create_time: 时间过滤条件
  766. Returns:
  767. tuple: (数据资源列表, 总数量)
  768. """
  769. try:
  770. # 构建基础查询
  771. base_query = """
  772. MATCH (n:DataModel)
  773. WHERE id(n) = $nodeId
  774. MATCH (n)-[:children]->(m:DataResource)
  775. """
  776. # 计算总数量
  777. count_query = base_query + """
  778. RETURN COUNT(m) as count
  779. """
  780. with connect_graph().session() as session:
  781. # 执行计数查询
  782. count_result = session.run(count_query, nodeId=id)
  783. count_record = count_result.single()
  784. total = count_record['count'] if count_record else 0
  785. # 使用分页和筛选条件构建主查询
  786. main_query = base_query + """
  787. MATCH (m)-[:LABEL]->(l)
  788. WHERE id(n) = $nodeId and labels(m) <> ['DataMeta']
  789. RETURN m.name_zh as name_zh,
  790. m.name_en as name_en,
  791. id(m) as id,
  792. l.name_zh as label,
  793. m.create_time as create_time,
  794. m.description as description,
  795. m.category as category
  796. ORDER BY m.create_time DESC
  797. SKIP $skip LIMIT $limit
  798. """
  799. # 执行主查询
  800. result = session.run(main_query, nodeId=id, skip=skip_count, limit=page_size)
  801. # 处理结果
  802. data = []
  803. for record in result:
  804. item = {
  805. "name_zh": record['name_zh'],
  806. "name_en": record['name_en'],
  807. "id": record['id'],
  808. "label": record['label'],
  809. "create_time": record['create_time'],
  810. "description": record['description'],
  811. "category": record['category']
  812. }
  813. data.append(item)
  814. return data, total
  815. except Exception as e:
  816. print(f"Error in model_resource_list: {str(e)}")
  817. import traceback
  818. traceback.print_exc()
  819. return [], 0
  820. # 数据模型血缘图谱
  821. def model_kinship_graph(nodeid, meta=False):
  822. """
  823. 生成数据模型的血缘关系图谱
  824. 按照DERIVED_FROM关系进行递归查找,从当前节点作为起点查找所有DERIVED_FROM关系指向的节点
  825. Args:
  826. nodeid: 节点ID
  827. meta: 是否包含元数据
  828. Returns:
  829. dict: 包含节点和连线信息的图谱数据
  830. """
  831. try:
  832. with connect_graph().session() as session:
  833. # 确保nodeid为整数
  834. try:
  835. nodeid_int = int(nodeid)
  836. except (ValueError, TypeError):
  837. logger.error(f"节点ID不是有效的整数: {nodeid}")
  838. return {"nodes": [], "lines": []}
  839. # 查询起始模型节点是否存在
  840. start_node_query = """
  841. MATCH (n:DataModel)
  842. WHERE id(n) = $nodeId
  843. RETURN n
  844. """
  845. start_result = session.run(start_node_query, nodeId=nodeid_int)
  846. start_record = start_result.single()
  847. if not start_record:
  848. logger.error(f"未找到ID为{nodeid_int}的DataModel节点")
  849. return {"nodes": [], "lines": []}
  850. # 递归查找DERIVED_FROM关系
  851. cypher = """
  852. MATCH (start:DataModel)
  853. WHERE id(start) = $nodeId
  854. MATCH path = (start)-[:DERIVED_FROM*0..]->(target)
  855. WHERE target:DataResource OR target:DataModel
  856. RETURN path
  857. """
  858. result = session.run(cypher, nodeId=nodeid_int)
  859. # 收集节点和关系
  860. nodes = {}
  861. lines = {}
  862. for record in result:
  863. # 处理路径
  864. path = record['path']
  865. logger.debug(f"处理路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")
  866. # 处理路径中的所有节点
  867. for node in path.nodes:
  868. node_id = int(node.id) # 直接转换为整数
  869. if node_id not in nodes:
  870. node_dict = serialize_node_properties(node)
  871. node_dict["id"] = str(node_id)
  872. node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
  873. nodes[node_id] = node_dict
  874. logger.debug(f"添加节点: ID={node_id}, 标签={list(node.labels)}")
  875. # 处理路径中的所有关系
  876. for rel in path.relationships:
  877. rel_id = int(rel.id) # 直接转换为整数
  878. if rel_id not in lines:
  879. rel_dict = {
  880. "id": str(rel_id),
  881. "from": str(int(rel.start_node.id)),
  882. "to": str(int(rel.end_node.id)),
  883. "text": rel.type
  884. }
  885. lines[rel_id] = rel_dict
  886. logger.debug(f"添加关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}")
  887. # 如果需要元数据,查询INCLUDES关系
  888. if meta:
  889. meta_cypher = """
  890. MATCH (start:DataModel)-[r:INCLUDES]->(meta:DataMeta)
  891. WHERE id(start) = $nodeId
  892. RETURN start, r, meta
  893. """
  894. meta_result = session.run(meta_cypher, nodeId=nodeid_int)
  895. for meta_record in meta_result:
  896. start_node = meta_record['start']
  897. rel = meta_record['r']
  898. meta_node = meta_record['meta']
  899. # 添加元数据节点
  900. meta_node_id = int(meta_node.id)
  901. if meta_node_id not in nodes:
  902. node_dict = serialize_node_properties(meta_node)
  903. node_dict["id"] = str(meta_node_id)
  904. node_dict["node_type"] = list(meta_node.labels)[0] if meta_node.labels else ""
  905. nodes[meta_node_id] = node_dict
  906. # 添加INCLUDES关系
  907. rel_id = int(rel.id)
  908. if rel_id not in lines:
  909. rel_dict = {
  910. "id": str(rel_id),
  911. "from": str(nodeid_int),
  912. "to": str(meta_node_id),
  913. "text": rel.type
  914. }
  915. lines[rel_id] = rel_dict
  916. logger.info(f"成功获取血缘关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
  917. return {
  918. "nodes": list(nodes.values()),
  919. "lines": list(lines.values())
  920. }
  921. except Exception as e:
  922. logger.error(f"获取数据模型血缘关系图谱失败: {str(e)}")
  923. import traceback
  924. logger.error(f"错误详情: {traceback.format_exc()}")
  925. return {"nodes": [], "lines": []}
  926. # 数据模型影响图谱
  927. def model_impact_graph(nodeid, meta=False):
  928. """
  929. 生成数据模型的影响关系图谱
  930. 按照DERIVED_FROM关系进行递归查找,从当前节点作为终点查找所有指向这个终点的节点
  931. Args:
  932. nodeid: 节点ID
  933. meta: 是否包含元数据
  934. Returns:
  935. dict: 包含节点和连线信息的图谱数据
  936. """
  937. try:
  938. with connect_graph().session() as session:
  939. # 确保nodeid为整数
  940. try:
  941. nodeid_int = int(nodeid)
  942. except (ValueError, TypeError):
  943. logger.error(f"节点ID不是有效的整数: {nodeid}")
  944. return {"nodes": [], "lines": []}
  945. # 查询起始模型节点是否存在
  946. start_node_query = """
  947. MATCH (n:DataModel)
  948. WHERE id(n) = $nodeId
  949. RETURN n
  950. """
  951. start_result = session.run(start_node_query, nodeId=nodeid_int)
  952. start_record = start_result.single()
  953. if not start_record:
  954. logger.error(f"未找到ID为{nodeid_int}的DataModel节点")
  955. return {"nodes": [], "lines": []}
  956. # 递归查找指向当前节点的DERIVED_FROM关系
  957. cypher = """
  958. MATCH (target:DataModel)
  959. WHERE id(target) = $nodeId
  960. MATCH path = (source)-[:DERIVED_FROM*0..]->(target)
  961. WHERE source:DataResource OR source:DataModel
  962. RETURN path
  963. """
  964. result = session.run(cypher, nodeId=nodeid_int)
  965. # 收集节点和关系
  966. nodes = {}
  967. lines = {}
  968. for record in result:
  969. # 处理路径
  970. path = record['path']
  971. logger.debug(f"处理影响路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")
  972. # 处理路径中的所有节点
  973. for node in path.nodes:
  974. node_id = int(node.id) # 直接转换为整数
  975. if node_id not in nodes:
  976. node_dict = serialize_node_properties(node)
  977. node_dict["id"] = str(node_id)
  978. node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
  979. nodes[node_id] = node_dict
  980. logger.debug(f"添加影响节点: ID={node_id}, 标签={list(node.labels)}")
  981. # 处理路径中的所有关系
  982. for rel in path.relationships:
  983. rel_id = int(rel.id) # 直接转换为整数
  984. if rel_id not in lines:
  985. rel_dict = {
  986. "id": str(rel_id),
  987. "from": str(int(rel.start_node.id)),
  988. "to": str(int(rel.end_node.id)),
  989. "text": rel.type
  990. }
  991. lines[rel_id] = rel_dict
  992. logger.debug(f"添加影响关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}")
  993. # 如果需要元数据,查询INCLUDES关系
  994. if meta:
  995. meta_cypher = """
  996. MATCH (target:DataModel)-[r:INCLUDES]->(meta:DataMeta)
  997. WHERE id(target) = $nodeId
  998. RETURN target, r, meta
  999. """
  1000. meta_result = session.run(meta_cypher, nodeId=nodeid_int)
  1001. for meta_record in meta_result:
  1002. target_node = meta_record['target']
  1003. rel = meta_record['r']
  1004. meta_node = meta_record['meta']
  1005. # 添加元数据节点
  1006. meta_node_id = int(meta_node.id)
  1007. if meta_node_id not in nodes:
  1008. node_dict = serialize_node_properties(meta_node)
  1009. node_dict["id"] = str(meta_node_id)
  1010. node_dict["node_type"] = list(meta_node.labels)[0] if meta_node.labels else ""
  1011. nodes[meta_node_id] = node_dict
  1012. # 添加INCLUDES关系
  1013. rel_id = int(rel.id)
  1014. if rel_id not in lines:
  1015. rel_dict = {
  1016. "id": str(rel_id),
  1017. "from": str(nodeid_int),
  1018. "to": str(meta_node_id),
  1019. "text": rel.type
  1020. }
  1021. lines[rel_id] = rel_dict
  1022. logger.info(f"成功获取影响关系图谱,ID: {nodeid_int}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
  1023. return {
  1024. "nodes": list(nodes.values()),
  1025. "lines": list(lines.values())
  1026. }
  1027. except Exception as e:
  1028. logger.error(f"获取数据模型影响关系图谱失败: {str(e)}")
  1029. import traceback
  1030. logger.error(f"错误详情: {traceback.format_exc()}")
  1031. return {"nodes": [], "lines": []}
  1032. # 数据模型全部图谱
  1033. def model_all_graph(nodeid, meta=False):
  1034. """
  1035. 生成数据模型的所有关系图谱
  1036. 分别调用model_impact_graph查找影响关系,调用model_kinship_graph查找血缘关系,
  1037. 然后合并两部分数据返回
  1038. Args:
  1039. nodeid: 节点ID
  1040. meta: 是否包含元数据
  1041. Returns:
  1042. dict: 包含节点和连线信息的图谱数据
  1043. """
  1044. try:
  1045. # 获取血缘关系图谱
  1046. kinship_data = model_kinship_graph(nodeid, meta)
  1047. # 获取影响关系图谱
  1048. impact_data = model_impact_graph(nodeid, meta)
  1049. # 合并节点数据,使用字典去重
  1050. merged_nodes = {}
  1051. merged_lines = {}
  1052. # 添加血缘关系的节点和连线
  1053. if kinship_data and 'nodes' in kinship_data:
  1054. for node in kinship_data['nodes']:
  1055. node_id = node.get('id')
  1056. if node_id:
  1057. merged_nodes[node_id] = node
  1058. if kinship_data and 'lines' in kinship_data:
  1059. for line in kinship_data['lines']:
  1060. line_id = line.get('id')
  1061. if line_id:
  1062. merged_lines[line_id] = line
  1063. # 添加影响关系的节点和连线
  1064. if impact_data and 'nodes' in impact_data:
  1065. for node in impact_data['nodes']:
  1066. node_id = node.get('id')
  1067. if node_id:
  1068. merged_nodes[node_id] = node
  1069. if impact_data and 'lines' in impact_data:
  1070. for line in impact_data['lines']:
  1071. line_id = line.get('id')
  1072. if line_id:
  1073. merged_lines[line_id] = line
  1074. # 构建最终结果
  1075. result = {
  1076. "nodes": list(merged_nodes.values()),
  1077. "lines": list(merged_lines.values())
  1078. }
  1079. logger.info(f"成功获取完整关系图谱,ID: {nodeid}, 节点数: {len(merged_nodes)}, 关系数: {len(merged_lines)}")
  1080. return result
  1081. except Exception as e:
  1082. logger.error(f"获取数据模型完整关系图谱失败: {str(e)}")
  1083. return {"nodes": [], "lines": []}
  1084. # 更新数据模型
  1085. def data_model_edit(receiver):
  1086. """
  1087. 更新数据模型
  1088. Args:
  1089. receiver: 接收到的请求参数
  1090. Returns:
  1091. 更新结果
  1092. """
  1093. id = receiver.get('id')
  1094. name = receiver.get('name_zh')
  1095. name_en = receiver.get('name_en')
  1096. category = receiver.get('category')
  1097. describe = receiver.get('describe')
  1098. tag = receiver.get('tag')
  1099. frequency = receiver.get('frequency')
  1100. leader = receiver.get('leader')
  1101. organization = receiver.get('organization')
  1102. status = bool(receiver.get('status')) if receiver.get('status') is not None else None
  1103. meta_data = receiver.get('metaData', [])
  1104. # 更新数据模型节点 - 添加新的字段
  1105. query = """
  1106. MATCH (n:DataModel) WHERE id(n) = $id
  1107. SET n.name_zh = $name_zh,
  1108. n.name_en = $name_en,
  1109. n.category = $category,
  1110. n.describe = $describe,
  1111. n.frequency = $frequency,
  1112. n.leader = $leader,
  1113. n.organization = $organization,
  1114. n.status = $status,
  1115. n.create_time = $create_time
  1116. RETURN n
  1117. """
  1118. create_time = get_formatted_time()
  1119. with connect_graph().session() as session:
  1120. result = session.run(query,
  1121. id=id,
  1122. name_zh=name,
  1123. name_en=name_en,
  1124. category=category,
  1125. describe=describe,
  1126. frequency=frequency,
  1127. leader=leader,
  1128. organization=organization,
  1129. status=status,
  1130. create_time=create_time).data()
  1131. # 处理标签关系
  1132. if tag:
  1133. # 先删除所有标签关系
  1134. delete_query = """
  1135. MATCH (n:DataModel)-[r:LABEL]->() WHERE id(n) = $id
  1136. DELETE r
  1137. """
  1138. with connect_graph().session() as session:
  1139. session.run(delete_query, id=id)
  1140. # 再创建新的标签关系
  1141. tag_node = get_node_by_id('DataLabel', tag)
  1142. if tag_node:
  1143. model_node = get_node_by_id_no_label(id)
  1144. if model_node:
  1145. # 获取节点ID
  1146. model_id = model_node.id if hasattr(model_node, 'id') else model_node
  1147. tag_id = tag_node.id if hasattr(tag_node, 'id') else tag_node
  1148. # 直接使用Cypher查询检查关系是否存在
  1149. with connect_graph().session() as session:
  1150. rel_query = """
  1151. MATCH (a)-[r:LABEL]->(b)
  1152. WHERE id(a) = $start_id AND id(b) = $end_id
  1153. RETURN count(r) > 0 as exists
  1154. """
  1155. rel_result = session.run(rel_query,
  1156. start_id=int(model_id),
  1157. end_id=int(tag_id)).single()
  1158. # 如果关系不存在,则创建关系
  1159. if not (rel_result and rel_result["exists"]):
  1160. session.execute_write(
  1161. lambda tx: tx.run(
  1162. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:LABEL]->(b)",
  1163. a_id=int(model_id), b_id=int(tag_id)
  1164. )
  1165. )
  1166. # 处理DataMeta节点关系更新
  1167. with connect_graph().session() as session:
  1168. # 先删除DataModel关联的所有DataMeta关系
  1169. delete_meta_query = """
  1170. MATCH (n:DataModel)-[r:INCLUDES]->(m:DataMeta)
  1171. WHERE id(n) = $id
  1172. DELETE r
  1173. """
  1174. session.run(delete_meta_query, id=id)
  1175. logger.info(f"已删除DataModel({id})的所有DataMeta关系")
  1176. # 根据上传的metaData数据是否有值来决定是否重新构建INCLUDES关系
  1177. if meta_data:
  1178. # 根据上传的metaData数据重新构建INCLUDES关系
  1179. for meta_item in meta_data:
  1180. meta_id = meta_item.get('id')
  1181. if meta_id:
  1182. try:
  1183. meta_id = int(meta_id)
  1184. # 验证DataMeta节点是否存在
  1185. check_meta_query = """
  1186. MATCH (m:DataMeta)
  1187. WHERE id(m) = $meta_id
  1188. RETURN m
  1189. """
  1190. meta_result = session.run(check_meta_query, meta_id=meta_id)
  1191. if meta_result.single():
  1192. # 创建INCLUDES关系
  1193. create_includes_query = """
  1194. MATCH (n:DataModel), (m:DataMeta)
  1195. WHERE id(n) = $model_id AND id(m) = $meta_id
  1196. CREATE (n)-[:INCLUDES]->(m)
  1197. RETURN n, m
  1198. """
  1199. session.run(create_includes_query, model_id=id, meta_id=meta_id)
  1200. logger.info(f"成功创建INCLUDES关系: DataModel({id}) -> DataMeta({meta_id})")
  1201. else:
  1202. logger.warning(f"DataMeta节点不存在,ID: {meta_id}")
  1203. except (ValueError, TypeError) as e:
  1204. logger.error(f"无效的meta_id: {meta_id}, 错误: {str(e)}")
  1205. else:
  1206. logger.info(f"meta_data为空,不需要重新创建INCLUDES关系,DataModel({id})将不关联任何DataMeta节点")
  1207. return {"message": "数据模型更新成功"}
  1208. def model_community(tag=None):
  1209. """
  1210. 查询DataModel的所有节点及DERIVED_FROM关系
  1211. Args:
  1212. tag: 可选的标签ID,如果指定则只查找有该标签的DataModel节点
  1213. Returns:
  1214. dict: 包含节点和连线信息的图谱数据,格式与model_kinship_graph相同
  1215. """
  1216. try:
  1217. with connect_graph().session() as session:
  1218. # 构建查询条件
  1219. if tag is not None:
  1220. # 确保tag参数是整数类型
  1221. try:
  1222. tag_id = int(tag)
  1223. except (ValueError, TypeError):
  1224. logger.warning(f"Invalid tag parameter: {tag}, expected integer")
  1225. return {"nodes": [], "lines": []}
  1226. # 有标签查询条件时,查询有指定标签的DataModel节点及其DERIVED_FROM关系
  1227. cypher = """
  1228. MATCH (dm:DataModel)-[:LABEL]->(t)
  1229. WHERE id(t) = $tag_id
  1230. WITH dm
  1231. MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel)
  1232. RETURN path
  1233. UNION
  1234. MATCH (dm:DataModel)-[:LABEL]->(t)
  1235. WHERE id(t) = $tag_id
  1236. WITH dm
  1237. MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm)
  1238. RETURN path
  1239. """
  1240. result = session.run(cypher, tag_id=tag_id)
  1241. else:
  1242. # 没有标签查询条件时,查询所有DataModel节点及其DERIVED_FROM关系
  1243. cypher = """
  1244. MATCH (dm:DataModel)
  1245. WITH dm
  1246. MATCH path = (dm)-[:DERIVED_FROM*0..]->(target:DataModel)
  1247. RETURN path
  1248. UNION
  1249. MATCH (dm:DataModel)
  1250. WITH dm
  1251. MATCH path = (source:DataModel)-[:DERIVED_FROM*0..]->(dm)
  1252. RETURN path
  1253. """
  1254. result = session.run(cypher)
  1255. # 收集节点和关系
  1256. nodes = {}
  1257. lines = {}
  1258. for record in result:
  1259. # 处理路径
  1260. path = record['path']
  1261. logger.debug(f"处理社区路径,长度: {len(path)}, 节点数: {len(path.nodes)}, 关系数: {len(path.relationships)}")
  1262. # 处理路径中的所有节点
  1263. for node in path.nodes:
  1264. node_id = int(node.id) # 直接转换为整数
  1265. if node_id not in nodes:
  1266. node_dict = serialize_node_properties(node)
  1267. node_dict["id"] = str(node_id)
  1268. node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
  1269. nodes[node_id] = node_dict
  1270. logger.debug(f"添加社区节点: ID={node_id}, 标签={list(node.labels)}")
  1271. # 处理路径中的所有关系
  1272. for rel in path.relationships:
  1273. rel_id = int(rel.id) # 直接转换为整数
  1274. if rel_id not in lines:
  1275. rel_dict = {
  1276. "id": str(rel_id),
  1277. "from": str(int(rel.start_node.id)),
  1278. "to": str(int(rel.end_node.id)),
  1279. "text": rel.type
  1280. }
  1281. lines[rel_id] = rel_dict
  1282. logger.debug(f"添加社区关系: ID={rel_id}, 类型={rel.type}, 从{int(rel.start_node.id)}到{int(rel.end_node.id)}")
  1283. logger.info(f"成功获取数据模型社区图谱,标签ID: {tag}, 节点数: {len(nodes)}, 关系数: {len(lines)}")
  1284. return {
  1285. "nodes": list(nodes.values()),
  1286. "lines": list(lines.values())
  1287. }
  1288. except Exception as e:
  1289. logger.error(f"获取数据模型社区图谱失败: {str(e)}")
  1290. import traceback
  1291. logger.error(f"错误详情: {traceback.format_exc()}")
  1292. return {"nodes": [], "lines": []}
  1293. def model_search_list(model_id, page, page_size, name_en_filter=None,
  1294. name_zh_filter=None, category_filter=None, tag_filter=None):
  1295. """获取特定数据模型关联的元数据列表"""
  1296. try:
  1297. with connect_graph().session() as session:
  1298. # 确保model_id为整数
  1299. try:
  1300. model_id_int = int(model_id)
  1301. except (ValueError, TypeError):
  1302. logger.error(f"模型ID不是有效的整数: {model_id}")
  1303. return [], 0
  1304. # 基本匹配语句 - 支持DataMeta和Metadata标签
  1305. match_clause = """
  1306. MATCH (n:DataModel)-[:INCLUDES]->(m)
  1307. WHERE id(n) = $model_id
  1308. AND (m:DataMeta OR m:Metadata)
  1309. """
  1310. where_conditions = []
  1311. if name_en_filter:
  1312. where_conditions.append(f"m.name_en CONTAINS '{name_en_filter}'")
  1313. if name_zh_filter:
  1314. where_conditions.append(f"m.name_zh CONTAINS '{name_zh_filter}'")
  1315. if category_filter:
  1316. where_conditions.append(f"m.category = '{category_filter}'")
  1317. # 标签过滤需要额外的匹配
  1318. tag_match = ""
  1319. if tag_filter:
  1320. tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name_zh = $tag_filter"
  1321. where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
  1322. # 计算总数
  1323. count_cypher = f"""
  1324. {match_clause}{where_clause}
  1325. {tag_match}
  1326. RETURN count(m) as count
  1327. """
  1328. count_params = {"model_id": model_id_int}
  1329. if tag_filter:
  1330. count_params["tag_filter"] = tag_filter
  1331. count_result = session.run(count_cypher, count_params)
  1332. count_record = count_result.single()
  1333. total_count = count_record["count"] if count_record else 0
  1334. # 分页查询
  1335. skip = (page - 1) * page_size
  1336. cypher = f"""
  1337. {match_clause}{where_clause}
  1338. {tag_match}
  1339. RETURN m
  1340. ORDER BY m.name_zh
  1341. SKIP {skip} LIMIT {page_size}
  1342. """
  1343. result = session.run(cypher, count_params) # type: ignore[arg-type]
  1344. # 格式化结果
  1345. metadata_list = []
  1346. for record in result:
  1347. meta = serialize_node_properties(record["m"])
  1348. meta["id"] = record["m"].id
  1349. metadata_list.append(meta)
  1350. logger.info(f"成功获取数据模型关联元数据,ID: {model_id_int}, 元数据数量: {total_count}")
  1351. return metadata_list, total_count
  1352. except Exception as e:
  1353. logger.error(f"获取数据模型关联的元数据列表失败: {str(e)}")
  1354. return [], 0
  1355. def get_businessdomain_node(name_zh):
  1356. """
  1357. 查找BusinessDomain节点,需要同时满足两个条件:
  1358. 1. name_zh匹配
  1359. 2. 存在与"数据模型"标签的BELONGS_TO关系
  1360. Args:
  1361. name_zh: 业务域节点的中文名称
  1362. Returns:
  1363. 节点对象或None(如果不存在)
  1364. """
  1365. try:
  1366. with connect_graph().session() as session:
  1367. query = """
  1368. MATCH (bd:BusinessDomain)-[:BELONGS_TO]->(label:DataLabel)
  1369. WHERE bd.name_zh = $name_zh
  1370. AND (label.name_zh = '数据模型' OR label.name_en = 'data_model')
  1371. RETURN bd
  1372. LIMIT 1
  1373. """
  1374. result = session.run(query, name_zh=name_zh)
  1375. record = result.single()
  1376. if record and record.get('bd'):
  1377. logger.info(f"找到已存在的BusinessDomain节点: name_zh={name_zh}")
  1378. return record['bd']
  1379. else:
  1380. logger.info(f"未找到BusinessDomain节点: name_zh={name_zh}")
  1381. return None
  1382. except Exception as e:
  1383. logger.error(f"查询BusinessDomain节点时发生错误: {str(e)}")
  1384. return None
  1385. def handle_businessdomain_node(data_model, result_list, result, receiver, id_list):
  1386. """
  1387. 创建一个BusinessDomain业务域节点,属性和关联关系与DataModel节点一致
  1388. 额外创建与DataLabel中"数据模型"标签的BELONGS_TO关系
  1389. Args:
  1390. data_model: 数据模型名称
  1391. result_list: 数据模型英文名列表
  1392. result: 序列化的ID列表
  1393. receiver: 接收到的请求参数
  1394. id_list: ID列表(用于处理资源关系)
  1395. Returns:
  1396. tuple: (node_id, business_domain_node)
  1397. """
  1398. try:
  1399. logger.info(f"开始创建BusinessDomain节点,名称: {data_model}")
  1400. # 添加数据资源 血缘关系的字段 blood_resource
  1401. data_model_en = result_list[0] if result_list and len(result_list) > 0 else ""
  1402. # 准备BusinessDomain节点的属性(与DataModel相同)
  1403. bd_attributes = {
  1404. 'name_zh': data_model,
  1405. 'name_en': data_model_en,
  1406. 'id_list': result,
  1407. 'create_time': get_formatted_time(),
  1408. 'description': receiver.get('description', ''),
  1409. 'category': receiver.get('category', ''),
  1410. 'leader': receiver.get('leader', ''),
  1411. 'origin': receiver.get('origin', ''),
  1412. 'frequency': receiver.get('frequency', ''),
  1413. 'organization': receiver.get('organization', ''),
  1414. 'data_sensitivity': receiver.get('data_sensitivity', ''),
  1415. 'status': receiver.get('status', '')
  1416. }
  1417. # 创建BusinessDomain节点
  1418. # 使用专用函数查找,需要同时满足name_zh和BELONGS_TO关系
  1419. business_domain_node = get_businessdomain_node(data_model) or create_or_get_node('BusinessDomain', **bd_attributes)
  1420. logger.info(f"BusinessDomain节点创建成功,data: {business_domain_node}")
  1421. # 获取节点ID
  1422. node_id = business_domain_node
  1423. if hasattr(business_domain_node, 'id'):
  1424. node_id = business_domain_node.id
  1425. else:
  1426. # 如果节点没有id属性,尝试通过查询获取
  1427. query = """
  1428. MATCH (n:BusinessDomain {name_zh: $name})
  1429. RETURN id(n) as node_id
  1430. """
  1431. with connect_graph().session() as session:
  1432. result_query = session.run(query, name=data_model)
  1433. record = result_query.single()
  1434. if record and "node_id" in record:
  1435. node_id = record["node_id"]
  1436. logger.info(f"BusinessDomain节点ID: {node_id}")
  1437. # 1. 处理子节点关系(child关系)
  1438. child_list = receiver.get('childrenId', [])
  1439. if child_list:
  1440. logger.info(f"处理BusinessDomain的child关系,子节点数量: {len(child_list)}")
  1441. for child_id in child_list:
  1442. child_node = get_node_by_id_no_label(child_id)
  1443. if child_node:
  1444. with connect_graph().session() as session:
  1445. rel_query = """
  1446. MATCH (a)-[r:child]->(b)
  1447. WHERE id(a) = $start_id AND id(b) = $end_id
  1448. RETURN count(r) > 0 as exists
  1449. """
  1450. child_node_id = child_node.id if hasattr(child_node, 'id') else int(child_node)
  1451. rel_result = session.run(rel_query,
  1452. start_id=int(node_id),
  1453. end_id=int(child_node_id)).single()
  1454. if not (rel_result and rel_result["exists"]):
  1455. child_id_int = int(child_node_id)
  1456. session.execute_write(
  1457. lambda tx: tx.run(
  1458. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:child]->(b)",
  1459. a_id=int(node_id), b_id=child_id_int
  1460. )
  1461. )
  1462. logger.info(f"创建BusinessDomain child关系: {node_id} -> {child_node_id}")
  1463. # 2. 处理标签关系(LABEL关系)
  1464. if receiver.get('tag'):
  1465. logger.info(f"处理BusinessDomain的LABEL关系,标签ID: {receiver['tag']}")
  1466. tag = get_node_by_id('DataLabel', receiver['tag'])
  1467. if tag:
  1468. with connect_graph().session() as session:
  1469. rel_query = """
  1470. MATCH (a)-[r:LABEL]->(b)
  1471. WHERE id(a) = $start_id AND id(b) = $end_id
  1472. RETURN count(r) > 0 as exists
  1473. """
  1474. rel_result = session.run(rel_query,
  1475. start_id=int(node_id),
  1476. end_id=int(tag.id)).single()
  1477. if not (rel_result and rel_result["exists"]):
  1478. session.execute_write(
  1479. lambda tx: tx.run(
  1480. "MATCH (a), (b) WHERE id(a) = $a_id AND id(b) = $b_id CREATE (a)-[:LABEL]->(b)",
  1481. a_id=int(node_id), b_id=int(tag.id)
  1482. )
  1483. )
  1484. logger.info(f"创建BusinessDomain LABEL关系: {node_id} -> {tag.id}")
  1485. # 3. 处理数据源关系(COME_FROM关系)
  1486. data_source = receiver.get('data_source')
  1487. if data_source:
  1488. logger.info(f"处理BusinessDomain的COME_FROM关系,数据源: {data_source}")
  1489. try:
  1490. data_source_id = None
  1491. data_source_name_en = None
  1492. # 获取数据源标识(支持多种格式)
  1493. if isinstance(data_source, (int, float)) or (isinstance(data_source, str) and data_source.isdigit()):
  1494. data_source_id = int(data_source)
  1495. elif isinstance(data_source, dict) and data_source.get('name_en'):
  1496. data_source_name_en = data_source['name_en']
  1497. elif isinstance(data_source, str):
  1498. data_source_name_en = data_source
  1499. # 创建BusinessDomain与数据源的关系
  1500. with connect_graph().session() as session:
  1501. if data_source_id is not None:
  1502. check_ds_cypher = "MATCH (b:DataSource) WHERE id(b) = $ds_id RETURN b"
  1503. check_ds_result = session.run(check_ds_cypher, ds_id=data_source_id)
  1504. if not check_ds_result.single():
  1505. logger.warning(f"数据源节点不存在: ID={data_source_id},跳过关系创建")
  1506. else:
  1507. rel_check_query = """
  1508. MATCH (a:BusinessDomain)-[r:COME_FROM]->(b:DataSource)
  1509. WHERE id(a) = $bd_id AND id(b) = $ds_id
  1510. RETURN count(r) > 0 as exists
  1511. """
  1512. rel_check_result = session.run(rel_check_query,
  1513. bd_id=int(node_id),
  1514. ds_id=data_source_id).single()
  1515. if not (rel_check_result and rel_check_result["exists"]):
  1516. create_rel_cypher = """
  1517. MATCH (a:BusinessDomain), (b:DataSource)
  1518. WHERE id(a) = $bd_id AND id(b) = $ds_id
  1519. CREATE (a)-[r:COME_FROM]->(b)
  1520. RETURN r
  1521. """
  1522. session.run(create_rel_cypher,
  1523. bd_id=int(node_id),
  1524. ds_id=data_source_id)
  1525. logger.info(f"创建BusinessDomain与数据源的COME_FROM关系: bd_id={node_id} -> data_source_id={data_source_id}")
  1526. elif data_source_name_en:
  1527. check_ds_cypher = "MATCH (b:DataSource {name_en: $name_en}) RETURN b"
  1528. check_ds_result = session.run(check_ds_cypher, name_en=data_source_name_en)
  1529. if not check_ds_result.single():
  1530. logger.warning(f"数据源节点不存在: name_en={data_source_name_en},跳过关系创建")
  1531. else:
  1532. rel_check_query = """
  1533. MATCH (a:BusinessDomain)-[r:COME_FROM]->(b:DataSource {name_en: $ds_name_en})
  1534. WHERE id(a) = $bd_id
  1535. RETURN count(r) > 0 as exists
  1536. """
  1537. rel_check_result = session.run(rel_check_query,
  1538. bd_id=int(node_id),
  1539. ds_name_en=data_source_name_en).single()
  1540. if not (rel_check_result and rel_check_result["exists"]):
  1541. create_rel_cypher = """
  1542. MATCH (a:BusinessDomain), (b:DataSource {name_en: $ds_name_en})
  1543. WHERE id(a) = $bd_id
  1544. CREATE (a)-[r:COME_FROM]->(b)
  1545. RETURN r
  1546. """
  1547. session.run(create_rel_cypher,
  1548. bd_id=int(node_id),
  1549. ds_name_en=data_source_name_en)
  1550. logger.info(f"创建BusinessDomain与数据源的COME_FROM关系: bd_id={node_id} -> name_en={data_source_name_en}")
  1551. else:
  1552. logger.warning(f"data_source参数无效,无法识别格式: {data_source}")
  1553. except Exception as e:
  1554. logger.error(f"创建BusinessDomain与数据源关系时发生错误: {str(e)}")
  1555. # 4. 处理与id_list中资源和元数据的关系(如果有)
  1556. if id_list:
  1557. logger.info(f"处理BusinessDomain与资源/元数据的关系,id_list数量: {len(id_list)}")
  1558. # 构建meta_id和resouce_id的列表
  1559. resouce_ids = [record['resource_id'] for record in id_list if 'resource_id' in record]
  1560. meta_ids = [record['id'] for id_list_item in id_list for record in id_list_item.get('metaData', []) if 'id' in record]
  1561. # 创建与DataResource的关系
  1562. if resouce_ids:
  1563. query = """
  1564. MATCH (source:BusinessDomain), (target:DataResource)
  1565. WHERE id(source)=$source_id AND id(target) IN $target_ids
  1566. MERGE (source)-[:resource]->(target)
  1567. """
  1568. with connect_graph().session() as session:
  1569. session.run(query, source_id=int(node_id), target_ids=resouce_ids)
  1570. logger.info(f"创建BusinessDomain与DataResource的关系,资源数量: {len(resouce_ids)}")
  1571. # 处理元数据关系
  1572. if meta_ids:
  1573. for item in id_list:
  1574. for meta_item in item.get('metaData', []):
  1575. meta_id = meta_item['id']
  1576. data_standard = meta_item.get('data_standard', '')
  1577. name_en = meta_item.get('name_en', '')
  1578. name_zh = meta_item.get('name_zh', '')
  1579. # 创建meta_node节点
  1580. meta_params = {
  1581. 'name_zh': name_zh,
  1582. 'name_en': name_en,
  1583. 'standard': data_standard,
  1584. 'create_time': get_formatted_time()
  1585. }
  1586. meta_node = create_or_get_node('DataMeta', **meta_params)
  1587. # 创建BusinessDomain与DataMeta的关系
  1588. if meta_node:
  1589. meta_node_id = meta_node.id if hasattr(meta_node, 'id') else meta_node
  1590. query = """
  1591. MATCH (source:BusinessDomain), (target:DataMeta)
  1592. WHERE id(source) = $source_id AND id(target) = $target_id
  1593. MERGE (source)-[:INCLUDES]->(target)
  1594. """
  1595. with connect_graph().session() as session:
  1596. session.run(query, source_id=int(node_id), target_id=int(meta_node_id))
  1597. logger.info(f"创建BusinessDomain与DataMeta的关系,元数据数量: {len(meta_ids)}")
  1598. # 5. 创建与DataLabel中"数据模型"标签的BELONGS_TO关系
  1599. logger.info("查找DataLabel中的'数据模型'标签")
  1600. with connect_graph().session() as session:
  1601. # 查找名称为"数据模型"的DataLabel节点
  1602. find_label_query = """
  1603. MATCH (label:DataLabel)
  1604. WHERE label.name_zh = '数据模型' OR label.name_en = 'data_model'
  1605. RETURN id(label) as label_id
  1606. LIMIT 1
  1607. """
  1608. label_result = session.run(find_label_query)
  1609. label_record = label_result.single()
  1610. if label_record:
  1611. label_id = label_record['label_id']
  1612. logger.info(f"找到'数据模型'标签,ID: {label_id}")
  1613. # 检查BELONGS_TO关系是否已存在
  1614. rel_check_query = """
  1615. MATCH (a:BusinessDomain)-[r:BELONGS_TO]->(b:DataLabel)
  1616. WHERE id(a) = $bd_id AND id(b) = $label_id
  1617. RETURN count(r) > 0 as exists
  1618. """
  1619. rel_check_result = session.run(rel_check_query,
  1620. bd_id=int(node_id),
  1621. label_id=label_id).single()
  1622. if not (rel_check_result and rel_check_result["exists"]):
  1623. # 创建BELONGS_TO关系
  1624. create_rel_query = """
  1625. MATCH (a:BusinessDomain), (b:DataLabel)
  1626. WHERE id(a) = $bd_id AND id(b) = $label_id
  1627. CREATE (a)-[r:BELONGS_TO]->(b)
  1628. RETURN r
  1629. """
  1630. session.run(create_rel_query, bd_id=int(node_id), label_id=label_id)
  1631. logger.info(f"成功创建BusinessDomain与'数据模型'标签的BELONGS_TO关系: bd_id={node_id} -> label_id={label_id}")
  1632. else:
  1633. logger.info(f"BusinessDomain与'数据模型'标签的BELONGS_TO关系已存在")
  1634. else:
  1635. logger.warning("未找到名称为'数据模型'的DataLabel节点,跳过BELONGS_TO关系创建")
  1636. logger.info(f"BusinessDomain节点创建完成,ID: {node_id}")
  1637. return node_id, business_domain_node
  1638. except Exception as e:
  1639. logger.error(f"创建BusinessDomain节点时发生错误: {str(e)}")
  1640. import traceback
  1641. logger.error(f"错误详情: {traceback.format_exc()}")
  1642. raise