interface.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
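"""
Core business logic for the data interface.

This module contains all core business-logic functions related to the data
interface, including:
- data standard (data_standard) features
- data label (data_label) features
- graph generation
- dynamic label recognition and related features
"""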
  9. from app.core.graph.graph_operations import connect_graph
# Data standard list display
  11. def standard_list(skip_count, page_size, en_name_filter=None,
  12. name_filter=None, category_filter=None, time_filter=None):
  13. """
  14. 获取数据标准列表
  15. Args:
  16. skip_count: 跳过的记录数量
  17. page_size: 每页记录数量
  18. en_name_filter: 英文名称过滤条件
  19. name_filter: 名称过滤条件
  20. category_filter: 分类过滤条件
  21. time_filter: 时间过滤条件
  22. Returns:
  23. tuple: (数据列表, 总记录数)
  24. """
  25. data = []
  26. # 构建查询条件
  27. where_clause = []
  28. params = {}
  29. if name_filter:
  30. where_clause.append("n.name CONTAINS $name_filter")
  31. params['name_filter'] = name_filter
  32. if en_name_filter:
  33. where_clause.append("n.en_name CONTAINS $en_name_filter")
  34. params['en_name_filter'] = en_name_filter
  35. if category_filter:
  36. where_clause.append("n.category CONTAINS $category_filter")
  37. params['category_filter'] = category_filter
  38. if time_filter:
  39. where_clause.append("n.time CONTAINS $time_filter")
  40. params['time_filter'] = time_filter
  41. else:
  42. where_clause.append("TRUE")
  43. where_str = " AND ".join(where_clause)
  44. # 构建完整的查询语句
  45. cql = f"""
  46. MATCH (n:data_standard)
  47. WHERE {where_str}
  48. RETURN properties(n) as properties,n.time as time,id(n) as nodeid,size((n)--()) as relationship_count
  49. ORDER BY time desc
  50. SKIP $skip_count
  51. LIMIT $page_size
  52. """
  53. params['skip_count'] = skip_count
  54. params['page_size'] = page_size
  55. result = connect_graph.run(cql, **params)
  56. for record in result:
  57. properties = {
  58. key: value for key, value in record['properties'].items()
  59. if key not in ['input', 'code', 'output']
  60. }
  61. properties.setdefault("describe", None)
  62. new_attr = {
  63. 'id': record['nodeid'],
  64. 'number': record['relationship_count']
  65. }
  66. properties.update(new_attr)
  67. data.append(properties)
  68. # 获取总量
  69. total_query = f"MATCH (n:data_standard) WHERE {where_str} RETURN COUNT(n) AS total"
  70. total_result = connect_graph.run(total_query, **params).evaluate()
  71. return data, total_result
# Data standard graph display (lineage) - parent nodes
  73. def standard_kinship_graph(nodeid):
  74. """
  75. 生成数据标准的血缘关系图谱
  76. Args:
  77. nodeid: 节点ID
  78. Returns:
  79. 图谱数据
  80. """
  81. # 查询语句
  82. cql = """
  83. MATCH(da:data_standard)
  84. WHERE id(da)=$nodeId
  85. OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
  86. OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
  87. WITH
  88. collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
  89. collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
  90. collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]}) as nodes,da,
  91. collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
  92. collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})as lines
  93. WITH
  94. toString(id(da)) as rootId,
  95. apoc.coll.toSet(lines) as lines,
  96. apoc.coll.toSet(nodes) as nodes
  97. RETURN nodes,lines,rootId
  98. """
  99. data = connect_graph.run(cql, nodeId=nodeid)
  100. res = {}
  101. for item in data:
  102. res = {
  103. "nodes": [record for record in item['nodes'] if record['id']],
  104. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  105. "rootId": item['rootId']
  106. }
  107. return res
# Data standard graph display (impact) - downstream
  109. def standard_impact_graph(nodeid):
  110. """
  111. 生成数据标准的影响关系图谱
  112. Args:
  113. nodeid: 节点ID
  114. Returns:
  115. 图谱数据
  116. """
  117. # 查询语句
  118. cql = """
  119. MATCH(da:data_standard)
  120. WHERE id(da)=$nodeId
  121. OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
  122. OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
  123. WITH
  124. collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
  125. collect({id:toString(id(m1)),text:m1.name})+
  126. collect({id:toString(id(m2)),text:m2.name})as nodes,da,
  127. collect({from:toString(id(da)),to:toString(id(m1)),text:'标准清洗'})+
  128. collect({from:toString(id(da)),to:toString(id(m2)),text:'标准清洗'})as lines
  129. WITH
  130. toString(id(da)) as rootId,
  131. apoc.coll.toSet(lines) as lines,
  132. apoc.coll.toSet(nodes) as nodes
  133. RETURN nodes,lines,rootId
  134. """
  135. data = connect_graph.run(cql, nodeId=nodeid)
  136. res = {}
  137. for item in data:
  138. res = {
  139. "nodes": [record for record in item['nodes'] if record['id']],
  140. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  141. "rootId": item['rootId']
  142. }
  143. return res
# Data standard graph display (all relationships)
  145. def standard_all_graph(nodeid):
  146. """
  147. 生成数据标准的所有关系图谱
  148. Args:
  149. nodeid: 节点ID
  150. Returns:
  151. 图谱数据
  152. """
  153. # 查询语句
  154. cql = """
  155. MATCH(da:data_standard)
  156. WHERE id(da)=$nodeId
  157. OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
  158. OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
  159. OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
  160. OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
  161. WITH
  162. collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
  163. collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
  164. collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
  165. collect({id:toString(id(m1)),text:m1.name})+
  166. collect({id:toString(id(m2)),text:m2.name})as nodes,da,
  167. collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
  168. collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
  169. collect({from:toString(id(da)),to:toString(id(m1)),text:'标准清洗'})+
  170. collect({from:toString(id(da)),to:toString(id(m2)),text:'标准清洗'})as lines
  171. WITH
  172. toString(id(da)) as rootId,
  173. apoc.coll.toSet(lines) as lines,
  174. apoc.coll.toSet(nodes) as nodes
  175. RETURN nodes,lines,rootId
  176. """
  177. data = connect_graph.run(cql, nodeId=nodeid)
  178. res = {}
  179. for item in data:
  180. res = {
  181. "nodes": [record for record in item['nodes'] if record['id']],
  182. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  183. "rootId": item['rootId']
  184. }
  185. return res
# Data label list display
  187. def label_list(skip_count, page_size, en_name_filter=None,
  188. name_filter=None, category_filter=None, group_filter=None):
  189. """
  190. 获取数据标签列表
  191. Args:
  192. skip_count: 跳过的记录数量
  193. page_size: 每页记录数量
  194. en_name_filter: 英文名称过滤条件
  195. name_filter: 名称过滤条件
  196. category_filter: 分类过滤条件
  197. group_filter: 分组过滤条件
  198. Returns:
  199. tuple: (数据列表, 总记录数)
  200. """
  201. data = []
  202. # 构建查询条件
  203. where_clause = []
  204. params = {}
  205. if name_filter:
  206. where_clause.append("n.name CONTAINS $name_filter")
  207. params['name_filter'] = name_filter
  208. if en_name_filter:
  209. where_clause.append("n.en_name CONTAINS $en_name_filter")
  210. params['en_name_filter'] = en_name_filter
  211. if category_filter:
  212. where_clause.append("n.category CONTAINS $category_filter")
  213. params['category_filter'] = category_filter
  214. if group_filter:
  215. where_clause.append(f"n.group CONTAINS $group_filter")
  216. params['group_filter'] = group_filter
  217. else:
  218. where_clause.append("TRUE")
  219. where_str = " AND ".join(where_clause)
  220. # 构建完整的查询语句
  221. cql = f"""
  222. MATCH (n:data_label)
  223. WHERE {where_str}
  224. RETURN properties(n) as properties,n.time as time,id(n) as nodeid,
  225. size((n)--()) as relationship_count
  226. ORDER BY time desc
  227. SKIP $skip_count
  228. LIMIT $page_size
  229. """
  230. params['skip_count'] = skip_count
  231. params['page_size'] = page_size
  232. result = connect_graph.run(cql, **params)
  233. for record in result:
  234. properties = record['properties']
  235. new_attr = {
  236. 'id': record['nodeid'],
  237. 'number': record['relationship_count']
  238. }
  239. if "describe" not in properties:
  240. properties["describe"] = None
  241. if "scope" not in properties:
  242. properties["scope"] = None
  243. properties.update(new_attr)
  244. data.append(properties)
  245. # 获取总量
  246. total_query = f"MATCH (n:data_label) WHERE {where_str} RETURN COUNT(n) AS total"
  247. total_result = connect_graph.run(total_query, **params).evaluate()
  248. return data, total_result
# Data label graph display
  250. def id_label_graph(id):
  251. """
  252. 根据ID生成数据标签图谱
  253. Args:
  254. id: 节点ID
  255. Returns:
  256. 图谱数据
  257. """
  258. query = """
  259. MATCH (n:data_label)
  260. WHERE id(n) = $nodeId
  261. OPTIONAL MATCH (a)-[:label]-(n)
  262. WITH
  263. collect({from: toString(id(a)), to: toString(id(n)), text: "标签"}) AS line1,
  264. collect({id: toString(id(n)), text: n.name, type:"label"}) AS node1,
  265. collect({id: toString(id(a)), text: a.name, type: split(labels(a)[0], '_')[1]}) AS node2, n
  266. WITH apoc.coll.toSet(line1) AS lines,
  267. apoc.coll.toSet(node1 + node2) AS nodes,
  268. toString(id(n)) AS res
  269. RETURN lines, nodes, res
  270. """
  271. data = connect_graph.run(query, nodeId=id)
  272. res = {}
  273. for item in data:
  274. res = {
  275. "nodes": [record for record in item['nodes'] if record['id']],
  276. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  277. "rootId": item['res'],
  278. }
  279. return res
# Data label graph display (lineage) - parent nodes / (all relationships)
  281. def label_kinship_graph(nodeid):
  282. """
  283. 生成数据标签的血缘关系图谱
  284. Args:
  285. nodeid: 节点ID
  286. Returns:
  287. 图谱数据
  288. """
  289. # 查询语句
  290. cql = """
  291. MATCH(la:data_label)
  292. WHERE id(la)=$nodeId
  293. OPTIONAL MATCH(a:data_resource)-[:label]-(la)
  294. OPTIONAL MATCH(b:data_model)-[:label]-(la)
  295. OPTIONAL MATCH(meta:meta_node)-[:label]-(la)
  296. OPTIONAL MATCH(d:data_standard)-[:label]-(la)
  297. OPTIONAL MATCH(e:data_metric)-[:label]-(la)
  298. WITH
  299. collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
  300. collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
  301. collect({id:toString(id(d)),text:d.name,type:split(labels(d)[0],'_')[1]})+
  302. collect({id:toString(id(e)),text:e.name,type:split(labels(e)[0],'_')[1]})+
  303. collect({id:toString(id(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
  304. collect({id:toString(id(meta)),text:meta.name}) as nodes,la,
  305. collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
  306. collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
  307. collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+
  308. collect({from:toString(id(d)),to:toString(id(la)),text:'标签'})+
  309. collect({from:toString(id(e)),to:toString(id(la)),text:'标签'})as lines
  310. WITH
  311. toString(id(la)) as rootId,
  312. apoc.coll.toSet(lines) as lines,
  313. apoc.coll.toSet(nodes) as nodes
  314. RETURN nodes,lines,rootId
  315. """
  316. data = connect_graph.run(cql, nodeId=nodeid)
  317. res = {}
  318. for item in data:
  319. res = {
  320. "nodes": [record for record in item['nodes'] if record['id']],
  321. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  322. "rootId": item['rootId']
  323. }
  324. return res
# Data label graph display (impact) - downstream
  326. def label_impact_graph(nodeid):
  327. """
  328. 生成数据标签的影响关系图谱
  329. Args:
  330. nodeid: 节点ID
  331. Returns:
  332. 图谱数据
  333. """
  334. # 查询语句
  335. cql = """
  336. MATCH(n:data_label)
  337. WHERE id(n)=$nodeId
  338. RETURN {id:toString(id(n)),text:(n.name),type:"label"} AS nodes,
  339. toString(id(n)) as rootId
  340. """
  341. data = connect_graph.run(cql, nodeId=nodeid)
  342. res = {}
  343. for item in data:
  344. res = {
  345. "nodes": item['nodes'],
  346. "rootId": item['rootId'],
  347. "lines": []
  348. }
  349. return res
# Look up data label groups similar to the submitted content and return them
  351. def dynamic_label_list(name_filter=None):
  352. """
  353. 根据内容查询相似的数据标签分组
  354. Args:
  355. name_filter: 内容过滤条件
  356. Returns:
  357. 标签分组列表
  358. """
  359. # 构建完整的查询语句
  360. cql = f"""
  361. MATCH (n:data_label)
  362. WITH n, apoc.text.levenshteinSimilarity(n.group, "{name_filter}") AS similarity
  363. WHERE similarity > 0.1 // 设置相似度阈值
  364. RETURN DISTINCT n.group as name,id(n) as nodeid
  365. """
  366. result = connect_graph.run(cql).data()
  367. data = []
  368. for record in result:
  369. data.append({
  370. "name": record['name'],
  371. "id": record['nodeid']
  372. })
  373. return data