interface.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014
  1. """
  2. 数据接口核心业务逻辑模块
  3. 本模块包含了数据接口相关的所有核心业务逻辑函数,包括:
  4. - 数据标准(data_standard)相关功能
  5. - 数据标签(DataLabel)相关功能
  6. - 图谱生成
  7. - 动态标签识别等功能
  8. """
  9. import logging
  10. import re
  11. from app.core.graph.graph_operations import connect_graph
  12. from app.services.neo4j_driver import neo4j_driver
  13. # 配置logger
  14. logger = logging.getLogger(__name__)
  15. def _build_category_filter_conditions(category_filter, params):
  16. """
  17. 将 category_filter 转换为 Cypher 查询条件列表。
  18. 支持:
  19. - 字典: {field: value, ...}
  20. - 列表: [{"field": "...", "value": "..."}, {"category": "xxx"}]
  21. - 字符串: 兼容旧用法,等同于按 category 字段过滤
  22. """
  23. conditions = []
  24. param_index = 0
  25. def add_condition(field, value):
  26. nonlocal param_index
  27. if value is None:
  28. return
  29. if not isinstance(field, str):
  30. return
  31. if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", field):
  32. logger.warning(f"忽略非法属性字段: {field}")
  33. return
  34. param_key = f"category_filter_{param_index}"
  35. param_index += 1
  36. conditions.append(f"n.{field} CONTAINS ${param_key}")
  37. params[param_key] = value
  38. if isinstance(category_filter, dict):
  39. for field, value in category_filter.items():
  40. add_condition(field, value)
  41. elif isinstance(category_filter, list):
  42. for item in category_filter:
  43. if not isinstance(item, dict):
  44. continue
  45. if "field" in item and "value" in item:
  46. add_condition(item.get("field"), item.get("value"))
  47. elif len(item) == 1:
  48. field, value = next(iter(item.items()))
  49. add_condition(field, value)
  50. elif category_filter:
  51. add_condition("category", category_filter)
  52. return conditions
  53. # 数据标准列表展示
  54. def standard_list(
  55. skip_count,
  56. page_size,
  57. name_en_filter=None,
  58. name_zh_filter=None,
  59. category_filter=None,
  60. create_time_filter=None,
  61. ):
  62. """
  63. 获取数据标准列表
  64. Args:
  65. skip_count: 跳过的记录数量
  66. page_size: 每页记录数量
  67. name_en_filter: 英文名称过滤条件
  68. name_zh_filter: 名称过滤条件
  69. category_filter: 分类过滤条件
  70. create_time_filter: 时间过滤条件
  71. Returns:
  72. tuple: (数据列表, 总记录数)
  73. """
  74. data = []
  75. # 构建查询条件
  76. where_clause = []
  77. params = {}
  78. if name_zh_filter:
  79. where_clause.append("n.name_zh CONTAINS $name_zh_filter")
  80. params['name_zh_filter'] = name_zh_filter
  81. if name_en_filter:
  82. where_clause.append("n.name_en CONTAINS $name_en_filter")
  83. params['name_en_filter'] = name_en_filter
  84. if category_filter:
  85. where_clause.append("n.category CONTAINS $category_filter")
  86. params['category_filter'] = category_filter
  87. if create_time_filter:
  88. where_clause.append("n.create_time CONTAINS $create_time_filter")
  89. params['create_time_filter'] = create_time_filter
  90. else:
  91. where_clause.append("TRUE")
  92. where_str = " AND ".join(where_clause)
  93. # 构建完整的查询语句
  94. cql = f"""
  95. MATCH (n:data_standard)
  96. WHERE {where_str}
  97. RETURN
  98. properties(n) as properties,
  99. n.create_time as create_time,
  100. id(n) as nodeid,
  101. size([(n)<-[]-() | 1]) + size([(n)-[]->() | 1]) as relationship_count
  102. ORDER BY create_time desc
  103. SKIP $skip_count
  104. LIMIT $page_size
  105. """
  106. params['skip_count'] = skip_count
  107. params['page_size'] = page_size
  108. # 修复:使用正确的session方式执行查询
  109. driver = None
  110. try:
  111. driver = connect_graph()
  112. with driver.session() as session:
  113. result = session.run(cql, **params)
  114. for record in result:
  115. properties = {
  116. key: value for key, value in record['properties'].items()
  117. if key not in ['input', 'code', 'output']
  118. }
  119. properties.setdefault("describe", None)
  120. new_attr = {
  121. 'id': record['nodeid'],
  122. 'number': record['relationship_count']
  123. }
  124. properties.update(new_attr)
  125. data.append(properties)
  126. # 获取总量
  127. total_query = (
  128. f"MATCH (n:data_standard) WHERE {where_str} "
  129. "RETURN COUNT(n) AS total"
  130. )
  131. total_record = session.run(total_query, **params).single()
  132. total = total_record["total"] if total_record else 0
  133. return data, total
  134. except (ConnectionError, ValueError) as e:
  135. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  136. return [], 0
  137. finally:
  138. if driver:
  139. driver.close()
  140. # 数据标准图谱展示(血缘关系)父节点
  141. def standard_kinship_graph(nodeid):
  142. """
  143. 生成数据标准的血缘关系图谱
  144. Args:
  145. nodeid: 节点ID
  146. Returns:
  147. 图谱数据
  148. """
  149. # 查询语句
  150. cql = """
  151. MATCH(da:data_standard)
  152. WHERE id(da)=$nodeId
  153. OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
  154. OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
  155. WITH
  156. collect({
  157. id:toString(id(a)),
  158. text:a.name,
  159. type:split(labels(a)[0],'_')[1]
  160. })+
  161. collect({
  162. id:toString(id(b)),
  163. text:b.name,
  164. type:split(labels(b)[0],'_')[1]
  165. })+
  166. collect({
  167. id:toString(id(da)),
  168. text:da.name,
  169. type:split(labels(da)[0],'_')[1]
  170. }) as nodes,
  171. da,
  172. collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
  173. collect({from:toString(id(b)),to:toString(id(da)),text:'标准'}) as lines
  174. WITH
  175. toString(id(da)) as rootId,
  176. apoc.coll.toSet(lines) as lines,
  177. apoc.coll.toSet(nodes) as nodes
  178. RETURN nodes,lines,rootId
  179. """
  180. driver = None
  181. try:
  182. driver = connect_graph()
  183. with driver.session() as session:
  184. result = session.run(cql, nodeId=nodeid)
  185. res = {}
  186. for item in result:
  187. res = {
  188. "nodes": [
  189. record for record in item['nodes'] if record['id']
  190. ],
  191. "lines": [
  192. record
  193. for record in item['lines']
  194. if record['from'] and record['to']
  195. ],
  196. "rootId": item['rootId']
  197. }
  198. return res
  199. except (ConnectionError, ValueError) as e:
  200. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  201. return {}
  202. finally:
  203. if driver:
  204. driver.close()
  205. # 数据标准图谱展示(影响关系)下游
  206. def standard_impact_graph(nodeid):
  207. """
  208. 生成数据标准的影响关系图谱
  209. Args:
  210. nodeid: 节点ID
  211. Returns:
  212. 图谱数据
  213. """
  214. # 查询语句
  215. cql = """
  216. MATCH(da:data_standard)
  217. WHERE id(da)=$nodeId
  218. OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
  219. OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
  220. WITH
  221. collect({
  222. id:toString(id(da)),
  223. text:da.name,
  224. type:split(labels(da)[0],'_')[1]
  225. })+
  226. collect({id:toString(id(m1)),text:m1.name})+
  227. collect({id:toString(id(m2)),text:m2.name}) as nodes,
  228. da,
  229. collect({
  230. from:toString(id(da)),
  231. to:toString(id(m1)),
  232. text:'标准清洗'
  233. })+
  234. collect({
  235. from:toString(id(da)),
  236. to:toString(id(m2)),
  237. text:'标准清洗'
  238. }) as lines
  239. WITH
  240. toString(id(da)) as rootId,
  241. apoc.coll.toSet(lines) as lines,
  242. apoc.coll.toSet(nodes) as nodes
  243. RETURN nodes,lines,rootId
  244. """
  245. driver = None
  246. try:
  247. driver = connect_graph()
  248. with driver.session() as session:
  249. result = session.run(cql, nodeId=nodeid)
  250. res = {}
  251. for item in result:
  252. res = {
  253. "nodes": [
  254. record for record in item['nodes'] if record['id']
  255. ],
  256. "lines": [
  257. record
  258. for record in item['lines']
  259. if record['from'] and record['to']
  260. ],
  261. "rootId": item['rootId']
  262. }
  263. return res
  264. except (ConnectionError, ValueError) as e:
  265. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  266. return {}
  267. finally:
  268. if driver:
  269. driver.close()
  270. # 数据标准图谱展示(所有关系)
  271. def standard_all_graph(nodeid):
  272. """
  273. 生成数据标准的所有关系图谱
  274. Args:
  275. nodeid: 节点ID
  276. Returns:
  277. 图谱数据
  278. """
  279. # 查询语句
  280. cql = """
  281. MATCH(da:data_standard)
  282. WHERE id(da)=$nodeId
  283. OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
  284. OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
  285. OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
  286. OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
  287. WITH
  288. collect({
  289. id:toString(id(a)),
  290. text:a.name,
  291. type:split(labels(a)[0],'_')[1]
  292. })+
  293. collect({
  294. id:toString(id(b)),
  295. text:b.name,
  296. type:split(labels(b)[0],'_')[1]
  297. })+
  298. collect({
  299. id:toString(id(da)),
  300. text:da.name,
  301. type:split(labels(da)[0],'_')[1]
  302. })+
  303. collect({id:toString(id(m1)),text:m1.name})+
  304. collect({id:toString(id(m2)),text:m2.name}) as nodes,
  305. da,
  306. collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
  307. collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
  308. collect({
  309. from:toString(id(da)),
  310. to:toString(id(m1)),
  311. text:'标准清洗'
  312. })+
  313. collect({
  314. from:toString(id(da)),
  315. to:toString(id(m2)),
  316. text:'标准清洗'
  317. }) as lines
  318. WITH
  319. toString(id(da)) as rootId,
  320. apoc.coll.toSet(lines) as lines,
  321. apoc.coll.toSet(nodes) as nodes
  322. RETURN nodes,lines,rootId
  323. """
  324. driver = None
  325. try:
  326. driver = connect_graph()
  327. with driver.session() as session:
  328. result = session.run(cql, nodeId=nodeid)
  329. res = {}
  330. for item in result:
  331. res = {
  332. "nodes": [
  333. record for record in item['nodes'] if record['id']
  334. ],
  335. "lines": [
  336. record
  337. for record in item['lines']
  338. if record['from'] and record['to']
  339. ],
  340. "rootId": item['rootId']
  341. }
  342. return res
  343. except (ConnectionError, ValueError) as e:
  344. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  345. return {}
  346. finally:
  347. if driver:
  348. driver.close()
  349. # 数据标签列表展示
  350. def label_list(
  351. skip_count,
  352. page_size,
  353. name_en_filter=None,
  354. name_zh_filter=None,
  355. category_filter=None,
  356. group_filter=None,
  357. ):
  358. """
  359. 获取数据标签列表
  360. Args:
  361. skip_count: 跳过的记录数量
  362. page_size: 每页记录数量
  363. name_en_filter: 英文名称过滤条件
  364. name_zh_filter: 名称过滤条件
  365. category_filter: 分类过滤条件
  366. group_filter: 分组过滤条件
  367. Returns:
  368. tuple: (数据列表, 总记录数)
  369. """
  370. data = []
  371. # 构建查询条件
  372. where_clause = []
  373. params = {}
  374. if name_zh_filter:
  375. where_clause.append("n.name_zh CONTAINS $name_zh_filter")
  376. params['name_zh_filter'] = name_zh_filter
  377. if name_en_filter:
  378. where_clause.append("n.name_en CONTAINS $name_en_filter")
  379. params['name_en_filter'] = name_en_filter
  380. where_clause.extend(
  381. _build_category_filter_conditions(category_filter, params)
  382. )
  383. if group_filter:
  384. where_clause.append("n.group CONTAINS $group_filter")
  385. params['group_filter'] = group_filter
  386. if not where_clause:
  387. where_clause.append("TRUE")
  388. where_str = " AND ".join(where_clause)
  389. # 构建完整的查询语句
  390. cql = f"""
  391. MATCH (n:DataLabel)
  392. WHERE {where_str}
  393. WITH
  394. n,
  395. properties(n) as properties,
  396. n.create_time as create_time,
  397. id(n) as nodeid
  398. OPTIONAL MATCH (n)<-[r]-()
  399. WITH
  400. n,
  401. properties,
  402. create_time,
  403. nodeid,
  404. count(r) as incoming
  405. OPTIONAL MATCH (n)-[r]->()
  406. WITH
  407. n,
  408. properties,
  409. create_time,
  410. nodeid,
  411. incoming,
  412. count(r) as outgoing
  413. RETURN
  414. properties,
  415. create_time,
  416. nodeid,
  417. incoming + outgoing as relationship_count
  418. ORDER BY create_time desc
  419. SKIP $skip_count
  420. LIMIT $page_size
  421. """
  422. params['skip_count'] = skip_count
  423. params['page_size'] = page_size
  424. driver = None
  425. try:
  426. driver = connect_graph()
  427. with driver.session() as session:
  428. result = session.run(cql, **params)
  429. for record in result:
  430. properties = record['properties']
  431. new_attr = {
  432. 'id': record['nodeid'],
  433. 'number': record['relationship_count']
  434. }
  435. if "describe" not in properties:
  436. properties["describe"] = None
  437. if "scope" not in properties:
  438. properties["scope"] = None
  439. properties.update(new_attr)
  440. data.append(properties)
  441. # 获取总量
  442. total_query = (
  443. f"MATCH (n:DataLabel) WHERE {where_str} "
  444. "RETURN COUNT(n) AS total"
  445. )
  446. total_record = session.run(total_query, **params).single()
  447. total = total_record["total"] if total_record else 0
  448. return data, total
  449. except (ConnectionError, ValueError) as e:
  450. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  451. return [], 0
  452. finally:
  453. if driver:
  454. driver.close()
  455. # 数据标签图谱展示
  456. def id_label_graph(id):
  457. """
  458. 根据ID生成数据标签图谱
  459. Args:
  460. id: 节点ID
  461. Returns:
  462. 图谱数据
  463. """
  464. query = """
  465. MATCH (n:DataLabel)
  466. WHERE id(n) = $nodeId
  467. OPTIONAL MATCH (a)-[:LABEL]-(n)
  468. WITH
  469. collect({
  470. from: toString(id(a)),
  471. to: toString(id(n)),
  472. text: "标签"
  473. }) AS line1,
  474. collect({
  475. id: toString(id(n)),
  476. text: n.name_zh,
  477. type:"label"
  478. }) AS node1,
  479. collect({
  480. id: toString(id(a)),
  481. text: a.name_zh,
  482. type: split(labels(a)[0], '_')[1]
  483. }) AS node2,
  484. n
  485. WITH
  486. apoc.coll.toSet(line1) AS lines,
  487. apoc.coll.toSet(node1 + node2) AS nodes,
  488. toString(id(n)) AS res
  489. RETURN lines, nodes, res
  490. """
  491. driver = None
  492. try:
  493. driver = connect_graph()
  494. with driver.session() as session:
  495. result = session.run(query, nodeId=id)
  496. res = {}
  497. for item in result:
  498. res = {
  499. "nodes": [
  500. record for record in item['nodes'] if record['id']
  501. ],
  502. "lines": [
  503. record
  504. for record in item['lines']
  505. if record['from'] and record['to']
  506. ],
  507. "rootId": item['res'],
  508. }
  509. return res
  510. except (ConnectionError, ValueError) as e:
  511. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  512. return {}
  513. finally:
  514. if driver:
  515. driver.close()
  516. # 数据标签图谱展示(血缘关系)父节点/(所有关系)
  517. def label_kinship_graph(nodeid):
  518. """
  519. 生成数据标签的血缘关系图谱
  520. Args:
  521. nodeid: 节点ID
  522. Returns:
  523. 图谱数据
  524. """
  525. # 查询语句
  526. cql = """
  527. MATCH(la:DataLabel)
  528. WHERE id(la)=$nodeId
  529. OPTIONAL MATCH(a:DataResource)-[:LABEL]-(la)
  530. OPTIONAL MATCH(b:DataModel)-[:LABEL]-(la)
  531. OPTIONAL MATCH(meta:DataMeta)-[:LABEL]-(la)
  532. OPTIONAL MATCH(d:data_standard)-[:LABEL]-(la)
  533. WITH
  534. collect({
  535. id:toString(id(a)),
  536. text:a.name_zh,
  537. type:split(labels(a)[0],'_')[1]
  538. })+
  539. collect({
  540. id:toString(id(b)),
  541. text:b.name_zh,
  542. type:split(labels(b)[0],'_')[1]
  543. })+
  544. collect({
  545. id:toString(id(d)),
  546. text:d.name_zh,
  547. type:split(labels(e)[0],'_')[1]
  548. })+
  549. collect({
  550. id:toString(id(la)),
  551. text:la.name_zh,
  552. type:split(labels(la)[0],'_')[1]
  553. })+
  554. collect({id:toString(id(meta)),text:meta.name_zh}) as nodes,
  555. la,
  556. collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
  557. collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
  558. collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+
  559. collect({from:toString(id(d)),to:toString(id(la)),text:'标签'})+
  560. collect({from:toString(id(e)),to:toString(id(la)),text:'标签'}) as lines
  561. WITH
  562. toString(id(la)) as rootId,
  563. apoc.coll.toSet(lines) as lines,
  564. apoc.coll.toSet(nodes) as nodes
  565. RETURN nodes,lines,rootId
  566. """
  567. driver = None
  568. try:
  569. driver = connect_graph()
  570. with driver.session() as session:
  571. result = session.run(cql, nodeId=nodeid)
  572. res = {}
  573. for item in result:
  574. res = {
  575. "nodes": [
  576. record for record in item['nodes'] if record['id']
  577. ],
  578. "lines": [
  579. record
  580. for record in item['lines']
  581. if record['from'] and record['to']
  582. ],
  583. "rootId": item['rootId']
  584. }
  585. return res
  586. except (ConnectionError, ValueError) as e:
  587. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  588. return {}
  589. finally:
  590. if driver:
  591. driver.close()
  592. # 数据标签图谱展示(影响关系)下游
  593. def label_impact_graph(nodeid):
  594. """
  595. 生成数据标签的影响关系图谱
  596. Args:
  597. nodeid: 节点ID
  598. Returns:
  599. 图谱数据
  600. """
  601. # 查询语句
  602. cql = """
  603. MATCH(n:DataLabel)
  604. WHERE id(n)=$nodeId
  605. RETURN {
  606. id:toString(id(n)),
  607. text:(n.name_zh),
  608. type:"label"
  609. } AS nodes,
  610. toString(id(n)) as rootId
  611. """
  612. driver = None
  613. try:
  614. driver = connect_graph()
  615. with driver.session() as session:
  616. result = session.run(cql, nodeId=nodeid)
  617. res = {}
  618. for item in result:
  619. res = {
  620. "nodes": item['nodes'],
  621. "rootId": item['rootId'],
  622. "lines": []
  623. }
  624. return res
  625. except (ConnectionError, ValueError) as e:
  626. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  627. return {}
  628. finally:
  629. if driver:
  630. driver.close()
  631. # 数据标签按照提交内容查询相似分组,并且返回
  632. def dynamic_label_list(name_filter=None):
  633. """
  634. 根据内容查询相似的数据标签分组
  635. Args:
  636. name_filter: 内容过滤条件
  637. Returns:
  638. 标签分组列表
  639. """
  640. # 构建完整的查询语句
  641. cql = """
  642. MATCH (n:DataLabel)
  643. WITH
  644. n,
  645. apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity
  646. WHERE similarity > 0.1 // 设置相似度阈值
  647. RETURN DISTINCT n.group as name_zh, id(n) as nodeid
  648. """
  649. driver = None
  650. try:
  651. driver = connect_graph()
  652. with driver.session() as session:
  653. result = session.run(cql, name_filter=name_filter or "")
  654. data = []
  655. for record in result:
  656. data.append(
  657. {
  658. "name_zh": record['name_zh'],
  659. "id": record['nodeid'],
  660. }
  661. )
  662. return data
  663. except (ConnectionError, ValueError) as e:
  664. logger.error(f"Neo4j数据库连接失败: {str(e)}")
  665. return []
  666. finally:
  667. if driver:
  668. driver.close()
  669. def search_info(key, value):
  670. """
  671. 搜索指定属性的节点信息
  672. Args:
  673. key: 搜索属性键
  674. value: 搜索属性值
  675. Returns:
  676. 搜索结果列表
  677. """
  678. field_pattern = r"^[A-Za-z_][A-Za-z0-9_]*$"
  679. if not re.match(field_pattern, str(key)):
  680. logger.warning("非法属性键: %s", key)
  681. return []
  682. query = """
  683. MATCH (n)
  684. WHERE n[$field] =~ $pattern
  685. WITH
  686. n,
  687. properties(n) as properties,
  688. n.create_time as create_time,
  689. id(n) as nodeid
  690. RETURN properties, nodeid, create_time, labels(n) as labels
  691. LIMIT 30
  692. """
  693. driver = None
  694. try:
  695. driver = connect_graph()
  696. except (ConnectionError, ValueError) as e:
  697. logger.error(f"无法连接到Neo4j数据库: {str(e)}")
  698. return []
  699. try:
  700. with driver.session() as session:
  701. result = session.run(
  702. query,
  703. field=key,
  704. pattern=f"(?i).*{value}.*",
  705. )
  706. results = []
  707. for record in result:
  708. results.append(
  709. {
  710. "properties": record["properties"],
  711. "id": record["nodeid"],
  712. "create_time": record["create_time"],
  713. "labels": record["labels"],
  714. }
  715. )
  716. return results
  717. except Exception as e:
  718. logger.error(f"搜索节点信息失败: {str(e)}")
  719. return []
  720. finally:
  721. if driver:
  722. driver.close()
  723. def label_info(id):
  724. """
  725. 获取标签节点的信息
  726. Args:
  727. id: 节点ID
  728. Returns:
  729. 标签节点信息
  730. """
  731. query = """
  732. MATCH (n)
  733. WHERE id(n) = $nodeId
  734. RETURN {
  735. id:toString(id(n)),
  736. text:(n.name_zh),
  737. type:"label"
  738. } AS nodes,
  739. toString(id(n)) as rootId
  740. """
  741. driver = None
  742. try:
  743. driver = connect_graph()
  744. with driver.session() as session:
  745. result = session.run(query, nodeId=id).data()
  746. return result[0] if result else {}
  747. except (ConnectionError, ValueError) as e:
  748. logger.error(f"无法连接到Neo4j数据库: {str(e)}")
  749. return {}
  750. finally:
  751. if driver:
  752. driver.close()
  753. def graph_all(domain_id, include_meta=True):
  754. """
  755. 获取完整关系图谱
  756. Args:
  757. domain_id: 节点ID
  758. include_meta: 是否包含元数据节点
  759. Returns:
  760. dict: 包含 nodes 与 lines 的图谱数据
  761. """
  762. try:
  763. domain_id_int = int(domain_id)
  764. except (ValueError, TypeError):
  765. logger.error(f"节点ID不是有效的整数: {domain_id}")
  766. return {"nodes": [], "lines": []}
  767. try:
  768. with neo4j_driver.get_session() as session:
  769. nodes = {}
  770. lines = {}
  771. # 使用路径查询同时获取节点和关系
  772. if include_meta:
  773. cypher = """
  774. MATCH (n)
  775. WHERE id(n) = $domain_id
  776. OPTIONAL MATCH path = (n)-[r]-(m)
  777. RETURN n,
  778. collect(DISTINCT m) as related_nodes,
  779. collect(DISTINCT r) as relationships
  780. """
  781. else:
  782. cypher = """
  783. MATCH (n)
  784. WHERE id(n) = $domain_id
  785. OPTIONAL MATCH path = (n)-[r]-(m)
  786. WHERE NOT (m:DataMeta)
  787. RETURN n,
  788. collect(DISTINCT m) as related_nodes,
  789. collect(DISTINCT r) as relationships
  790. """
  791. result = session.run(cypher, domain_id=domain_id_int)
  792. record = result.single()
  793. if not record:
  794. logger.warning(f"未找到节点: {domain_id_int}")
  795. return {"nodes": [], "lines": []}
  796. # 处理起始节点
  797. n_node = record["n"]
  798. if n_node:
  799. n_props = dict(n_node)
  800. n_labels = list(n_node.labels)
  801. n_props["id"] = domain_id_int
  802. n_props["node_type"] = n_labels[0] if n_labels else ""
  803. nodes[domain_id_int] = n_props
  804. # 处理关联节点
  805. related_nodes = record["related_nodes"] or []
  806. for m_node in related_nodes:
  807. if m_node is None:
  808. continue
  809. m_elem_id = m_node.element_id
  810. m_id = int(m_elem_id.split(":")[-1])
  811. if m_id not in nodes:
  812. m_props = dict(m_node)
  813. m_labels = list(m_node.labels)
  814. m_props["id"] = m_id
  815. m_props["node_type"] = m_labels[0] if m_labels else ""
  816. nodes[m_id] = m_props
  817. # 处理关系
  818. relationships = record["relationships"] or []
  819. for rel in relationships:
  820. if rel is None:
  821. continue
  822. rel_elem_id = rel.element_id
  823. rel_id = rel_elem_id.split(":")[-1]
  824. if rel_id not in lines:
  825. # 获取关系的起始和结束节点 ID
  826. start_elem_id = rel.start_node.element_id
  827. end_elem_id = rel.end_node.element_id
  828. start_id = start_elem_id.split(":")[-1]
  829. end_id = end_elem_id.split(":")[-1]
  830. # 获取关系类型
  831. rel_type = type(rel).__name__
  832. lines[rel_id] = {
  833. "id": rel_id,
  834. "from": start_id,
  835. "to": end_id,
  836. "text": rel_type,
  837. }
  838. logger.info(
  839. f"graph_all 结果: node_id={domain_id_int}, "
  840. f"nodes={len(nodes)}, lines={len(lines)}"
  841. )
  842. return {
  843. "nodes": list(nodes.values()),
  844. "lines": list(lines.values()),
  845. }
  846. except Exception as e:
  847. logger.error(f"获取图谱失败: {str(e)}")
  848. import traceback
  849. logger.error(traceback.format_exc())
  850. return {"nodes": [], "lines": []}
  851. def node_delete(node_id):
  852. """
  853. 删除 DataLabel 节点及其所有关联关系
  854. Args:
  855. node_id: 节点ID(整数)
  856. Returns:
  857. dict: 删除结果,包含 success 状态和 message 信息
  858. """
  859. driver = None
  860. try:
  861. driver = connect_graph()
  862. except (ConnectionError, ValueError) as e:
  863. logger.error(f"无法连接到Neo4j数据库: {str(e)}")
  864. return {"success": False, "message": "无法连接到数据库"}
  865. try:
  866. with driver.session() as session:
  867. # 首先检查节点是否存在且为 DataLabel 类型
  868. check_query = """
  869. MATCH (n:DataLabel)
  870. WHERE id(n) = $nodeId
  871. RETURN n
  872. """
  873. check_result = session.run(
  874. check_query,
  875. nodeId=node_id,
  876. ).single()
  877. if not check_result:
  878. logger.warning(f"DataLabel 节点不存在: ID={node_id}")
  879. return {
  880. "success": False,
  881. "message": f"DataLabel 节点不存在 (ID: {node_id})",
  882. }
  883. # 删除节点及其所有关系
  884. delete_query = """
  885. MATCH (n:DataLabel)
  886. WHERE id(n) = $nodeId
  887. DETACH DELETE n
  888. RETURN count(n) as deleted_count
  889. """
  890. delete_result = session.run(
  891. delete_query,
  892. nodeId=node_id,
  893. ).single()
  894. if not delete_result:
  895. logger.warning(f"删除结果为空: ID={node_id}")
  896. return {
  897. "success": False,
  898. "message": "删除失败,未获取到删除结果",
  899. }
  900. deleted_count = delete_result["deleted_count"]
  901. if deleted_count > 0:
  902. logger.info(f"成功删除 DataLabel 节点: ID={node_id}")
  903. return {
  904. "success": True,
  905. "message": (
  906. f"成功删除 DataLabel 节点 (ID: {node_id})"
  907. ),
  908. }
  909. else:
  910. logger.warning(f"删除失败,节点可能已被删除: ID={node_id}")
  911. return {"success": False, "message": "删除失败,节点可能已被删除"}
  912. except Exception as e:
  913. logger.error(f"删除 DataLabel 节点失败: {str(e)}")
  914. return {"success": False, "message": f"删除失败: {str(e)}"}
  915. finally:
  916. if driver:
  917. driver.close()