interface.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020
  1. """
  2. 数据接口核心业务逻辑模块
  3. 本模块包含了数据接口相关的所有核心业务逻辑函数,包括:
  4. - 数据标准(data_standard)相关功能
  5. - 数据标签(DataLabel)相关功能
  6. - 图谱生成
  7. - 动态标签识别等功能
  8. """
  9. import logging
  10. import re
  11. from app.core.graph.graph_operations import connect_graph
  12. from app.services.neo4j_driver import neo4j_driver
  13. # 配置logger
  14. logger = logging.getLogger(__name__)
  15. def _build_category_filter_conditions(category_filter, params):
  16. """
  17. 将 category_filter 转换为 Cypher 查询条件列表。
  18. 支持:
  19. - 字典: {field: value, ...}
  20. - 列表: [{"field": "...", "value": "..."}, {"category": "xxx"}]
  21. - 字符串: 兼容旧用法,等同于按 category 字段过滤
  22. """
  23. conditions = []
  24. param_index = 0
  25. def add_condition(field, value):
  26. nonlocal param_index
  27. if value is None:
  28. return
  29. if not isinstance(field, str):
  30. return
  31. if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", field):
  32. logger.warning(f"忽略非法属性字段: {field}")
  33. return
  34. param_key = f"category_filter_{param_index}"
  35. param_index += 1
  36. conditions.append(f"n.{field} CONTAINS ${param_key}")
  37. params[param_key] = value
  38. if isinstance(category_filter, dict):
  39. for field, value in category_filter.items():
  40. add_condition(field, value)
  41. elif isinstance(category_filter, list):
  42. for item in category_filter:
  43. if not isinstance(item, dict):
  44. continue
  45. if "field" in item and "value" in item:
  46. add_condition(item.get("field"), item.get("value"))
  47. elif len(item) == 1:
  48. field, value = next(iter(item.items()))
  49. add_condition(field, value)
  50. elif category_filter:
  51. add_condition("category", category_filter)
  52. return conditions
  53. # 数据标准列表展示
  54. def standard_list(
  55. skip_count,
  56. page_size,
  57. name_en_filter=None,
  58. name_zh_filter=None,
  59. category_filter=None,
  60. create_time_filter=None,
  61. ):
  62. """
  63. 获取数据标准列表
  64. Args:
  65. skip_count: 跳过的记录数量
  66. page_size: 每页记录数量
  67. name_en_filter: 英文名称过滤条件
  68. name_zh_filter: 名称过滤条件
  69. category_filter: 分类过滤条件
  70. create_time_filter: 时间过滤条件
  71. Returns:
  72. tuple: (数据列表, 总记录数)
  73. """
  74. data = []
  75. # 构建查询条件
  76. where_clause = []
  77. params = {}
  78. if name_zh_filter:
  79. where_clause.append("n.name_zh CONTAINS $name_zh_filter")
  80. params['name_zh_filter'] = name_zh_filter
  81. if name_en_filter:
  82. where_clause.append("n.name_en CONTAINS $name_en_filter")
  83. params['name_en_filter'] = name_en_filter
  84. if category_filter:
  85. where_clause.append("n.category CONTAINS $category_filter")
  86. params['category_filter'] = category_filter
  87. if create_time_filter:
  88. where_clause.append("n.create_time CONTAINS $create_time_filter")
  89. params['create_time_filter'] = create_time_filter
  90. else:
  91. where_clause.append("TRUE")
  92. where_str = " AND ".join(where_clause)
  93. # 构建完整的查询语句
  94. cql = f"""
  95. MATCH (n:data_standard)
  96. WHERE {where_str}
  97. RETURN
  98. properties(n) as properties,
  99. n.create_time as create_time,
  100. id(n) as nodeid,
  101. size([(n)<-[]-() | 1]) + size([(n)-[]->() | 1]) as relationship_count
  102. ORDER BY create_time desc
  103. SKIP $skip_count
  104. LIMIT $page_size
  105. """
  106. params['skip_count'] = skip_count
  107. params['page_size'] = page_size
  108. # 修复:使用正确的session方式执行查询
  109. driver = None
  110. try:
  111. driver = connect_graph()
  112. with driver.session() as session:
  113. result = session.run(cql, **params)
  114. for record in result:
  115. properties = {
  116. key: value for key, value in record['properties'].items()
  117. if key not in ['input', 'code', 'output']
  118. }
  119. properties.setdefault("describe", None)
  120. new_attr = {
  121. 'id': record['nodeid'],
  122. 'number': record['relationship_count']
  123. }
  124. properties.update(new_attr)
  125. data.append(properties)
  126. # 获取总量
  127. total_query = (
  128. f"MATCH (n:data_standard) WHERE {where_str} "
  129. "RETURN COUNT(n) AS total"
  130. )
  131. total_record = session.run(total_query, **params).single()
  132. total = total_record["total"] if total_record else 0
  133. return data, total
  134. except (ConnectionError, ValueError) as e:
  135. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  136. return [], 0
  137. finally:
  138. if driver:
  139. driver.close()
  140. # 数据标准图谱展示(血缘关系)父节点
  141. def standard_kinship_graph(nodeid):
  142. """
  143. 生成数据标准的血缘关系图谱
  144. Args:
  145. nodeid: 节点ID
  146. Returns:
  147. 图谱数据
  148. """
  149. # 查询语句
  150. cql = """
  151. MATCH(da:data_standard)
  152. WHERE id(da)=$nodeId
  153. OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
  154. OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
  155. WITH
  156. collect({
  157. id:toString(id(a)),
  158. text:a.name,
  159. type:split(labels(a)[0],'_')[1]
  160. })+
  161. collect({
  162. id:toString(id(b)),
  163. text:b.name,
  164. type:split(labels(b)[0],'_')[1]
  165. })+
  166. collect({
  167. id:toString(id(da)),
  168. text:da.name,
  169. type:split(labels(da)[0],'_')[1]
  170. }) as nodes,
  171. da,
  172. collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
  173. collect({from:toString(id(b)),to:toString(id(da)),text:'标准'}) as lines
  174. WITH
  175. toString(id(da)) as rootId,
  176. apoc.coll.toSet(lines) as lines,
  177. apoc.coll.toSet(nodes) as nodes
  178. RETURN nodes,lines,rootId
  179. """
  180. driver = None
  181. try:
  182. driver = connect_graph()
  183. with driver.session() as session:
  184. result = session.run(cql, nodeId=nodeid)
  185. res = {}
  186. for item in result:
  187. res = {
  188. "nodes": [
  189. record for record in item['nodes'] if record['id']
  190. ],
  191. "lines": [
  192. record
  193. for record in item['lines']
  194. if record['from'] and record['to']
  195. ],
  196. "rootId": item['rootId']
  197. }
  198. return res
  199. except (ConnectionError, ValueError) as e:
  200. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  201. return {}
  202. finally:
  203. if driver:
  204. driver.close()
  205. # 数据标准图谱展示(影响关系)下游
  206. def standard_impact_graph(nodeid):
  207. """
  208. 生成数据标准的影响关系图谱
  209. Args:
  210. nodeid: 节点ID
  211. Returns:
  212. 图谱数据
  213. """
  214. # 查询语句
  215. cql = """
  216. MATCH(da:data_standard)
  217. WHERE id(da)=$nodeId
  218. OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
  219. OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
  220. WITH
  221. collect({
  222. id:toString(id(da)),
  223. text:da.name,
  224. type:split(labels(da)[0],'_')[1]
  225. })+
  226. collect({id:toString(id(m1)),text:m1.name})+
  227. collect({id:toString(id(m2)),text:m2.name}) as nodes,
  228. da,
  229. collect({
  230. from:toString(id(da)),
  231. to:toString(id(m1)),
  232. text:'标准清洗'
  233. })+
  234. collect({
  235. from:toString(id(da)),
  236. to:toString(id(m2)),
  237. text:'标准清洗'
  238. }) as lines
  239. WITH
  240. toString(id(da)) as rootId,
  241. apoc.coll.toSet(lines) as lines,
  242. apoc.coll.toSet(nodes) as nodes
  243. RETURN nodes,lines,rootId
  244. """
  245. driver = None
  246. try:
  247. driver = connect_graph()
  248. with driver.session() as session:
  249. result = session.run(cql, nodeId=nodeid)
  250. res = {}
  251. for item in result:
  252. res = {
  253. "nodes": [
  254. record for record in item['nodes'] if record['id']
  255. ],
  256. "lines": [
  257. record
  258. for record in item['lines']
  259. if record['from'] and record['to']
  260. ],
  261. "rootId": item['rootId']
  262. }
  263. return res
  264. except (ConnectionError, ValueError) as e:
  265. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  266. return {}
  267. finally:
  268. if driver:
  269. driver.close()
  270. # 数据标准图谱展示(所有关系)
  271. def standard_all_graph(nodeid):
  272. """
  273. 生成数据标准的所有关系图谱
  274. Args:
  275. nodeid: 节点ID
  276. Returns:
  277. 图谱数据
  278. """
  279. # 查询语句
  280. cql = """
  281. MATCH(da:data_standard)
  282. WHERE id(da)=$nodeId
  283. OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
  284. OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
  285. OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
  286. OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
  287. WITH
  288. collect({
  289. id:toString(id(a)),
  290. text:a.name,
  291. type:split(labels(a)[0],'_')[1]
  292. })+
  293. collect({
  294. id:toString(id(b)),
  295. text:b.name,
  296. type:split(labels(b)[0],'_')[1]
  297. })+
  298. collect({
  299. id:toString(id(da)),
  300. text:da.name,
  301. type:split(labels(da)[0],'_')[1]
  302. })+
  303. collect({id:toString(id(m1)),text:m1.name})+
  304. collect({id:toString(id(m2)),text:m2.name}) as nodes,
  305. da,
  306. collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
  307. collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
  308. collect({
  309. from:toString(id(da)),
  310. to:toString(id(m1)),
  311. text:'标准清洗'
  312. })+
  313. collect({
  314. from:toString(id(da)),
  315. to:toString(id(m2)),
  316. text:'标准清洗'
  317. }) as lines
  318. WITH
  319. toString(id(da)) as rootId,
  320. apoc.coll.toSet(lines) as lines,
  321. apoc.coll.toSet(nodes) as nodes
  322. RETURN nodes,lines,rootId
  323. """
  324. driver = None
  325. try:
  326. driver = connect_graph()
  327. with driver.session() as session:
  328. result = session.run(cql, nodeId=nodeid)
  329. res = {}
  330. for item in result:
  331. res = {
  332. "nodes": [
  333. record for record in item['nodes'] if record['id']
  334. ],
  335. "lines": [
  336. record
  337. for record in item['lines']
  338. if record['from'] and record['to']
  339. ],
  340. "rootId": item['rootId']
  341. }
  342. return res
  343. except (ConnectionError, ValueError) as e:
  344. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  345. return {}
  346. finally:
  347. if driver:
  348. driver.close()
  349. # 数据标签列表展示
  350. def label_list(
  351. skip_count,
  352. page_size,
  353. name_en_filter=None,
  354. name_zh_filter=None,
  355. category_filter=None,
  356. group_filter=None,
  357. ):
  358. """
  359. 获取数据标签列表
  360. Args:
  361. skip_count: 跳过的记录数量
  362. page_size: 每页记录数量
  363. name_en_filter: 英文名称过滤条件
  364. name_zh_filter: 名称过滤条件
  365. category_filter: 分类过滤条件
  366. group_filter: 分组过滤条件
  367. Returns:
  368. tuple: (数据列表, 总记录数)
  369. """
  370. data = []
  371. # 构建查询条件
  372. where_clause = []
  373. params = {}
  374. if name_zh_filter:
  375. where_clause.append("n.name_zh CONTAINS $name_zh_filter")
  376. params['name_zh_filter'] = name_zh_filter
  377. if name_en_filter:
  378. where_clause.append("n.name_en CONTAINS $name_en_filter")
  379. params['name_en_filter'] = name_en_filter
  380. where_clause.extend(
  381. _build_category_filter_conditions(category_filter, params)
  382. )
  383. if group_filter:
  384. where_clause.append("n.group CONTAINS $group_filter")
  385. params['group_filter'] = group_filter
  386. if not where_clause:
  387. where_clause.append("TRUE")
  388. where_str = " AND ".join(where_clause)
  389. # 构建完整的查询语句
  390. cql = f"""
  391. MATCH (n:DataLabel)
  392. WHERE {where_str}
  393. WITH
  394. n,
  395. properties(n) as properties,
  396. n.create_time as create_time,
  397. id(n) as nodeid
  398. OPTIONAL MATCH (n)<-[r]-()
  399. WITH
  400. n,
  401. properties,
  402. create_time,
  403. nodeid,
  404. count(r) as incoming
  405. OPTIONAL MATCH (n)-[r]->()
  406. WITH
  407. n,
  408. properties,
  409. create_time,
  410. nodeid,
  411. incoming,
  412. count(r) as outgoing
  413. RETURN
  414. properties,
  415. create_time,
  416. nodeid,
  417. incoming + outgoing as relationship_count
  418. ORDER BY create_time desc
  419. SKIP $skip_count
  420. LIMIT $page_size
  421. """
  422. params['skip_count'] = skip_count
  423. params['page_size'] = page_size
  424. driver = None
  425. try:
  426. driver = connect_graph()
  427. with driver.session() as session:
  428. result = session.run(cql, **params)
  429. for record in result:
  430. properties = record['properties']
  431. new_attr = {
  432. 'id': record['nodeid'],
  433. 'number': record['relationship_count']
  434. }
  435. if "describe" not in properties:
  436. properties["describe"] = None
  437. if "scope" not in properties:
  438. properties["scope"] = None
  439. properties.update(new_attr)
  440. data.append(properties)
  441. # 获取总量
  442. total_query = (
  443. f"MATCH (n:DataLabel) WHERE {where_str} "
  444. "RETURN COUNT(n) AS total"
  445. )
  446. total_record = session.run(total_query, **params).single()
  447. total = total_record["total"] if total_record else 0
  448. return data, total
  449. except (ConnectionError, ValueError) as e:
  450. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  451. return [], 0
  452. finally:
  453. if driver:
  454. driver.close()
  455. # 数据标签图谱展示
  456. def id_label_graph(id):
  457. """
  458. 根据ID生成数据标签图谱
  459. Args:
  460. id: 节点ID
  461. Returns:
  462. 图谱数据
  463. """
  464. query = """
  465. MATCH (n:DataLabel)
  466. WHERE id(n) = $nodeId
  467. OPTIONAL MATCH (a)-[:LABEL]-(n)
  468. WITH
  469. collect({
  470. from: toString(id(a)),
  471. to: toString(id(n)),
  472. text: "标签"
  473. }) AS line1,
  474. collect({
  475. id: toString(id(n)),
  476. text: n.name_zh,
  477. type:"label"
  478. }) AS node1,
  479. collect({
  480. id: toString(id(a)),
  481. text: a.name_zh,
  482. type: split(labels(a)[0], '_')[1]
  483. }) AS node2,
  484. n
  485. WITH
  486. apoc.coll.toSet(line1) AS lines,
  487. apoc.coll.toSet(node1 + node2) AS nodes,
  488. toString(id(n)) AS res
  489. RETURN lines, nodes, res
  490. """
  491. driver = None
  492. try:
  493. driver = connect_graph()
  494. with driver.session() as session:
  495. result = session.run(query, nodeId=id)
  496. res = {}
  497. for item in result:
  498. res = {
  499. "nodes": [
  500. record for record in item['nodes'] if record['id']
  501. ],
  502. "lines": [
  503. record
  504. for record in item['lines']
  505. if record['from'] and record['to']
  506. ],
  507. "rootId": item['res'],
  508. }
  509. return res
  510. except (ConnectionError, ValueError) as e:
  511. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  512. return {}
  513. finally:
  514. if driver:
  515. driver.close()
  516. # 数据标签图谱展示(血缘关系)父节点/(所有关系)
  517. def label_kinship_graph(nodeid):
  518. """
  519. 生成数据标签的血缘关系图谱
  520. Args:
  521. nodeid: 节点ID
  522. Returns:
  523. 图谱数据
  524. """
  525. # 查询语句
  526. cql = """
  527. MATCH(la:DataLabel)
  528. WHERE id(la)=$nodeId
  529. OPTIONAL MATCH(a:DataResource)-[:LABEL]-(la)
  530. OPTIONAL MATCH(b:DataModel)-[:LABEL]-(la)
  531. OPTIONAL MATCH(meta:DataMeta)-[:LABEL]-(la)
  532. OPTIONAL MATCH(d:data_standard)-[:LABEL]-(la)
  533. OPTIONAL MATCH(e:DataMetric)-[:LABEL]-(la)
  534. WITH
  535. collect({
  536. id:toString(id(a)),
  537. text:a.name_zh,
  538. type:split(labels(a)[0],'_')[1]
  539. })+
  540. collect({
  541. id:toString(id(b)),
  542. text:b.name_zh,
  543. type:split(labels(b)[0],'_')[1]
  544. })+
  545. collect({
  546. id:toString(id(d)),
  547. text:d.name_zh,
  548. type:split(labels(d)[0],'_')[1]
  549. })+
  550. collect({
  551. id:toString(id(e)),
  552. text:e.name_zh,
  553. type:split(labels(e)[0],'_')[1]
  554. })+
  555. collect({
  556. id:toString(id(la)),
  557. text:la.name_zh,
  558. type:split(labels(la)[0],'_')[1]
  559. })+
  560. collect({id:toString(id(meta)),text:meta.name_zh}) as nodes,
  561. la,
  562. collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
  563. collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
  564. collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+
  565. collect({from:toString(id(d)),to:toString(id(la)),text:'标签'})+
  566. collect({from:toString(id(e)),to:toString(id(la)),text:'标签'}) as lines
  567. WITH
  568. toString(id(la)) as rootId,
  569. apoc.coll.toSet(lines) as lines,
  570. apoc.coll.toSet(nodes) as nodes
  571. RETURN nodes,lines,rootId
  572. """
  573. driver = None
  574. try:
  575. driver = connect_graph()
  576. with driver.session() as session:
  577. result = session.run(cql, nodeId=nodeid)
  578. res = {}
  579. for item in result:
  580. res = {
  581. "nodes": [
  582. record for record in item['nodes'] if record['id']
  583. ],
  584. "lines": [
  585. record
  586. for record in item['lines']
  587. if record['from'] and record['to']
  588. ],
  589. "rootId": item['rootId']
  590. }
  591. return res
  592. except (ConnectionError, ValueError) as e:
  593. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  594. return {}
  595. finally:
  596. if driver:
  597. driver.close()
  598. # 数据标签图谱展示(影响关系)下游
  599. def label_impact_graph(nodeid):
  600. """
  601. 生成数据标签的影响关系图谱
  602. Args:
  603. nodeid: 节点ID
  604. Returns:
  605. 图谱数据
  606. """
  607. # 查询语句
  608. cql = """
  609. MATCH(n:DataLabel)
  610. WHERE id(n)=$nodeId
  611. RETURN {
  612. id:toString(id(n)),
  613. text:(n.name_zh),
  614. type:"label"
  615. } AS nodes,
  616. toString(id(n)) as rootId
  617. """
  618. driver = None
  619. try:
  620. driver = connect_graph()
  621. with driver.session() as session:
  622. result = session.run(cql, nodeId=nodeid)
  623. res = {}
  624. for item in result:
  625. res = {
  626. "nodes": item['nodes'],
  627. "rootId": item['rootId'],
  628. "lines": []
  629. }
  630. return res
  631. except (ConnectionError, ValueError) as e:
  632. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  633. return {}
  634. finally:
  635. if driver:
  636. driver.close()
  637. # 数据标签按照提交内容查询相似分组,并且返回
  638. def dynamic_label_list(name_filter=None):
  639. """
  640. 根据内容查询相似的数据标签分组
  641. Args:
  642. name_filter: 内容过滤条件
  643. Returns:
  644. 标签分组列表
  645. """
  646. # 构建完整的查询语句
  647. cql = """
  648. MATCH (n:DataLabel)
  649. WITH
  650. n,
  651. apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity
  652. WHERE similarity > 0.1 // 设置相似度阈值
  653. RETURN DISTINCT n.group as name_zh, id(n) as nodeid
  654. """
  655. driver = None
  656. try:
  657. driver = connect_graph()
  658. with driver.session() as session:
  659. result = session.run(cql, name_filter=name_filter or "")
  660. data = []
  661. for record in result:
  662. data.append(
  663. {
  664. "name_zh": record['name_zh'],
  665. "id": record['nodeid'],
  666. }
  667. )
  668. return data
  669. except (ConnectionError, ValueError) as e:
  670. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  671. return []
  672. finally:
  673. if driver:
  674. driver.close()
  675. def search_info(key, value):
  676. """
  677. 搜索指定属性的节点信息
  678. Args:
  679. key: 搜索属性键
  680. value: 搜索属性值
  681. Returns:
  682. 搜索结果列表
  683. """
  684. field_pattern = r"^[A-Za-z_][A-Za-z0-9_]*$"
  685. if not re.match(field_pattern, str(key)):
  686. logger.warning("非法属性键: %s", key)
  687. return []
  688. query = """
  689. MATCH (n)
  690. WHERE n[$field] =~ $pattern
  691. WITH
  692. n,
  693. properties(n) as properties,
  694. n.create_time as create_time,
  695. id(n) as nodeid
  696. RETURN properties, nodeid, create_time, labels(n) as labels
  697. LIMIT 30
  698. """
  699. driver = None
  700. try:
  701. driver = connect_graph()
  702. except (ConnectionError, ValueError) as e:
  703. logger.error(f"无法连接到Neo4j数据库: {str(e)}")
  704. return []
  705. try:
  706. with driver.session() as session:
  707. result = session.run(
  708. query,
  709. field=key,
  710. pattern=f"(?i).*{value}.*",
  711. )
  712. results = []
  713. for record in result:
  714. results.append(
  715. {
  716. "properties": record["properties"],
  717. "id": record["nodeid"],
  718. "create_time": record["create_time"],
  719. "labels": record["labels"],
  720. }
  721. )
  722. return results
  723. except Exception as e:
  724. logger.error(f"搜索节点信息失败: {str(e)}")
  725. return []
  726. finally:
  727. if driver:
  728. driver.close()
  729. def label_info(id):
  730. """
  731. 获取标签节点的信息
  732. Args:
  733. id: 节点ID
  734. Returns:
  735. 标签节点信息
  736. """
  737. query = """
  738. MATCH (n)
  739. WHERE id(n) = $nodeId
  740. RETURN {
  741. id:toString(id(n)),
  742. text:(n.name_zh),
  743. type:"label"
  744. } AS nodes,
  745. toString(id(n)) as rootId
  746. """
  747. driver = None
  748. try:
  749. driver = connect_graph()
  750. with driver.session() as session:
  751. result = session.run(query, nodeId=id).data()
  752. return result[0] if result else {}
  753. except (ConnectionError, ValueError) as e:
  754. logger.error(f"无法连接到Neo4j数据库: {str(e)}")
  755. return {}
  756. finally:
  757. if driver:
  758. driver.close()
  759. def graph_all(domain_id, include_meta=True):
  760. """
  761. 获取完整关系图谱
  762. Args:
  763. domain_id: 节点ID
  764. include_meta: 是否包含元数据节点
  765. Returns:
  766. dict: 包含 nodes 与 lines 的图谱数据
  767. """
  768. try:
  769. domain_id_int = int(domain_id)
  770. except (ValueError, TypeError):
  771. logger.error(f"节点ID不是有效的整数: {domain_id}")
  772. return {"nodes": [], "lines": []}
  773. try:
  774. with neo4j_driver.get_session() as session:
  775. nodes = {}
  776. lines = {}
  777. # 使用路径查询同时获取节点和关系
  778. if include_meta:
  779. cypher = """
  780. MATCH (n)
  781. WHERE id(n) = $domain_id
  782. OPTIONAL MATCH path = (n)-[r]-(m)
  783. RETURN n,
  784. collect(DISTINCT m) as related_nodes,
  785. collect(DISTINCT r) as relationships
  786. """
  787. else:
  788. cypher = """
  789. MATCH (n)
  790. WHERE id(n) = $domain_id
  791. OPTIONAL MATCH path = (n)-[r]-(m)
  792. WHERE NOT (m:DataMeta)
  793. RETURN n,
  794. collect(DISTINCT m) as related_nodes,
  795. collect(DISTINCT r) as relationships
  796. """
  797. result = session.run(cypher, domain_id=domain_id_int)
  798. record = result.single()
  799. if not record:
  800. logger.warning(f"未找到节点: {domain_id_int}")
  801. return {"nodes": [], "lines": []}
  802. # 处理起始节点
  803. n_node = record["n"]
  804. if n_node:
  805. n_props = dict(n_node)
  806. n_labels = list(n_node.labels)
  807. n_props["id"] = domain_id_int
  808. n_props["node_type"] = n_labels[0] if n_labels else ""
  809. nodes[domain_id_int] = n_props
  810. # 处理关联节点
  811. related_nodes = record["related_nodes"] or []
  812. for m_node in related_nodes:
  813. if m_node is None:
  814. continue
  815. m_elem_id = m_node.element_id
  816. m_id = int(m_elem_id.split(":")[-1])
  817. if m_id not in nodes:
  818. m_props = dict(m_node)
  819. m_labels = list(m_node.labels)
  820. m_props["id"] = m_id
  821. m_props["node_type"] = m_labels[0] if m_labels else ""
  822. nodes[m_id] = m_props
  823. # 处理关系
  824. relationships = record["relationships"] or []
  825. for rel in relationships:
  826. if rel is None:
  827. continue
  828. rel_elem_id = rel.element_id
  829. rel_id = rel_elem_id.split(":")[-1]
  830. if rel_id not in lines:
  831. # 获取关系的起始和结束节点 ID
  832. start_elem_id = rel.start_node.element_id
  833. end_elem_id = rel.end_node.element_id
  834. start_id = start_elem_id.split(":")[-1]
  835. end_id = end_elem_id.split(":")[-1]
  836. # 获取关系类型
  837. rel_type = type(rel).__name__
  838. lines[rel_id] = {
  839. "id": rel_id,
  840. "from": start_id,
  841. "to": end_id,
  842. "text": rel_type,
  843. }
  844. logger.info(
  845. f"graph_all 结果: node_id={domain_id_int}, "
  846. f"nodes={len(nodes)}, lines={len(lines)}"
  847. )
  848. return {
  849. "nodes": list(nodes.values()),
  850. "lines": list(lines.values()),
  851. }
  852. except Exception as e:
  853. logger.error(f"获取图谱失败: {str(e)}")
  854. import traceback
  855. logger.error(traceback.format_exc())
  856. return {"nodes": [], "lines": []}
  857. def node_delete(node_id):
  858. """
  859. 删除 DataLabel 节点及其所有关联关系
  860. Args:
  861. node_id: 节点ID(整数)
  862. Returns:
  863. dict: 删除结果,包含 success 状态和 message 信息
  864. """
  865. driver = None
  866. try:
  867. driver = connect_graph()
  868. except (ConnectionError, ValueError) as e:
  869. logger.error(f"无法连接到Neo4j数据库: {str(e)}")
  870. return {"success": False, "message": "无法连接到数据库"}
  871. try:
  872. with driver.session() as session:
  873. # 首先检查节点是否存在且为 DataLabel 类型
  874. check_query = """
  875. MATCH (n:DataLabel)
  876. WHERE id(n) = $nodeId
  877. RETURN n
  878. """
  879. check_result = session.run(
  880. check_query,
  881. nodeId=node_id,
  882. ).single()
  883. if not check_result:
  884. logger.warning(f"DataLabel 节点不存在: ID={node_id}")
  885. return {
  886. "success": False,
  887. "message": f"DataLabel 节点不存在 (ID: {node_id})",
  888. }
  889. # 删除节点及其所有关系
  890. delete_query = """
  891. MATCH (n:DataLabel)
  892. WHERE id(n) = $nodeId
  893. DETACH DELETE n
  894. RETURN count(n) as deleted_count
  895. """
  896. delete_result = session.run(
  897. delete_query,
  898. nodeId=node_id,
  899. ).single()
  900. if not delete_result:
  901. logger.warning(f"删除结果为空: ID={node_id}")
  902. return {
  903. "success": False,
  904. "message": "删除失败,未获取到删除结果",
  905. }
  906. deleted_count = delete_result["deleted_count"]
  907. if deleted_count > 0:
  908. logger.info(f"成功删除 DataLabel 节点: ID={node_id}")
  909. return {
  910. "success": True,
  911. "message": (
  912. f"成功删除 DataLabel 节点 (ID: {node_id})"
  913. ),
  914. }
  915. else:
  916. logger.warning(f"删除失败,节点可能已被删除: ID={node_id}")
  917. return {"success": False, "message": "删除失败,节点可能已被删除"}
  918. except Exception as e:
  919. logger.error(f"删除 DataLabel 节点失败: {str(e)}")
  920. return {"success": False, "message": f"删除失败: {str(e)}"}
  921. finally:
  922. if driver:
  923. driver.close()