interface.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. """
  2. 数据接口核心业务逻辑模块
  3. 本模块包含了数据接口相关的所有核心业务逻辑函数,包括:
  4. - 数据标准(data_standard)相关功能
  5. - 数据标签(data_label)相关功能
  6. - 图谱生成
  7. - 动态标签识别等功能
  8. """
  9. import logging
  10. from app.core.graph.graph_operations import connect_graph
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
  13. # 数据标准列表展示
  14. def standard_list(skip_count, page_size, en_name_filter=None,
  15. name_filter=None, category_filter=None, time_filter=None):
  16. """
  17. 获取数据标准列表
  18. Args:
  19. skip_count: 跳过的记录数量
  20. page_size: 每页记录数量
  21. en_name_filter: 英文名称过滤条件
  22. name_filter: 名称过滤条件
  23. category_filter: 分类过滤条件
  24. time_filter: 时间过滤条件
  25. Returns:
  26. tuple: (数据列表, 总记录数)
  27. """
  28. data = []
  29. # 构建查询条件
  30. where_clause = []
  31. params = {}
  32. if name_filter:
  33. where_clause.append("n.name CONTAINS $name_filter")
  34. params['name_filter'] = name_filter
  35. if en_name_filter:
  36. where_clause.append("n.en_name CONTAINS $en_name_filter")
  37. params['en_name_filter'] = en_name_filter
  38. if category_filter:
  39. where_clause.append("n.category CONTAINS $category_filter")
  40. params['category_filter'] = category_filter
  41. if time_filter:
  42. where_clause.append("n.time CONTAINS $time_filter")
  43. params['time_filter'] = time_filter
  44. else:
  45. where_clause.append("TRUE")
  46. where_str = " AND ".join(where_clause)
  47. # 构建完整的查询语句
  48. cql = f"""
  49. MATCH (n:data_standard)
  50. WHERE {where_str}
  51. RETURN properties(n) as properties,n.time as time,elementId(n) as nodeid,
  52. COUNT((()-[]->(n))) + COUNT(((n)-[]->())) as relationship_count
  53. ORDER BY time desc
  54. SKIP $skip_count
  55. LIMIT $page_size
  56. """
  57. params['skip_count'] = skip_count
  58. params['page_size'] = page_size
  59. # 修复:使用正确的session方式执行查询
  60. driver = connect_graph()
  61. if not driver:
  62. return [], 0
  63. with driver.session() as session:
  64. result = session.run(cql, **params)
  65. for record in result:
  66. properties = {
  67. key: value for key, value in record['properties'].items()
  68. if key not in ['input', 'code', 'output']
  69. }
  70. properties.setdefault("describe", None)
  71. new_attr = {
  72. 'id': record['nodeid'],
  73. 'number': record['relationship_count']
  74. }
  75. properties.update(new_attr)
  76. data.append(properties)
  77. # 获取总量
  78. total_query = f"MATCH (n:data_standard) WHERE {where_str} RETURN COUNT(n) AS total"
  79. total_result = session.run(total_query, **params).single()["total"]
  80. return data, total_result
  81. # 数据标准图谱展示(血缘关系)父节点
  82. def standard_kinship_graph(nodeid):
  83. """
  84. 生成数据标准的血缘关系图谱
  85. Args:
  86. nodeid: 节点ID
  87. Returns:
  88. 图谱数据
  89. """
  90. # 查询语句
  91. cql = """
  92. MATCH(da:data_standard)
  93. WHERE elementId(da)=$nodeId
  94. OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
  95. OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
  96. WITH
  97. collect({id:toString(elementId(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
  98. collect({id:toString(elementId(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
  99. collect({id:toString(elementId(da)),text:da.name,type:split(labels(da)[0],'_')[1]}) as nodes,da,
  100. collect({from:toString(elementId(a)),to:toString(elementId(da)),text:'标准'})+
  101. collect({from:toString(elementId(b)),to:toString(elementId(da)),text:'标准'})as lines
  102. WITH
  103. toString(elementId(da)) as rootId,
  104. apoc.coll.toSet(lines) as lines,
  105. apoc.coll.toSet(nodes) as nodes
  106. RETURN nodes,lines,rootId
  107. """
  108. # 修复:使用正确的session方式执行查询
  109. driver = connect_graph()
  110. if not driver:
  111. return {}
  112. with driver.session() as session:
  113. result = session.run(cql, nodeId=nodeid)
  114. res = {}
  115. for item in result:
  116. res = {
  117. "nodes": [record for record in item['nodes'] if record['id']],
  118. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  119. "rootId": item['rootId']
  120. }
  121. return res
  122. # 数据标准图谱展示(影响关系)下游
  123. def standard_impact_graph(nodeid):
  124. """
  125. 生成数据标准的影响关系图谱
  126. Args:
  127. nodeid: 节点ID
  128. Returns:
  129. 图谱数据
  130. """
  131. # 查询语句
  132. cql = """
  133. MATCH(da:data_standard)
  134. WHERE elementId(da)=$nodeId
  135. OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
  136. OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
  137. WITH
  138. collect({id:toString(elementId(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
  139. collect({id:toString(elementId(m1)),text:m1.name})+
  140. collect({id:toString(elementId(m2)),text:m2.name})as nodes,da,
  141. collect({from:toString(elementId(da)),to:toString(elementId(m1)),text:'标准清洗'})+
  142. collect({from:toString(elementId(da)),to:toString(elementId(m2)),text:'标准清洗'})as lines
  143. WITH
  144. toString(elementId(da)) as rootId,
  145. apoc.coll.toSet(lines) as lines,
  146. apoc.coll.toSet(nodes) as nodes
  147. RETURN nodes,lines,rootId
  148. """
  149. # 修复:使用正确的session方式执行查询
  150. driver = connect_graph()
  151. if not driver:
  152. return {}
  153. with driver.session() as session:
  154. result = session.run(cql, nodeId=nodeid)
  155. res = {}
  156. for item in result:
  157. res = {
  158. "nodes": [record for record in item['nodes'] if record['id']],
  159. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  160. "rootId": item['rootId']
  161. }
  162. return res
  163. # 数据标准图谱展示(所有关系)
  164. def standard_all_graph(nodeid):
  165. """
  166. 生成数据标准的所有关系图谱
  167. Args:
  168. nodeid: 节点ID
  169. Returns:
  170. 图谱数据
  171. """
  172. # 查询语句
  173. cql = """
  174. MATCH(da:data_standard)
  175. WHERE elementId(da)=$nodeId
  176. OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
  177. OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
  178. OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
  179. OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
  180. WITH
  181. collect({id:toString(elementId(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
  182. collect({id:toString(elementId(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
  183. collect({id:toString(elementId(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
  184. collect({id:toString(elementId(m1)),text:m1.name})+
  185. collect({id:toString(elementId(m2)),text:m2.name})as nodes,da,
  186. collect({from:toString(elementId(a)),to:toString(elementId(da)),text:'标准'})+
  187. collect({from:toString(elementId(b)),to:toString(elementId(da)),text:'标准'})+
  188. collect({from:toString(elementId(da)),to:toString(elementId(m1)),text:'标准清洗'})+
  189. collect({from:toString(elementId(da)),to:toString(elementId(m2)),text:'标准清洗'})as lines
  190. WITH
  191. toString(elementId(da)) as rootId,
  192. apoc.coll.toSet(lines) as lines,
  193. apoc.coll.toSet(nodes) as nodes
  194. RETURN nodes,lines,rootId
  195. """
  196. # 修复:使用正确的session方式执行查询
  197. driver = connect_graph()
  198. if not driver:
  199. return {}
  200. with driver.session() as session:
  201. result = session.run(cql, nodeId=nodeid)
  202. res = {}
  203. for item in result:
  204. res = {
  205. "nodes": [record for record in item['nodes'] if record['id']],
  206. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  207. "rootId": item['rootId']
  208. }
  209. return res
  210. # 数据标签列表展示
  211. def label_list(skip_count, page_size, en_name_filter=None,
  212. name_filter=None, category_filter=None, group_filter=None):
  213. """
  214. 获取数据标签列表
  215. Args:
  216. skip_count: 跳过的记录数量
  217. page_size: 每页记录数量
  218. en_name_filter: 英文名称过滤条件
  219. name_filter: 名称过滤条件
  220. category_filter: 分类过滤条件
  221. group_filter: 分组过滤条件
  222. Returns:
  223. tuple: (数据列表, 总记录数)
  224. """
  225. data = []
  226. # 构建查询条件
  227. where_clause = []
  228. params = {}
  229. if name_filter:
  230. where_clause.append("n.name CONTAINS $name_filter")
  231. params['name_filter'] = name_filter
  232. if en_name_filter:
  233. where_clause.append("n.en_name CONTAINS $en_name_filter")
  234. params['en_name_filter'] = en_name_filter
  235. if category_filter:
  236. where_clause.append("n.category CONTAINS $category_filter")
  237. params['category_filter'] = category_filter
  238. if group_filter:
  239. where_clause.append(f"n.group CONTAINS $group_filter")
  240. params['group_filter'] = group_filter
  241. else:
  242. where_clause.append("TRUE")
  243. where_str = " AND ".join(where_clause)
  244. # 构建完整的查询语句
  245. cql = f"""
  246. MATCH (n:data_label)
  247. WHERE {where_str}
  248. WITH n, properties(n) as properties, n.time as time, elementId(n) as nodeid
  249. OPTIONAL MATCH (n)<-[r]-()
  250. WITH n, properties, time, nodeid, count(r) as incoming
  251. OPTIONAL MATCH (n)-[r]->()
  252. WITH n, properties, time, nodeid, incoming, count(r) as outgoing
  253. RETURN properties, time, nodeid, incoming + outgoing as relationship_count
  254. ORDER BY time desc
  255. SKIP $skip_count
  256. LIMIT $page_size
  257. """
  258. params['skip_count'] = skip_count
  259. params['page_size'] = page_size
  260. # 修复:使用正确的session方式执行查询
  261. driver = connect_graph()
  262. if not driver:
  263. logger.error("无法连接到数据库")
  264. return [], 0
  265. with driver.session() as session:
  266. result = session.run(cql, **params)
  267. for record in result:
  268. properties = record['properties']
  269. new_attr = {
  270. 'id': record['nodeid'],
  271. 'number': record['relationship_count']
  272. }
  273. if "describe" not in properties:
  274. properties["describe"] = None
  275. if "scope" not in properties:
  276. properties["scope"] = None
  277. properties.update(new_attr)
  278. data.append(properties)
  279. # 获取总量
  280. total_query = f"MATCH (n:data_label) WHERE {where_str} RETURN COUNT(n) AS total"
  281. total_result = session.run(total_query, **params).single()["total"]
  282. return data, total_result
  283. # 数据标签图谱展示
  284. def id_label_graph(id):
  285. """
  286. 根据ID生成数据标签图谱
  287. Args:
  288. id: 节点ID
  289. Returns:
  290. 图谱数据
  291. """
  292. query = """
  293. MATCH (n:data_label)
  294. WHERE elementId(n) = $nodeId
  295. OPTIONAL MATCH (a)-[:label]-(n)
  296. WITH
  297. collect({from: toString(elementId(a)), to: toString(elementId(n)), text: "标签"}) AS line1,
  298. collect({id: toString(elementId(n)), text: n.name, type:"label"}) AS node1,
  299. collect({id: toString(elementId(a)), text: a.name, type: split(labels(a)[0], '_')[1]}) AS node2, n
  300. WITH apoc.coll.toSet(line1) AS lines,
  301. apoc.coll.toSet(node1 + node2) AS nodes,
  302. toString(elementId(n)) AS res
  303. RETURN lines, nodes, res
  304. """
  305. # 修复:使用正确的session方式执行查询
  306. driver = connect_graph()
  307. if not driver:
  308. return {}
  309. with driver.session() as session:
  310. result = session.run(query, nodeId=id)
  311. res = {}
  312. for item in result:
  313. res = {
  314. "nodes": [record for record in item['nodes'] if record['id']],
  315. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  316. "rootId": item['res'],
  317. }
  318. return res
  319. # 数据标签图谱展示(血缘关系)父节点/(所有关系)
  320. def label_kinship_graph(nodeid):
  321. """
  322. 生成数据标签的血缘关系图谱
  323. Args:
  324. nodeid: 节点ID
  325. Returns:
  326. 图谱数据
  327. """
  328. # 查询语句
  329. cql = """
  330. MATCH(la:data_label)
  331. WHERE elementId(la)=$nodeId
  332. OPTIONAL MATCH(a:data_resource)-[:label]-(la)
  333. OPTIONAL MATCH(b:data_model)-[:label]-(la)
  334. OPTIONAL MATCH(meta:meta_node)-[:label]-(la)
  335. OPTIONAL MATCH(d:data_standard)-[:label]-(la)
  336. OPTIONAL MATCH(e:data_metric)-[:label]-(la)
  337. WITH
  338. collect({id:toString(elementId(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
  339. collect({id:toString(elementId(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
  340. collect({id:toString(elementId(d)),text:d.name,type:split(labels(d)[0],'_')[1]})+
  341. collect({id:toString(elementId(e)),text:e.name,type:split(labels(e)[0],'_')[1]})+
  342. collect({id:toString(elementId(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
  343. collect({id:toString(elementId(meta)),text:meta.name}) as nodes,la,
  344. collect({from:toString(elementId(a)),to:toString(elementId(la)),text:'标签'})+
  345. collect({from:toString(elementId(b)),to:toString(elementId(la)),text:'标签'})+
  346. collect({from:toString(elementId(meta)),to:toString(elementId(la)),text:'标签'})+
  347. collect({from:toString(elementId(d)),to:toString(elementId(la)),text:'标签'})+
  348. collect({from:toString(elementId(e)),to:toString(elementId(la)),text:'标签'})as lines
  349. WITH
  350. toString(elementId(la)) as rootId,
  351. apoc.coll.toSet(lines) as lines,
  352. apoc.coll.toSet(nodes) as nodes
  353. RETURN nodes,lines,rootId
  354. """
  355. # 修复:使用正确的session方式执行查询
  356. driver = connect_graph()
  357. if not driver:
  358. return {}
  359. with driver.session() as session:
  360. result = session.run(cql, nodeId=nodeid)
  361. res = {}
  362. for item in result:
  363. res = {
  364. "nodes": [record for record in item['nodes'] if record['id']],
  365. "lines": [record for record in item['lines'] if record['from'] and record['to']],
  366. "rootId": item['rootId']
  367. }
  368. return res
  369. # 数据标签图谱展示(影响关系)下游
  370. def label_impact_graph(nodeid):
  371. """
  372. 生成数据标签的影响关系图谱
  373. Args:
  374. nodeid: 节点ID
  375. Returns:
  376. 图谱数据
  377. """
  378. # 查询语句
  379. cql = """
  380. MATCH(n:data_label)
  381. WHERE elementId(n)=$nodeId
  382. RETURN {id:toString(elementId(n)),text:(n.name),type:"label"} AS nodes,
  383. toString(elementId(n)) as rootId
  384. """
  385. # 修复:使用正确的session方式执行查询
  386. driver = connect_graph()
  387. if not driver:
  388. return {}
  389. with driver.session() as session:
  390. result = session.run(cql, nodeId=nodeid)
  391. res = {}
  392. for item in result:
  393. res = {
  394. "nodes": item['nodes'],
  395. "rootId": item['rootId'],
  396. "lines": []
  397. }
  398. return res
  399. # 数据标签按照提交内容查询相似分组,并且返回
  400. def dynamic_label_list(name_filter=None):
  401. """
  402. 根据内容查询相似的数据标签分组
  403. Args:
  404. name_filter: 内容过滤条件
  405. Returns:
  406. 标签分组列表
  407. """
  408. # 构建完整的查询语句
  409. cql = f"""
  410. MATCH (n:data_label)
  411. WITH n, apoc.text.levenshteinSimilarity(n.group, "{name_filter}") AS similarity
  412. WHERE similarity > 0.1 // 设置相似度阈值
  413. RETURN DISTINCT n.group as name, elementId(n) as nodeid
  414. """
  415. # 修复:使用正确的session方式执行查询
  416. driver = connect_graph()
  417. if not driver:
  418. return []
  419. with driver.session() as session:
  420. result = session.run(cql)
  421. data = []
  422. for record in result:
  423. data.append({
  424. "name": record['name'],
  425. "id": record['nodeid']
  426. })
  427. return data