# interface.py

  1. """
  2. 数据接口核心业务逻辑模块
  3. 本模块包含了数据接口相关的所有核心业务逻辑函数,包括:
  4. - 数据标准(data_standard)相关功能
  5. - 数据标签(DataLabel)相关功能
  6. - 图谱生成
  7. - 动态标签识别等功能
  8. """
  9. import logging
  10. import re
  11. from app.core.graph.graph_operations import connect_graph
  12. from app.services.neo4j_driver import neo4j_driver
  13. # 配置logger
  14. logger = logging.getLogger(__name__)


def _build_category_filter_conditions(category_filter, params):
    """
    Convert category_filter into a list of Cypher WHERE conditions.

    Supported forms:
    - dict: {field: value, ...}
    - list: [{"field": "...", "value": "..."}, {"category": "xxx"}]
    - str: legacy usage, equivalent to filtering on the category field
    """
    conditions = []
    param_index = 0

    def add_condition(field, value):
        nonlocal param_index
        if value is None:
            return
        if not isinstance(field, str):
            return
        if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", field):
            logger.warning(f"忽略非法属性字段: {field}")
            return
        param_key = f"category_filter_{param_index}"
        param_index += 1
        conditions.append(f"n.{field} CONTAINS ${param_key}")
        params[param_key] = value

    if isinstance(category_filter, dict):
        for field, value in category_filter.items():
            add_condition(field, value)
    elif isinstance(category_filter, list):
        for item in category_filter:
            if not isinstance(item, dict):
                continue
            if "field" in item and "value" in item:
                add_condition(item.get("field"), item.get("value"))
            elif len(item) == 1:
                field, value = next(iter(item.items()))
                add_condition(field, value)
    elif category_filter:
        add_condition("category", category_filter)

    return conditions
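
# Illustrative sketch (not part of the module's API): how the helper above
# expands a dict-style filter. The field names and values are made up for
# demonstration only.
#
#     params = {}
#     conds = _build_category_filter_conditions(
#         {"category": "指标", "group": "核心"}, params
#     )
#     # conds  -> ["n.category CONTAINS $category_filter_0",
#     #            "n.group CONTAINS $category_filter_1"]
#     # params -> {"category_filter_0": "指标", "category_filter_1": "核心"}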


# Data standard list
def standard_list(
    skip_count,
    page_size,
    name_en_filter=None,
    name_zh_filter=None,
    category_filter=None,
    create_time_filter=None,
):
    """
    Get the list of data standards.

    Args:
        skip_count: number of records to skip
        page_size: number of records per page
        name_en_filter: filter on the English name
        name_zh_filter: filter on the Chinese name
        category_filter: filter on the category
        create_time_filter: filter on the creation time

    Returns:
        tuple: (list of records, total record count)
    """
    data = []
    # Build the WHERE conditions
    where_clause = []
    params = {}
    if name_zh_filter:
        where_clause.append("n.name_zh CONTAINS $name_zh_filter")
        params["name_zh_filter"] = name_zh_filter
    if name_en_filter:
        where_clause.append("n.name_en CONTAINS $name_en_filter")
        params["name_en_filter"] = name_en_filter
    if category_filter:
        where_clause.append("n.category CONTAINS $category_filter")
        params["category_filter"] = category_filter
    if create_time_filter:
        where_clause.append("n.create_time CONTAINS $create_time_filter")
        params["create_time_filter"] = create_time_filter
    if not where_clause:
        where_clause.append("TRUE")
    where_str = " AND ".join(where_clause)
    # Build the full query
    cql = f"""
    MATCH (n:data_standard)
    WHERE {where_str}
    RETURN
        properties(n) as properties,
        n.create_time as create_time,
        id(n) as nodeid,
        size([(n)<-[]-() | 1]) + size([(n)-[]->() | 1]) as relationship_count
    ORDER BY create_time desc
    SKIP $skip_count
    LIMIT $page_size
    """
    params["skip_count"] = skip_count
    params["page_size"] = page_size
    # Fix: execute the query through an explicit session
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, **params)
            for record in result:
                properties = {
                    key: value
                    for key, value in record["properties"].items()
                    if key not in ["input", "code", "output"]
                }
                properties.setdefault("describe", None)
                new_attr = {
                    "id": record["nodeid"],
                    "number": record["relationship_count"],
                }
                properties.update(new_attr)
                data.append(properties)
            # Get the total count
            total_query = (
                f"MATCH (n:data_standard) WHERE {where_str} RETURN COUNT(n) AS total"
            )
            total_record = session.run(total_query, **params).single()
            total = total_record["total"] if total_record else 0
            return data, total
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return [], 0
    finally:
        if driver:
            driver.close()
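
# Usage sketch (assumes a running Neo4j instance reachable via connect_graph();
# the page numbers and filter value below are illustrative only):
#
#     page, page_size = 1, 10
#     rows, total = standard_list(
#         skip_count=(page - 1) * page_size,
#         page_size=page_size,
#         name_zh_filter="客户",
#     )
#     # `rows` is a list of property dicts enriched with "id" and "number"
#     # (relationship count); `total` is the unpaginated match count.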


# Data standard graph (lineage relationships, parent nodes)
def standard_kinship_graph(nodeid):
    """
    Build the lineage graph for a data standard.

    Args:
        nodeid: node ID

    Returns:
        Graph data
    """
    # Query statement
    cql = """
    MATCH(da:data_standard)
    WHERE id(da)=$nodeId
    OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
    WITH
        collect({
            id:toString(id(a)),
            text:a.name,
            type:split(labels(a)[0],'_')[1]
        })+
        collect({
            id:toString(id(b)),
            text:b.name,
            type:split(labels(b)[0],'_')[1]
        })+
        collect({
            id:toString(id(da)),
            text:da.name,
            type:split(labels(da)[0],'_')[1]
        }) as nodes,
        da,
        collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(b)),to:toString(id(da)),text:'标准'}) as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data standard graph (impact relationships, downstream)
def standard_impact_graph(nodeid):
    """
    Build the impact graph for a data standard.

    Args:
        nodeid: node ID

    Returns:
        Graph data
    """
    # Query statement
    cql = """
    MATCH(da:data_standard)
    WHERE id(da)=$nodeId
    OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
    WITH
        collect({
            id:toString(id(da)),
            text:da.name,
            type:split(labels(da)[0],'_')[1]
        })+
        collect({id:toString(id(m1)),text:m1.name})+
        collect({id:toString(id(m2)),text:m2.name}) as nodes,
        da,
        collect({
            from:toString(id(da)),
            to:toString(id(m1)),
            text:'标准清洗'
        })+
        collect({
            from:toString(id(da)),
            to:toString(id(m2)),
            text:'标准清洗'
        }) as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data standard graph (all relationships)
def standard_all_graph(nodeid):
    """
    Build the graph of all relationships for a data standard.

    Args:
        nodeid: node ID

    Returns:
        Graph data
    """
    # Query statement
    cql = """
    MATCH(da:data_standard)
    WHERE id(da)=$nodeId
    OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
    WITH
        collect({
            id:toString(id(a)),
            text:a.name,
            type:split(labels(a)[0],'_')[1]
        })+
        collect({
            id:toString(id(b)),
            text:b.name,
            type:split(labels(b)[0],'_')[1]
        })+
        collect({
            id:toString(id(da)),
            text:da.name,
            type:split(labels(da)[0],'_')[1]
        })+
        collect({id:toString(id(m1)),text:m1.name})+
        collect({id:toString(id(m2)),text:m2.name}) as nodes,
        da,
        collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
        collect({
            from:toString(id(da)),
            to:toString(id(m1)),
            text:'标准清洗'
        })+
        collect({
            from:toString(id(da)),
            to:toString(id(m2)),
            text:'标准清洗'
        }) as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
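
# Note on the graph payloads above: standard_kinship_graph,
# standard_impact_graph and standard_all_graph all return the same
# front-end-oriented shape (values illustrative):
#
#     {
#         "nodes": [{"id": "123", "text": "...", "type": "standard"}, ...],
#         "lines": [{"from": "456", "to": "123", "text": "标准"}, ...],
#         "rootId": "123",
#     }
#
# Nodes without an id and lines missing either endpoint (produced by the
# OPTIONAL MATCH clauses when nothing is connected) are filtered out in Python.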


# Data label list
def label_list(
    skip_count,
    page_size,
    name_en_filter=None,
    name_zh_filter=None,
    category_filter=None,
    group_filter=None,
):
    """
    Get the list of data labels.

    Args:
        skip_count: number of records to skip
        page_size: number of records per page
        name_en_filter: filter on the English name
        name_zh_filter: filter on the Chinese name
        category_filter: filter on the category
        group_filter: filter on the group

    Returns:
        tuple: (list of records, total record count)
    """
    data = []
    # Build the WHERE conditions
    where_clause = []
    params = {}
    if name_zh_filter:
        where_clause.append("n.name_zh CONTAINS $name_zh_filter")
        params["name_zh_filter"] = name_zh_filter
    if name_en_filter:
        where_clause.append("n.name_en CONTAINS $name_en_filter")
        params["name_en_filter"] = name_en_filter
    where_clause.extend(_build_category_filter_conditions(category_filter, params))
    if group_filter:
        where_clause.append("n.group CONTAINS $group_filter")
        params["group_filter"] = group_filter
    if not where_clause:
        where_clause.append("TRUE")
    where_str = " AND ".join(where_clause)
    # Build the full query
    cql = f"""
    MATCH (n:DataLabel)
    WHERE {where_str}
    WITH
        n,
        properties(n) as properties,
        n.create_time as create_time,
        id(n) as nodeid
    OPTIONAL MATCH (n)<-[r]-()
    WITH
        n,
        properties,
        create_time,
        nodeid,
        count(r) as incoming
    OPTIONAL MATCH (n)-[r]->()
    WITH
        n,
        properties,
        create_time,
        nodeid,
        incoming,
        count(r) as outgoing
    RETURN
        properties,
        create_time,
        nodeid,
        incoming + outgoing as relationship_count
    ORDER BY create_time desc
    SKIP $skip_count
    LIMIT $page_size
    """
    params["skip_count"] = skip_count
    params["page_size"] = page_size
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, **params)
            for record in result:
                properties = record["properties"]
                new_attr = {
                    "id": record["nodeid"],
                    "number": record["relationship_count"],
                }
                if "describe" not in properties:
                    properties["describe"] = None
                if "scope" not in properties:
                    properties["scope"] = None
                properties.update(new_attr)
                data.append(properties)
            # Get the total count
            total_query = (
                f"MATCH (n:DataLabel) WHERE {where_str} RETURN COUNT(n) AS total"
            )
            total_record = session.run(total_query, **params).single()
            total = total_record["total"] if total_record else 0
            return data, total
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return [], 0
    finally:
        if driver:
            driver.close()
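
# Usage sketch (illustrative values; category_filter accepts a dict, a list of
# {"field", "value"} items, or a plain string, see
# _build_category_filter_conditions above):
#
#     rows, total = label_list(
#         skip_count=0,
#         page_size=20,
#         category_filter=[{"field": "category", "value": "业务"}],
#         group_filter="客户",
#     )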


# Data label graph
def id_label_graph(id):
    """
    Build the data label graph for the given node ID.

    Args:
        id: node ID

    Returns:
        Graph data
    """
    query = """
    MATCH (n:DataLabel)
    WHERE id(n) = $nodeId
    OPTIONAL MATCH (a)-[:LABEL]-(n)
    WITH
        collect({
            from: toString(id(a)),
            to: toString(id(n)),
            text: "标签"
        }) AS line1,
        collect({
            id: toString(id(n)),
            text: n.name_zh,
            type:"label"
        }) AS node1,
        collect({
            id: toString(id(a)),
            text: a.name_zh,
            type: split(labels(a)[0], '_')[1]
        }) AS node2,
        n
    WITH
        apoc.coll.toSet(line1) AS lines,
        apoc.coll.toSet(node1 + node2) AS nodes,
        toString(id(n)) AS res
    RETURN lines, nodes, res
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(query, nodeId=id)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["res"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data label graph (lineage relationships, parent nodes / all relationships)
def label_kinship_graph(nodeid):
    """
    Build the lineage graph for a data label.

    Args:
        nodeid: node ID

    Returns:
        Graph data
    """
    # Query statement
    cql = """
    MATCH(la:DataLabel)
    WHERE id(la)=$nodeId
    OPTIONAL MATCH(a:DataResource)-[:LABEL]-(la)
    OPTIONAL MATCH(b:DataModel)-[:LABEL]-(la)
    OPTIONAL MATCH(meta:DataMeta)-[:LABEL]-(la)
    OPTIONAL MATCH(d:data_standard)-[:LABEL]-(la)
    WITH
        collect({
            id:toString(id(a)),
            text:a.name_zh,
            type:split(labels(a)[0],'_')[1]
        })+
        collect({
            id:toString(id(b)),
            text:b.name_zh,
            type:split(labels(b)[0],'_')[1]
        })+
        collect({
            id:toString(id(d)),
            text:d.name_zh,
            type:split(labels(d)[0],'_')[1]
        })+
        collect({
            id:toString(id(la)),
            text:la.name_zh,
            type:split(labels(la)[0],'_')[1]
        })+
        collect({id:toString(id(meta)),text:meta.name_zh}) as nodes,
        la,
        collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(d)),to:toString(id(la)),text:'标签'}) as lines
    WITH
        toString(id(la)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data label graph (impact relationships, downstream)
def label_impact_graph(nodeid):
    """
    Build the impact graph for a data label.

    Args:
        nodeid: node ID

    Returns:
        Graph data
    """
    # Query statement
    cql = """
    MATCH(n:DataLabel)
    WHERE id(n)=$nodeId
    RETURN {
        id:toString(id(n)),
        text:(n.name_zh),
        type:"label"
    } AS nodes,
    toString(id(n)) as rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {"nodes": item["nodes"], "rootId": item["rootId"], "lines": []}
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Query similar data label groups for the submitted content and return them
def dynamic_label_list(name_filter=None):
    """
    Find data label groups similar to the given content.

    Args:
        name_filter: content to match against

    Returns:
        List of label groups
    """
    # Build the full query
    cql = """
    MATCH (n:DataLabel)
    WITH
        n,
        apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity
    WHERE similarity > 0.1 // similarity threshold
    RETURN DISTINCT n.group as name_zh, id(n) as nodeid
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, name_filter=name_filter or "")
            data = []
            for record in result:
                data.append(
                    {
                        "name_zh": record["name_zh"],
                        "id": record["nodeid"],
                    }
                )
            return data
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return []
    finally:
        if driver:
            driver.close()
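
# Usage sketch: apoc.text.levenshteinSimilarity scores n.group against the
# submitted text between 0 and 1; groups scoring above 0.1 are returned.
# Example (values illustrative):
#
#     groups = dynamic_label_list("客户标签")
#     # -> [{"name_zh": "客户分群", "id": 42}, ...]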


def search_info(key, value):
    """
    Search for nodes by a given property.

    Args:
        key: property key to search on
        value: property value to match

    Returns:
        List of search results
    """
    field_pattern = r"^[A-Za-z_][A-Za-z0-9_]*$"
    if not re.match(field_pattern, str(key)):
        logger.warning("非法属性键: %s", key)
        return []
    query = """
    MATCH (n)
    WHERE n[$field] =~ $pattern
    WITH
        n,
        properties(n) as properties,
        n.create_time as create_time,
        id(n) as nodeid
    RETURN properties, nodeid, create_time, labels(n) as labels
    LIMIT 30
    """
    driver = None
    try:
        driver = connect_graph()
    except (ConnectionError, ValueError) as e:
        logger.error(f"无法连接到Neo4j数据库: {str(e)}")
        return []
    try:
        with driver.session() as session:
            result = session.run(
                query,
                field=key,
                pattern=f"(?i).*{value}.*",
            )
            results = []
            for record in result:
                results.append(
                    {
                        "properties": record["properties"],
                        "id": record["nodeid"],
                        "create_time": record["create_time"],
                        "labels": record["labels"],
                    }
                )
            return results
    except Exception as e:
        logger.error(f"搜索节点信息失败: {str(e)}")
        return []
    finally:
        if driver:
            driver.close()
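
# Usage sketch (illustrative): search any node whose name_zh property contains
# "客户", case-insensitively. The property key is validated against
# field_pattern before being used for dynamic property access (n[$field]).
#
#     hits = search_info("name_zh", "客户")
#     for hit in hits:
#         print(hit["id"], hit["labels"], hit["properties"].get("name_zh"))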


def label_info(id):
    """
    Get the information of a label node.

    Args:
        id: node ID

    Returns:
        Label node information
    """
    query = """
    MATCH (n)
    WHERE id(n) = $nodeId
    RETURN {
        id:toString(id(n)),
        text:(n.name_zh),
        type:"label"
    } AS nodes,
    toString(id(n)) as rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(query, nodeId=id).data()
            return result[0] if result else {}
    except (ConnectionError, ValueError) as e:
        logger.error(f"无法连接到Neo4j数据库: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


def graph_all(domain_id, include_meta=True):
    """
    Get the complete relationship graph.

    Starting from the node identified by domain_id, traverse INPUT and OUTPUT
    relationships to find all related DataFlow and BusinessDomain nodes.

    Args:
        domain_id: ID of the starting node (usually a BusinessDomain node)
        include_meta: whether to include metadata nodes. If True, the result
            also contains:
            - the node identified by domain_id itself
            - DataMeta nodes connected to it via INCLUDES relationships

    Returns:
        dict: graph data containing "nodes" and "lines"
    """
    try:
        domain_id_int = int(domain_id)
    except (ValueError, TypeError):
        logger.error(f"节点ID不是有效的整数: {domain_id}")
        return {"nodes": [], "lines": []}
    try:
        with neo4j_driver.get_session() as session:
            nodes = {}  # node map: {node_id: node_props}
            lines = {}  # relationship map: {rel_id: rel_props}
            # 1. Verify that the starting node exists
            check_node_query = """
            MATCH (n)
            WHERE id(n) = $domain_id
            RETURN n, labels(n) as labels
            """
            result = session.run(check_node_query, domain_id=domain_id_int)
            record = result.single()
            if not record:
                logger.warning(f"未找到节点: {domain_id_int}")
                return {"nodes": [], "lines": []}
            start_node = record["n"]
            start_labels = record["labels"]
            start_node_type = start_labels[0] if start_labels else ""
            # 2. If include_meta=True, add the starting node and the DataMeta
            #    nodes attached to it through INCLUDES relationships
            if include_meta:
                # Add the starting node
                start_props = dict(start_node)
                start_props["id"] = domain_id_int
                start_props["node_type"] = start_node_type
                nodes[domain_id_int] = start_props
                # Find DataMeta nodes connected through INCLUDES
                meta_query = """
                MATCH (n)-[r:INCLUDES]->(m:DataMeta)
                WHERE id(n) = $domain_id
                RETURN m, id(m) as meta_id, id(r) as rel_id
                """
                meta_results = session.run(meta_query, domain_id=domain_id_int)
                for meta_record in meta_results:
                    meta_node = meta_record["m"]
                    meta_id = meta_record["meta_id"]
                    rel_id = meta_record["rel_id"]
                    # Add the DataMeta node
                    meta_props = dict(meta_node)
                    meta_props["id"] = meta_id
                    meta_props["node_type"] = "DataMeta"
                    nodes[meta_id] = meta_props
                    # Add the INCLUDES relationship
                    lines[str(rel_id)] = {
                        "id": str(rel_id),
                        "from": str(domain_id_int),
                        "to": str(meta_id),
                        "text": "INCLUDES",
                    }
            # 3. Traverse INPUT and OUTPUT relationships to find all related
            #    DataFlow and BusinessDomain nodes. A breadth-first traversal
            #    queues both BusinessDomain and DataFlow nodes so each kind is
            #    expanded in turn.
            queue = [(domain_id_int, start_node_type)]  # (node_id, node_type)
            processed_bd = set()  # BusinessDomain node IDs already expanded
            processed_df = set()  # DataFlow node IDs already expanded
            while queue:
                current_id, current_type = queue.pop(0)
                # BusinessDomain: find all related DataFlow nodes
                # (both INPUT and OUTPUT directions)
                if current_type == "BusinessDomain" and current_id not in processed_bd:
                    processed_bd.add(current_id)
                    # Add the current BusinessDomain node (if not added yet)
                    if current_id not in nodes:
                        bd_query = """
                        MATCH (bd:BusinessDomain)
                        WHERE id(bd) = $bd_id
                        RETURN bd
                        """
                        bd_result = session.run(bd_query, bd_id=current_id).single()
                        if bd_result:
                            bd_node = bd_result["bd"]
                            bd_props = dict(bd_node)
                            bd_props["id"] = current_id
                            bd_props["node_type"] = "BusinessDomain"
                            nodes[current_id] = bd_props
                    # Find DataFlow nodes connected via INPUT (BD-[INPUT]->DF)
                    input_query = """
                    MATCH (bd:BusinessDomain)-[r:INPUT]->(df:DataFlow)
                    WHERE id(bd) = $bd_id
                    RETURN df, id(df) as df_id, id(r) as rel_id
                    """
                    input_results = session.run(input_query, bd_id=current_id)
                    for input_record in input_results:
                        df_node = input_record["df"]
                        df_id = input_record["df_id"]
                        rel_id = input_record["rel_id"]
                        # Add the DataFlow node
                        if df_id not in nodes:
                            df_props = dict(df_node)
                            df_props["id"] = df_id
                            df_props["node_type"] = "DataFlow"
                            nodes[df_id] = df_props
                        # Add the INPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(current_id),
                            "to": str(df_id),
                            "text": "INPUT",
                        }
                        # Queue the DataFlow node for further traversal
                        if df_id not in processed_df:
                            queue.append((df_id, "DataFlow"))
                    # Find DataFlow nodes connected via OUTPUT
                    # (DF-[OUTPUT]->BD, reverse lookup)
                    reverse_output_query = """
                    MATCH (df:DataFlow)-[r:OUTPUT]->(bd:BusinessDomain)
                    WHERE id(bd) = $bd_id
                    RETURN df, id(df) as df_id, id(r) as rel_id
                    """
                    reverse_output_results = session.run(
                        reverse_output_query, bd_id=current_id
                    )
                    for reverse_record in reverse_output_results:
                        df_node = reverse_record["df"]
                        df_id = reverse_record["df_id"]
                        rel_id = reverse_record["rel_id"]
                        # Add the DataFlow node
                        if df_id not in nodes:
                            df_props = dict(df_node)
                            df_props["id"] = df_id
                            df_props["node_type"] = "DataFlow"
                            nodes[df_id] = df_props
                        # Add the OUTPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(df_id),
                            "to": str(current_id),
                            "text": "OUTPUT",
                        }
                        # Queue the DataFlow node for further traversal
                        if df_id not in processed_df:
                            queue.append((df_id, "DataFlow"))
                # DataFlow: find all related BusinessDomain nodes
                # (both INPUT and OUTPUT directions)
                elif current_type == "DataFlow" and current_id not in processed_df:
                    processed_df.add(current_id)
                    # Add the current DataFlow node (if not added yet)
                    if current_id not in nodes:
                        df_query = """
                        MATCH (df:DataFlow)
                        WHERE id(df) = $df_id
                        RETURN df
                        """
                        df_result = session.run(df_query, df_id=current_id).single()
                        if df_result:
                            df_node = df_result["df"]
                            df_props = dict(df_node)
                            df_props["id"] = current_id
                            df_props["node_type"] = "DataFlow"
                            nodes[current_id] = df_props
                    # Find target BusinessDomain nodes connected via OUTPUT
                    # (DF-[OUTPUT]->BD)
                    output_query = """
                    MATCH (df:DataFlow)-[r:OUTPUT]->(bd:BusinessDomain)
                    WHERE id(df) = $df_id
                    RETURN bd, id(bd) as bd_id, id(r) as rel_id
                    """
                    output_results = session.run(output_query, df_id=current_id)
                    for output_record in output_results:
                        bd_node = output_record["bd"]
                        bd_id = output_record["bd_id"]
                        rel_id = output_record["rel_id"]
                        # Add the BusinessDomain node
                        if bd_id not in nodes:
                            bd_props = dict(bd_node)
                            bd_props["id"] = bd_id
                            bd_props["node_type"] = "BusinessDomain"
                            nodes[bd_id] = bd_props
                        # Add the OUTPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(current_id),
                            "to": str(bd_id),
                            "text": "OUTPUT",
                        }
                        # Queue the BusinessDomain node for further traversal
                        if bd_id not in processed_bd:
                            queue.append((bd_id, "BusinessDomain"))
                    # Find source BusinessDomain nodes connected via INPUT
                    # (BD-[INPUT]->DF, reverse lookup)
                    reverse_input_query = """
                    MATCH (bd:BusinessDomain)-[r:INPUT]->(df:DataFlow)
                    WHERE id(df) = $df_id
                    RETURN bd, id(bd) as bd_id, id(r) as rel_id
                    """
                    reverse_input_results = session.run(
                        reverse_input_query, df_id=current_id
                    )
                    for reverse_record in reverse_input_results:
                        bd_node = reverse_record["bd"]
                        bd_id = reverse_record["bd_id"]
                        rel_id = reverse_record["rel_id"]
                        # Add the BusinessDomain node
                        if bd_id not in nodes:
                            bd_props = dict(bd_node)
                            bd_props["id"] = bd_id
                            bd_props["node_type"] = "BusinessDomain"
                            nodes[bd_id] = bd_props
                        # Add the INPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(bd_id),
                            "to": str(current_id),
                            "text": "INPUT",
                        }
                        # Queue the BusinessDomain node for further traversal
                        if bd_id not in processed_bd:
                            queue.append((bd_id, "BusinessDomain"))
            logger.info(
                f"graph_all 结果: node_id={domain_id_int}, "
                f"nodes={len(nodes)}, lines={len(lines)}, "
                f"include_meta={include_meta}"
            )
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values()),
            }
    except Exception as e:
        logger.error(f"获取图谱失败: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return {"nodes": [], "lines": []}
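
# Usage sketch (illustrative IDs): starting from a BusinessDomain node,
# graph_all walks INPUT/OUTPUT edges breadth-first, alternating between
# BusinessDomain and DataFlow nodes, and returns deduplicated node/edge maps:
#
#     graph = graph_all(domain_id=123, include_meta=True)
#     # graph["nodes"] -> [{"id": 123, "node_type": "BusinessDomain", ...},
#     #                    {"id": 456, "node_type": "DataFlow", ...}, ...]
#     # graph["lines"] -> [{"id": "789", "from": "123", "to": "456",
#     #                     "text": "INPUT"}, ...]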


def node_delete(node_id):
    """
    Delete a DataLabel node and all of its relationships.

    Args:
        node_id: node ID (integer)

    Returns:
        dict: deletion result with a "success" flag and a "message"
    """
    driver = None
    try:
        driver = connect_graph()
    except (ConnectionError, ValueError) as e:
        logger.error(f"无法连接到Neo4j数据库: {str(e)}")
        return {"success": False, "message": "无法连接到数据库"}
    try:
        with driver.session() as session:
            # First check that the node exists and is a DataLabel
            check_query = """
            MATCH (n:DataLabel)
            WHERE id(n) = $nodeId
            RETURN n
            """
            check_result = session.run(
                check_query,
                nodeId=node_id,
            ).single()
            if not check_result:
                logger.warning(f"DataLabel 节点不存在: ID={node_id}")
                return {
                    "success": False,
                    "message": f"DataLabel 节点不存在 (ID: {node_id})",
                }
            # Delete the node and all of its relationships
            delete_query = """
            MATCH (n:DataLabel)
            WHERE id(n) = $nodeId
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            delete_result = session.run(
                delete_query,
                nodeId=node_id,
            ).single()
            if not delete_result:
                logger.warning(f"删除结果为空: ID={node_id}")
                return {
                    "success": False,
                    "message": "删除失败,未获取到删除结果",
                }
            deleted_count = delete_result["deleted_count"]
            if deleted_count > 0:
                logger.info(f"成功删除 DataLabel 节点: ID={node_id}")
                return {
                    "success": True,
                    "message": f"成功删除 DataLabel 节点 (ID: {node_id})",
                }
            else:
                logger.warning(f"删除失败,节点可能已被删除: ID={node_id}")
                return {"success": False, "message": "删除失败,节点可能已被删除"}
    except Exception as e:
        logger.error(f"删除 DataLabel 节点失败: {str(e)}")
        return {"success": False, "message": f"删除失败: {str(e)}"}
    finally:
        if driver:
            driver.close()
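
# Usage sketch (illustrative ID): node_delete only removes DataLabel nodes and
# detaches their relationships; anything else is rejected before deletion.
#
#     outcome = node_delete(42)
#     if not outcome["success"]:
#         logger.warning(outcome["message"])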