resource.py 56 KB


  1. import json
  2. import re
  3. import logging
  4. from py2neo import Relationship
  5. import pandas as pd
  6. from app.services.neo4j_driver import neo4j_driver
  7. from app.core.graph.graph_operations import create_or_get_node, relationship_exists, get_node, connect_graph
  8. import time
  9. logger = logging.getLogger("app")
  10. def get_formatted_time():
  11. """获取格式化的当前时间"""
  12. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  13. def get_node_by_id(label, id):
  14. """根据ID获取指定标签的节点"""
  15. try:
  16. with neo4j_driver.get_session() as session:
  17. # 确保id为整数
  18. try:
  19. id_int = int(id)
  20. except (ValueError, TypeError):
  21. logger.error(f"节点ID不是有效的整数: {id}")
  22. return None
  23. cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
  24. result = session.run(cypher, id=id_int)
  25. record = result.single()
  26. return record["n"] if record else None
  27. except Exception as e:
  28. logger.error(f"根据ID获取节点失败: {str(e)}")
  29. return None
  30. def get_node_by_id_no_label(id):
  31. """根据ID获取节点,不限制标签"""
  32. try:
  33. with neo4j_driver.get_session() as session:
  34. # 确保id为整数
  35. try:
  36. id_int = int(id)
  37. except (ValueError, TypeError):
  38. logger.error(f"节点ID不是有效的整数: {id}")
  39. return None
  40. cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
  41. result = session.run(cypher, id=id_int)
  42. record = result.single()
  43. return record["n"] if record else None
  44. except Exception as e:
  45. logger.error(f"根据ID获取节点失败: {str(e)}")
  46. return None
  47. def delete_relationships(start_node, rel_type=None, end_node=None):
  48. """删除关系"""
  49. try:
  50. with neo4j_driver.get_session() as session:
  51. if rel_type and end_node:
  52. cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
  53. cypher = cypher.replace("{rel_type}", rel_type)
  54. session.run(cypher, start_id=start_node.id, end_id=end_node.id)
  55. elif rel_type:
  56. cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
  57. cypher = cypher.replace("{rel_type}", rel_type)
  58. session.run(cypher, start_id=start_node.id)
  59. else:
  60. cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
  61. session.run(cypher, start_id=start_node.id)
  62. return True
  63. except Exception as e:
  64. logger.error(f"删除关系失败: {str(e)}")
  65. return False
  66. def update_or_create_node(label, **properties):
  67. """更新或创建节点"""
  68. try:
  69. with neo4j_driver.get_session() as session:
  70. node_id = properties.pop('id', None)
  71. if node_id:
  72. # 更新现有节点
  73. set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
  74. cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
  75. result = session.run(cypher, id=int(node_id), **properties)
  76. else:
  77. # 创建新节点
  78. props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
  79. cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
  80. result = session.run(cypher, **properties)
  81. record = result.single()
  82. return record["n"] if record else None
  83. except Exception as e:
  84. logger.error(f"更新或创建节点失败: {str(e)}")
  85. return None
  86. # 数据资源-元数据 关系节点创建、查询
  87. def handle_node(receiver, head_data, data_source=None, resource_type=None):
  88. """处理数据资源节点创建和关系建立"""
  89. try:
  90. # 根据resource_type设置type属性的值
  91. if resource_type == 'ddl':
  92. type_value = 'ddl'
  93. else:
  94. type_value = 'structure'
  95. # 更新属性
  96. update_attributes = {
  97. 'en_name': receiver.get('en_name', receiver.get('name', '')),
  98. 'time': get_formatted_time(),
  99. 'type': type_value # 根据资源类型设置不同的type值
  100. }
  101. # 记录describe字段是否存在于创建数据中
  102. if "describe" in receiver:
  103. logger.info(f"创建资源,describe字段将被设置为: {receiver.get('describe')}")
  104. else:
  105. logger.info("创建资源,describe字段不在创建数据中")
  106. if 'additional_info' in receiver:
  107. del receiver['additional_info']
  108. # 从receiver中移除data_source属性,避免将复杂对象作为节点属性
  109. if 'data_source' in receiver:
  110. del receiver['data_source']
  111. tag_list = receiver.get('tag')
  112. receiver.update(update_attributes)
  113. # 创建或获取 data_resource 节点
  114. with neo4j_driver.get_session() as session:
  115. props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
  116. cypher = f"""
  117. MERGE (n:data_resource {{name: $name}})
  118. ON CREATE SET n = {{{props_str}}}
  119. ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
  120. RETURN n
  121. """
  122. result = session.run(cypher, **receiver)
  123. data_resource_node = result.single()["n"]
  124. resource_id = data_resource_node.id # 使用id属性获取数值ID
  125. # 记录创建后的节点数据
  126. logger.info(f"创建后的节点数据,describe字段: {data_resource_node.get('describe')}")
  127. # 处理标签关系
  128. if tag_list:
  129. tag_node = get_node_by_id('data_label', tag_list)
  130. if tag_node:
  131. # 检查关系是否存在
  132. rel_check = """
  133. MATCH (a:data_resource)-[r:label]->(b:data_label)
  134. WHERE id(a) = $resource_id AND id(b) = $tag_id
  135. RETURN r
  136. """
  137. rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id) # 使用数值id
  138. # 如果关系不存在则创建
  139. if not rel_result.single():
  140. rel_create = """
  141. MATCH (a:data_resource), (b:data_label)
  142. WHERE id(a) = $resource_id AND id(b) = $tag_id
  143. CREATE (a)-[r:label]->(b)
  144. RETURN r
  145. """
  146. session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
  147. # 处理头部数据(元数据,字段)
  148. if head_data:
  149. for item in head_data:
  150. # 创建元数据节点
  151. meta_cypher = """
  152. MERGE (m:meta_data {name: $name})
  153. ON CREATE SET m.en_name = $en_name,
  154. m.create_time = $create_time,
  155. m.data_type = $type,
  156. m.status = 'true'
  157. ON MATCH SET m.data_type = $type,
  158. m.status = 'true'
  159. RETURN m
  160. """
  161. create_time = get_formatted_time()
  162. meta_result = session.run(
  163. meta_cypher,
  164. name=item['name'],
  165. en_name=item['en_name'],
  166. create_time=create_time,
  167. type=item['data_type'] # 使用data_type作为data_type属性
  168. )
  169. meta_record = meta_result.single()
  170. if meta_record and meta_record["m"]:
  171. meta_node = meta_record["m"]
  172. meta_id = meta_node.id # 使用数值ID
  173. # 打印日志确认节点创建成功和ID
  174. logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")
  175. # 确认数据资源节点是否可以正确查询到
  176. check_resource_cypher = """
  177. MATCH (n:data_resource)
  178. WHERE id(n) = $resource_id
  179. RETURN n
  180. """
  181. check_resource = session.run(check_resource_cypher, resource_id=resource_id)
  182. if check_resource.single():
  183. logger.info(f"找到数据资源节点: ID={resource_id}")
  184. else:
  185. logger.error(f"无法找到数据资源节点: ID={resource_id}")
  186. continue
  187. # 创建关系
  188. rel_cypher = """
  189. MATCH (a:data_resource), (m:meta_data)
  190. WHERE id(a) = $resource_id AND id(m) = $meta_id
  191. MERGE (a)-[r:contain]->(m)
  192. RETURN r
  193. """
  194. rel_result = session.run(
  195. rel_cypher,
  196. resource_id=resource_id,
  197. meta_id=meta_id
  198. )
  199. rel_record = rel_result.single()
  200. if rel_record:
  201. logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
  202. else:
  203. logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
  204. else:
  205. logger.error(f"未能创建或获取元数据节点: {item['name']}")
  206. # 处理数据源关系
  207. if data_source and resource_type == 'ddl':
  208. try:
  209. # 创建或获取数据源节点
  210. # data_source_en_name = handle_data_source(data_source)
  211. data_source_en_name = data_source['en_name']
  212. # 创建数据资源与数据源的关系
  213. if data_source_en_name:
  214. # 创建 originates_from 关系
  215. rel_data_source_cypher = """
  216. MATCH (a:data_resource), (b:DataSource)
  217. WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
  218. MERGE (a)-[r:originates_from]->(b)
  219. RETURN r
  220. """
  221. rel_result = session.run(
  222. rel_data_source_cypher,
  223. resource_id=resource_id,
  224. ds_en_name=data_source_en_name
  225. )
  226. rel_record = rel_result.single()
  227. if rel_record:
  228. logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
  229. else:
  230. # 添加严重错误日志
  231. error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {data_source_en_name}"
  232. logger.error(error_msg)
  233. # 检查数据源节点是否存在
  234. check_ds_cypher = "MATCH (b:DataSource) WHERE b.en_name = $ds_en_name RETURN b"
  235. check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
  236. if not check_ds_result.single():
  237. logger.error(f"数据源节点不存在: en_name={data_source_en_name}")
  238. # 检查数据资源节点是否存在
  239. check_res_cypher = "MATCH (a:data_resource) WHERE id(a) = $resource_id RETURN a"
  240. check_res_result = session.run(check_res_cypher, resource_id=resource_id)
  241. if not check_res_result.single():
  242. logger.error(f"数据资源节点不存在: id={resource_id}")
  243. # 严重错误应该抛出异常
  244. raise RuntimeError(error_msg)
  245. except Exception as e:
  246. logger.error(f"处理数据源关系失败: {str(e)}")
  247. raise RuntimeError(f"处理数据源关系失败: {str(e)}")
  248. return resource_id
  249. except Exception as e:
  250. logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
  251. raise
  252. def handle_id_resource(resource_id):
  253. """处理单个数据资源查询"""
  254. try:
  255. with neo4j_driver.get_session() as session:
  256. # 确保resource_id为整数
  257. try:
  258. resource_id_int = int(resource_id)
  259. except (ValueError, TypeError):
  260. logger.error(f"资源ID不是有效的整数: {resource_id}")
  261. return None
  262. # 使用数值ID查询
  263. cypher = """
  264. MATCH (n:data_resource)
  265. WHERE id(n) = $resource_id
  266. RETURN n
  267. """
  268. result = session.run(cypher, resource_id=resource_id_int)
  269. record = result.single()
  270. if not record:
  271. logger.error(f"未找到资源,ID: {resource_id_int}")
  272. return None
  273. # 构建返回数据
  274. logger.info(f"record: {record}")
  275. data_resource = dict(record["n"])
  276. logger.info(f"data_resource: {data_resource}")
  277. logger.info(f"describe field in node: {record['n'].get('describe')}")
  278. # 确保describe字段存在,即使为null也记录下来
  279. if 'describe' in record["n"]:
  280. data_resource["describe"] = record["n"].get('describe')
  281. logger.info(f"设置describe字段: {data_resource['describe']}")
  282. data_resource["id"] = record["n"].id
  283. # 查询关联的标签
  284. tag_cypher = """
  285. MATCH (n:data_resource)-[r:label]->(t:data_label)
  286. WHERE id(n) = $resource_id
  287. RETURN t
  288. """
  289. tag_result = session.run(tag_cypher, resource_id=resource_id_int)
  290. tag_record = tag_result.single()
  291. # 设置标签信息
  292. if tag_record:
  293. tag = {
  294. "name": tag_record["t"].get("name"),
  295. "id": tag_record["t"].id
  296. }
  297. else:
  298. tag = {
  299. "name": None,
  300. "id": None
  301. }
  302. data_resource["tag"] = tag
  303. # 查询关联的元数据 - 支持meta_data和Metadata两种标签
  304. meta_cypher = """
  305. MATCH (n:data_resource)-[:contain]->(m)
  306. WHERE id(n) = $resource_id
  307. AND (m:meta_data OR m:Metadata)
  308. RETURN m
  309. """
  310. meta_result = session.run(meta_cypher, resource_id=resource_id_int)
  311. parsed_data = []
  312. for meta_record in meta_result:
  313. meta = dict(meta_record["m"])
  314. meta_data = {
  315. "id": meta_record["m"].id,
  316. "name": meta.get("name"),
  317. "en_name": meta.get("en_name"),
  318. "data_type": meta.get("data_type"),
  319. "data_standard": {
  320. "name": None,
  321. "id": None
  322. }
  323. }
  324. parsed_data.append(meta_data)
  325. data_resource["parsed_data"] = parsed_data
  326. # 确保所有必需字段都有默认值
  327. required_fields = {
  328. "leader": "",
  329. "organization": "",
  330. "name": "",
  331. "en_name": "",
  332. "data_sensitivity": "",
  333. "location": "/",
  334. "time": "",
  335. "type": "",
  336. "category": "",
  337. "url": "",
  338. "frequency": "",
  339. "status": True,
  340. "keywords": [],
  341. "describe": ""
  342. }
  343. for field, default_value in required_fields.items():
  344. if field not in data_resource or data_resource[field] is None:
  345. data_resource[field] = default_value
  346. logger.info(f"成功获取资源详情,ID: {resource_id_int}, describe: {data_resource.get('describe')}")
  347. return data_resource
  348. except Exception as e:
  349. logger.error(f"处理单个数据资源查询失败: {str(e)}")
  350. return None
  351. def id_resource_graph(resource_id):
  352. """获取数据资源图谱"""
  353. try:
  354. with neo4j_driver.get_session() as session:
  355. # 查询数据资源节点及其关系
  356. cypher = """
  357. MATCH (n:data_resource)-[r]-(m)
  358. WHERE id(n) = $resource_id
  359. RETURN n, r, m
  360. """
  361. result = session.run(cypher, resource_id=int(resource_id))
  362. # 收集节点和关系
  363. nodes = {}
  364. relationships = []
  365. for record in result:
  366. # 处理源节点
  367. source_node = dict(record["n"])
  368. source_node["id"] = record["n"].id
  369. nodes[source_node["id"]] = source_node
  370. # 处理目标节点
  371. target_node = dict(record["m"])
  372. target_node["id"] = record["m"].id
  373. nodes[target_node["id"]] = target_node
  374. # 处理关系
  375. rel = record["r"]
  376. relationship = {
  377. "id": rel.id,
  378. "source": record["n"].id,
  379. "target": record["m"].id,
  380. "type": rel.type
  381. }
  382. relationships.append(relationship)
  383. return {
  384. "nodes": list(nodes.values()),
  385. "relationships": relationships
  386. }
  387. except Exception as e:
  388. logger.error(f"获取数据资源图谱失败: {str(e)}")
  389. return {"nodes": [], "relationships": []}
  390. def resource_list(page, page_size, en_name_filter=None, name_filter=None,
  391. type_filter='all', category_filter=None, tag_filter=None):
  392. """获取数据资源列表"""
  393. try:
  394. with neo4j_driver.get_session() as session:
  395. # 构建查询条件
  396. match_clause = "MATCH (n:data_resource)"
  397. where_conditions = []
  398. if en_name_filter:
  399. where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
  400. if name_filter:
  401. where_conditions.append(f"n.name CONTAINS '{name_filter}'")
  402. if type_filter and type_filter != 'all':
  403. where_conditions.append(f"n.type = '{type_filter}'")
  404. if category_filter:
  405. where_conditions.append(f"n.category = '{category_filter}'")
  406. # 标签过滤需要额外的匹配
  407. if tag_filter:
  408. match_clause += "-[:label]->(t:data_label)"
  409. where_conditions.append(f"t.name = '{tag_filter}'")
  410. where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
  411. # 计算总数
  412. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  413. count_result = session.run(count_cypher)
  414. total_count = count_result.single()["count"]
  415. # 分页查询
  416. skip = (page - 1) * page_size
  417. cypher = f"""
  418. {match_clause}{where_clause}
  419. RETURN n
  420. ORDER BY n.time DESC
  421. SKIP {skip} LIMIT {page_size}
  422. """
  423. result = session.run(cypher)
  424. # 格式化结果
  425. resources = []
  426. for record in result:
  427. node = dict(record["n"])
  428. node["id"] = record["n"].id
  429. # 查询关联的标签
  430. tag_cypher = """
  431. MATCH (n:data_resource)-[r:label]->(t:data_label)
  432. WHERE id(n) = $resource_id
  433. RETURN t
  434. """
  435. tag_result = session.run(tag_cypher, resource_id=node["id"])
  436. tag_record = tag_result.single()
  437. if tag_record:
  438. tag = dict(tag_record["t"])
  439. tag["id"] = tag_record["t"].id
  440. node["tag_info"] = tag
  441. resources.append(node)
  442. return resources, total_count
  443. except Exception as e:
  444. logger.error(f"获取数据资源列表失败: {str(e)}")
  445. return [], 0
  446. def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
  447. name_filter=None, category_filter=None, tag_filter=None):
  448. """获取特定数据资源关联的元数据列表"""
  449. try:
  450. with neo4j_driver.get_session() as session:
  451. # 确保resource_id为整数
  452. try:
  453. resource_id_int = int(resource_id)
  454. except (ValueError, TypeError):
  455. logger.error(f"资源ID不是有效的整数: {resource_id}")
  456. return [], 0
  457. # 基本匹配语句 - 支持meta_data和Metadata标签
  458. match_clause = """
  459. MATCH (n:data_resource)-[:contain]->(m)
  460. WHERE id(n) = $resource_id
  461. AND (m:meta_data OR m:Metadata)
  462. """
  463. where_conditions = []
  464. if en_name_filter:
  465. where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
  466. if name_filter:
  467. where_conditions.append(f"m.name CONTAINS '{name_filter}'")
  468. if category_filter:
  469. where_conditions.append(f"m.category = '{category_filter}'")
  470. # 标签过滤需要额外的匹配
  471. tag_match = ""
  472. if tag_filter:
  473. tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
  474. where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
  475. # 计算总数
  476. count_cypher = f"""
  477. {match_clause}{where_clause}
  478. {tag_match}
  479. RETURN count(m) as count
  480. """
  481. count_params = {"resource_id": resource_id_int}
  482. if tag_filter:
  483. count_params["tag_filter"] = tag_filter
  484. count_result = session.run(count_cypher, **count_params)
  485. total_count = count_result.single()["count"]
  486. # 分页查询
  487. skip = (page - 1) * page_size
  488. cypher = f"""
  489. {match_clause}{where_clause}
  490. {tag_match}
  491. RETURN m
  492. ORDER BY m.name
  493. SKIP {skip} LIMIT {page_size}
  494. """
  495. result = session.run(cypher, **count_params)
  496. # 格式化结果
  497. metadata_list = []
  498. for record in result:
  499. meta = dict(record["m"])
  500. meta["id"] = record["m"].id
  501. metadata_list.append(meta)
  502. logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
  503. return metadata_list, total_count
  504. except Exception as e:
  505. logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
  506. return [], 0
  507. def resource_kinship_graph(resource_id, include_meta=True):
  508. """获取数据资源亲缘关系图谱"""
  509. try:
  510. with neo4j_driver.get_session() as session:
  511. # 确保resource_id为整数
  512. try:
  513. resource_id_int = int(resource_id)
  514. except (ValueError, TypeError):
  515. logger.error(f"资源ID不是有效的整数: {resource_id}")
  516. return {"nodes": [], "relationships": []}
  517. # 基本查询
  518. cypher_parts = [
  519. f"MATCH (n:data_resource) WHERE id(n) = $resource_id",
  520. "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
  521. ]
  522. # 是否包含元数据 - 支持meta_data和Metadata两种标签
  523. if include_meta:
  524. cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m) WHERE (m:meta_data OR m:Metadata)")
  525. cypher_parts.append("RETURN n, l, collect(m) as metadata")
  526. cypher = "\n".join(cypher_parts)
  527. result = session.run(cypher, resource_id=resource_id_int)
  528. record = result.single()
  529. if not record:
  530. logger.error(f"未找到资源图谱数据,ID: {resource_id_int}")
  531. return {"nodes": [], "relationships": []}
  532. # 收集节点和关系
  533. nodes = {}
  534. relationships = []
  535. # 处理数据资源节点
  536. resource_node = dict(record["n"])
  537. resource_node["id"] = record["n"].id
  538. resource_node["labels"] = list(record["n"].labels)
  539. nodes[resource_node["id"]] = resource_node
  540. # 处理标签节点
  541. if record["l"]:
  542. label_node = dict(record["l"])
  543. label_node["id"] = record["l"].id
  544. label_node["labels"] = list(record["l"].labels)
  545. nodes[label_node["id"]] = label_node
  546. # 添加资源-标签关系
  547. relationships.append({
  548. "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
  549. "source": resource_node["id"],
  550. "target": label_node["id"],
  551. "type": "label"
  552. })
  553. # 处理元数据节点
  554. if include_meta and record["metadata"]:
  555. for meta in record["metadata"]:
  556. if meta: # 检查元数据节点是否存在
  557. meta_node = dict(meta)
  558. meta_node["id"] = meta.id
  559. meta_node["labels"] = list(meta.labels)
  560. nodes[meta_node["id"]] = meta_node
  561. # 添加资源-元数据关系
  562. relationships.append({
  563. "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
  564. "source": resource_node["id"],
  565. "target": meta_node["id"],
  566. "type": "contain"
  567. })
  568. logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  569. return {
  570. "nodes": list(nodes.values()),
  571. "relationships": relationships
  572. }
  573. except Exception as e:
  574. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  575. return {"nodes": [], "relationships": []}
  576. def resource_impact_all_graph(resource_id, include_meta=True):
  577. """获取数据资源影响关系图谱"""
  578. try:
  579. with neo4j_driver.get_session() as session:
  580. # 确保resource_id为整数
  581. try:
  582. resource_id_int = int(resource_id)
  583. except (ValueError, TypeError):
  584. logger.error(f"资源ID不是有效的整数: {resource_id}")
  585. return {"nodes": [], "relationships": []}
  586. # 根据meta参数决定查询深度
  587. if include_meta:
  588. cypher = """
  589. MATCH path = (n:data_resource)-[*1..3]-(m)
  590. WHERE id(n) = $resource_id
  591. RETURN path
  592. """
  593. else:
  594. cypher = """
  595. MATCH path = (n:data_resource)-[*1..2]-(m)
  596. WHERE id(n) = $resource_id
  597. AND NOT (m:meta_data) AND NOT (m:Metadata)
  598. RETURN path
  599. """
  600. result = session.run(cypher, resource_id=resource_id_int)
  601. # 收集节点和关系
  602. nodes = {}
  603. relationships = {}
  604. for record in result:
  605. path = record["path"]
  606. # 处理路径中的所有节点
  607. for node in path.nodes:
  608. if node.id not in nodes:
  609. node_dict = dict(node)
  610. node_dict["id"] = node.id
  611. node_dict["labels"] = list(node.labels)
  612. nodes[node.id] = node_dict
  613. # 处理路径中的所有关系
  614. for rel in path.relationships:
  615. if rel.id not in relationships:
  616. rel_dict = {
  617. "id": rel.id,
  618. "source": rel.start_node.id,
  619. "target": rel.end_node.id,
  620. "type": rel.type
  621. }
  622. relationships[rel.id] = rel_dict
  623. logger.info(f"成功获取完整图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  624. return {
  625. "nodes": list(nodes.values()),
  626. "relationships": list(relationships.values())
  627. }
  628. except Exception as e:
  629. logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
  630. return {"nodes": [], "relationships": []}
  631. def clean_type(type_str):
  632. """清洗SQL类型字符串"""
  633. # 提取基本类型,不包括长度或精度信息
  634. basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()
  635. # 移除 VARYING 这样的后缀
  636. basic_type = re.sub(r'\s+VARYING$', '', basic_type)
  637. # 标准化常见类型
  638. type_mapping = {
  639. 'INT': 'INTEGER',
  640. 'INT4': 'INTEGER',
  641. 'INT8': 'BIGINT',
  642. 'SMALLINT': 'SMALLINT',
  643. 'BIGINT': 'BIGINT',
  644. 'FLOAT4': 'FLOAT',
  645. 'FLOAT8': 'DOUBLE',
  646. 'REAL': 'FLOAT',
  647. 'DOUBLE PRECISION': 'DOUBLE',
  648. 'NUMERIC': 'DECIMAL',
  649. 'BOOL': 'BOOLEAN',
  650. 'CHARACTER': 'CHAR',
  651. 'CHAR VARYING': 'VARCHAR',
  652. 'CHARACTER VARYING': 'VARCHAR',
  653. 'TEXT': 'TEXT',
  654. 'DATE': 'DATE',
  655. 'TIME': 'TIME',
  656. 'TIMESTAMP': 'TIMESTAMP',
  657. 'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
  658. 'BYTEA': 'BINARY',
  659. 'JSON': 'JSON',
  660. 'JSONB': 'JSONB',
  661. 'UUID': 'UUID',
  662. 'SERIAL': 'SERIAL',
  663. 'SERIAL4': 'SERIAL',
  664. 'SERIAL8': 'BIGSERIAL',
  665. 'BIGSERIAL': 'BIGSERIAL'
  666. }
  667. # 尝试从映射表中获取标准化的类型
  668. return type_mapping.get(basic_type, basic_type)
  669. def clean_field_name(field_name):
  670. """清洗字段名"""
  671. return field_name.strip('`').strip('"').strip("'")
  672. def select_create_ddl(sql_content):
  673. """从SQL内容中提取创建表的DDL语句"""
  674. try:
  675. # 解析复杂的SQL文件,识别所有的CREATE TABLE语句及其关联的注释
  676. # 找到所有以CREATE TABLE开头的语句块,每个语句块包含主语句和相关的注释
  677. # 首先,分割 SQL 内容按分号
  678. statements = []
  679. current_statement = ""
  680. in_string = False
  681. string_quote = None
  682. for char in sql_content:
  683. if char in ["'", '"']:
  684. if not in_string:
  685. in_string = True
  686. string_quote = char
  687. elif char == string_quote:
  688. in_string = False
  689. string_quote = None
  690. current_statement += char
  691. elif char == ';' and not in_string:
  692. current_statement += char
  693. if current_statement.strip():
  694. statements.append(current_statement.strip())
  695. current_statement = ""
  696. else:
  697. current_statement += char
  698. if current_statement.strip():
  699. statements.append(current_statement.strip())
  700. # 找出所有的CREATE TABLE语句和关联的注释
  701. create_table_statements = []
  702. create_index = -1
  703. in_table_block = False
  704. current_table = None
  705. current_block = ""
  706. for i, stmt in enumerate(statements):
  707. if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
  708. # 如果已经在处理表,先保存当前块
  709. if in_table_block and current_block:
  710. create_table_statements.append(current_block)
  711. # 开始新的表块
  712. in_table_block = True
  713. current_block = stmt
  714. # 提取表名
  715. table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
  716. if table_match:
  717. current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
  718. current_table = current_table.strip('"\'') if current_table else ""
  719. elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
  720. re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
  721. # 检查注释是否属于当前表
  722. if current_table:
  723. # 表注释处理
  724. if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
  725. table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
  726. if table_comment_match:
  727. comment_table = table_comment_match.group(1).strip('"\'')
  728. if comment_table == current_table:
  729. current_block += " " + stmt
  730. else:
  731. # 这是另一个表的注释,当前表的DDL到此结束
  732. create_table_statements.append(current_block)
  733. in_table_block = False
  734. current_block = ""
  735. current_table = None
  736. # 列注释处理
  737. elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
  738. column_comment_match = re.search(
  739. r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
  740. stmt,
  741. re.IGNORECASE
  742. )
  743. if column_comment_match:
  744. comment_table = column_comment_match.group(1)
  745. if comment_table == current_table:
  746. current_block += " " + stmt
  747. else:
  748. # 这是另一个表的注释,当前表的DDL到此结束
  749. create_table_statements.append(current_block)
  750. in_table_block = False
  751. current_block = ""
  752. current_table = None
  753. elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
  754. # 如果遇到新的CREATE语句(不是注释),保存当前块并结束
  755. create_table_statements.append(current_block)
  756. in_table_block = False
  757. current_block = ""
  758. current_table = None
  759. # 添加最后一个块
  760. if in_table_block and current_block:
  761. create_table_statements.append(current_block)
  762. # 日志记录
  763. logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
  764. for i, stmt in enumerate(create_table_statements):
  765. logger.debug(f"DDL语句 {i+1}: {stmt}")
  766. return create_table_statements
  767. except Exception as e:
  768. logger.error(f"提取DDL语句失败: {str(e)}")
  769. # logger.error(traceback.format_exc())
  770. return []
  771. def table_sql(sql):
  772. """解析表定义SQL,支持带schema和不带schema两种格式"""
  773. try:
  774. # 支持以下格式:
  775. # 1. CREATE TABLE tablename
  776. # 2. CREATE TABLE "tablename"
  777. # 3. CREATE TABLE 'tablename'
  778. # 4. CREATE TABLE schema.tablename
  779. # 5. CREATE TABLE "schema"."tablename"
  780. # 6. CREATE TABLE 'schema'.'tablename'
  781. # 匹配表名,支持带引号和不带引号的情况
  782. table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
  783. table_match = re.search(table_pattern, sql, re.IGNORECASE)
  784. if not table_match:
  785. logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
  786. return None
  787. # 获取表名
  788. schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
  789. table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
  790. if not table_name:
  791. logger.error("无法解析表名")
  792. return None
  793. logger.debug(f"解析到表名: {table_name}")
  794. # 提取CREATE TABLE语句的主体部分(括号内的内容)
  795. body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
  796. body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
  797. if not body_match:
  798. logger.error("无法提取表主体内容")
  799. return None
  800. body_text = body_match.group(1).strip()
  801. logger.debug(f"表定义主体部分: {body_text}")
  802. # 解析字段定义
  803. fields = []
  804. # 分割字段定义,处理括号嵌套和引号
  805. field_defs = []
  806. pos = 0
  807. in_parentheses = 0
  808. in_quotes = False
  809. quote_char = None
  810. for i, char in enumerate(body_text):
  811. if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
  812. in_quotes = not in_quotes
  813. if in_quotes:
  814. quote_char = char
  815. else:
  816. quote_char = None
  817. elif char == '(' and not in_quotes:
  818. in_parentheses += 1
  819. elif char == ')' and not in_quotes:
  820. in_parentheses -= 1
  821. elif char == ',' and in_parentheses == 0 and not in_quotes:
  822. field_defs.append(body_text[pos:i].strip())
  823. pos = i + 1
  824. # 添加最后一个字段定义
  825. if pos < len(body_text):
  826. field_defs.append(body_text[pos:].strip())
  827. logger.debug(f"解析出 {len(field_defs)} 个字段定义")
  828. # 处理每个字段定义
  829. for field_def in field_defs:
  830. # 跳过约束定义
  831. if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
  832. continue
  833. # 提取字段名和类型
  834. field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
  835. field_match = re.search(field_pattern, field_def, re.IGNORECASE)
  836. if field_match:
  837. # 提取字段名
  838. field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
  839. # 提取类型
  840. field_type = field_match.group(5).strip()
  841. # 处理类型中可能的括号
  842. type_base = re.split(r'\s+', field_type)[0]
  843. clean_type_value = clean_type(type_base)
  844. fields.append((field_name, clean_type_value))
  845. logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
  846. else:
  847. logger.warning(f"无法解析字段定义: {field_def}")
  848. # 提取表注释
  849. table_comment = ""
  850. table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
  851. table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
  852. if table_comment_match:
  853. comment_table = table_comment_match.group(1)
  854. if comment_table.strip("'\"") == table_name.strip("'\""):
  855. table_comment = table_comment_match.group(2)
  856. logger.debug(f"找到表注释: {table_comment}")
  857. # 提取列注释
  858. comments = {}
  859. column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
  860. for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
  861. comment_table = match.group(1)
  862. column_name = match.group(2)
  863. comment = match.group(3)
  864. # 检查表名是否匹配
  865. if comment_table.strip("'\"") == table_name.strip("'\""):
  866. comments[column_name] = comment
  867. logger.debug(f"找到列注释: {column_name} - {comment}")
  868. else:
  869. logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")
  870. # 检查字段和注释匹配情况
  871. logger.debug("========字段和注释匹配情况========")
  872. field_names = [f[0] for f in fields]
  873. logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
  874. logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")
  875. # 构建返回结果
  876. meta_list = []
  877. for field_name, field_type in fields:
  878. chinese_name = comments.get(field_name, "")
  879. meta_list.append({
  880. "en_name": field_name,
  881. "data_type": field_type,
  882. "name": chinese_name if chinese_name else field_name
  883. })
  884. # 检查表是否存在
  885. try:
  886. status = status_query([table_name])
  887. except Exception as e:
  888. logger.error(f"检查表存在状态失败: {str(e)}")
  889. status = [False]
  890. # 构建返回结果
  891. result = {
  892. table_name: {
  893. "exist": status[0] if status else False,
  894. "meta": meta_list
  895. }
  896. }
  897. logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
  898. return result
  899. except Exception as e:
  900. logger.error(f"解析表定义SQL失败: {str(e)}")
  901. logger.error(f"异常详情: {e}")
  902. import traceback
  903. logger.error(traceback.format_exc())
  904. return None
  # Check whether each English table name already exists in the graph.
  def status_query(key_list):
      """Report, per name in ``key_list``, whether a graph node uses it.

      A name counts as existing when at least one ``data_model``,
      ``data_resource`` or ``data_metric`` node has it as ``en_name``.

      Args:
          key_list: English names to look up. ``None`` is treated as empty.

      Returns:
          Whatever ``result.value()`` yields for the single ``exist`` column.
          NOTE(review): presumably a one-element list wrapping the list of
          booleans (one per input name, in input order) -- confirm against
          the driver before indexing into it.
      """
      names = list(key_list) if key_list else []
      # Each label gets its own variable: reusing one variable across several
      # OPTIONAL MATCH clauses keeps the first binding (node or null), so only
      # the first label would ever be checked.
      # Rows are aggregated per input index so duplicate names and multiple
      # matching nodes still produce exactly one boolean per input position.
      query = """
      UNWIND range(0, size($Key_list) - 1) AS idx
      WITH idx, $Key_list[idx] AS name
      OPTIONAL MATCH (m:data_model {en_name: name})
      OPTIONAL MATCH (r:data_resource {en_name: name})
      OPTIONAL MATCH (k:data_metric {en_name: name})
      WITH idx, (count(m) + count(r) + count(k)) > 0 AS exist
      ORDER BY idx
      RETURN collect(exist) AS exist
      """
      with neo4j_driver.get_session() as session:
          result = session.run(query, Key_list=names)
          data = result.value()  # values of the first (only) column
      return data
  # A query is only parseable when it has a "SELECT ... FROM" shape.
  _SELECT_CLAUSE_RE = re.compile(r'SELECT\s+(.*?)\s+FROM', re.IGNORECASE | re.DOTALL)

  # FROM clause up to the next clause keyword. The word boundaries keep table
  # names such as "orders" or "groups" from being mistaken for ORDER / GROUP,
  # and keep identifiers such as "datefrom" from being mistaken for FROM.
  _FROM_CLAUSE_RE = re.compile(
      r'\bFROM\s+(.*?)(?:\s+(?:WHERE|GROUP|HAVING|ORDER|LIMIT)\b|$)',
      re.IGNORECASE | re.DOTALL,
  )

  # Any JOIN keyword together with its optional modifiers.
  _JOIN_KEYWORD_RE = re.compile(
      r'\b(?:(?:NATURAL|INNER|CROSS|LEFT|RIGHT|FULL|OUTER)\s+)*JOIN\b',
      re.IGNORECASE,
  )

  # Optional "schema." prefixes followed by the (possibly quoted) table name.
  # Exactly one of the four capture groups holds the bare table name.
  _TABLE_REF_RE = re.compile(
      r'''(?:(?:`[^`]+`|"[^"]+"|'[^']+'|[a-zA-Z0-9_]+)\s*\.\s*)*'''
      r'''(?:`([^`]+)`|"([^"]+)"|'([^']+)'|([a-zA-Z0-9_]+))'''
  )


  def _split_top_level_commas(text):
      """Split ``text`` on commas that are not nested inside parentheses."""
      parts = []
      current = []
      depth = 0
      for ch in text:
          if ch == ',' and depth == 0:
              parts.append(''.join(current).strip())
              current = []
              continue
          if ch == '(':
              depth += 1
          elif ch == ')':
              depth -= 1
          current.append(ch)
      tail = ''.join(current).strip()
      if tail:
          parts.append(tail)
      return parts


  def select_sql(sql_query):
      """Extract the table names referenced in the FROM clause of a SELECT.

      Handles comma-separated tables, JOINs (ON/USING conditions are ignored),
      aliases with or without ``AS``, and backtick/double/single-quoted names.
      For schema-qualified references (``schema.table``) only the table part
      is returned. Derived tables (parenthesised sub-queries) are skipped.

      Args:
          sql_query: SQL text containing a SELECT statement.

      Returns:
          A list of table names in order of appearance; an empty list when
          nothing follows FROM; ``None`` when the text has no
          ``SELECT ... FROM`` shape or parsing fails.
      """
      try:
          if not _SELECT_CLAUSE_RE.search(sql_query):
              return None

          from_match = _FROM_CLAUSE_RE.search(sql_query)
          if not from_match:
              return []

          tables = []
          for item in _split_top_level_commas(from_match.group(1)):
              # Each JOIN introduces another table reference within the item.
              for piece in _JOIN_KEYWORD_RE.split(item):
                  ref = _TABLE_REF_RE.match(piece.strip())
                  if ref:
                      tables.append(next(g for g in ref.groups() if g is not None))
          return tables
      except Exception as e:
          logger.error(f"解析SELECT查询语句失败: {str(e)}")
          return None
  def model_resource_list(page, page_size, name_filter=None):
      """Return one page of ``model_resource`` nodes plus the total count.

      Args:
          page: 1-based page number.
          page_size: Number of nodes per page.
          name_filter: Optional substring that ``n.name`` must contain.

      Returns:
          A ``(resources, total_count)`` tuple. Each resource is the node's
          property dict with the node id added under ``"id"``. On any
          failure the error is logged and ``([], 0)`` is returned.
      """
      try:
          with neo4j_driver.get_session() as session:
              match_clause = "MATCH (n:model_resource)"
              where_clause = ""
              filter_params = {}
              if name_filter:
                  # Passed as a query parameter rather than spliced into the
                  # Cypher text, so quotes in the filter cannot break or alter
                  # the query.
                  where_clause = " WHERE n.name CONTAINS $name_filter"
                  filter_params["name_filter"] = str(name_filter)

              # Total number of matching nodes.
              count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
              count_result = session.run(count_cypher, **filter_params)
              total_count = count_result.single()["count"]

              # Requested page, newest first.
              limit = int(page_size)
              skip = (int(page) - 1) * limit
              cypher = f"""
              {match_clause}{where_clause}
              RETURN n
              ORDER BY n.createTime DESC
              SKIP $skip LIMIT $limit
              """
              result = session.run(cypher, skip=skip, limit=limit, **filter_params)

              resources = []
              for record in result:
                  node = dict(record["n"])
                  node["id"] = record["n"].id
                  resources.append(node)
              return resources, total_count
      except Exception as e:
          logger.error(f"获取模型资源列表失败: {str(e)}")
          return [], 0
  def _replace_resource_tag(session, resource_id, tag_id):
      """Drop the resource's outgoing ``label`` relationships and link ``tag_id``."""
      delete_rel_cypher = """
      MATCH (n:data_resource)-[r:label]->()
      WHERE id(n) = $resource_id
      DELETE r
      """
      session.run(delete_rel_cypher, resource_id=resource_id)

      create_rel_cypher = """
      MATCH (n:data_resource), (t:data_label)
      WHERE id(n) = $resource_id AND id(t) = $tag_id
      CREATE (n)-[r:label]->(t)
      RETURN r
      """
      session.run(create_rel_cypher, resource_id=resource_id, tag_id=int(tag_id))


  def _rebuild_resource_meta_links(session, resource_id, parsed_data):
      """Recreate ``contain`` / ``master`` / ``clean_resource`` links from ``parsed_data``."""
      # Remove the old metadata relationships.
      delete_meta_cypher = """
      MATCH (n:data_resource)-[r:contain]->()
      WHERE id(n) = $resource_id
      DELETE r
      """
      session.run(delete_meta_cypher, resource_id=resource_id)

      # Remove the old clean-resource relationships.
      delete_clean_cypher = """
      MATCH (n:data_resource)-[r:clean_resource]->()
      WHERE id(n) = $resource_id
      DELETE r
      """
      session.run(delete_clean_cypher, resource_id=resource_id)

      for meta in parsed_data:
          meta_id = meta.get("id")
          if not meta_id:
              # Entries without a metadata id cannot be linked.
              continue
          meta_id = int(meta_id)

          # resource -[contain]-> metadata
          create_meta_cypher = """
          MATCH (n:data_resource), (m:meta_data)
          WHERE id(n) = $resource_id AND id(m) = $meta_id
          CREATE (n)-[r:contain]->(m)
          RETURN r
          """
          session.run(create_meta_cypher, resource_id=resource_id, meta_id=meta_id)

          # master data -[master]-> metadata
          master_data = meta.get("master_data")
          if master_data:
              create_master_cypher = """
              MATCH (master), (meta:meta_data)
              WHERE id(master) = $master_id AND id(meta) = $meta_id
              MERGE (master)-[r:master]->(meta)
              RETURN r
              """
              session.run(create_master_cypher, master_id=int(master_data), meta_id=meta_id)

          # data standard -[clean_resource]-> metadata, and
          # resource -[clean_resource]-> data standard
          data_standard = meta.get("data_standard")
          if data_standard and isinstance(data_standard, dict) and data_standard.get("id"):
              standard_id = int(data_standard.get("id"))
              create_standard_meta_cypher = """
              MATCH (standard), (meta:meta_data)
              WHERE id(standard) = $standard_id AND id(meta) = $meta_id
              MERGE (standard)-[r:clean_resource]->(meta)
              RETURN r
              """
              session.run(create_standard_meta_cypher, standard_id=standard_id, meta_id=meta_id)

              create_resource_standard_cypher = """
              MATCH (resource:data_resource), (standard)
              WHERE id(resource) = $resource_id AND id(standard) = $standard_id
              MERGE (resource)-[r:clean_resource]->(standard)
              RETURN r
              """
              session.run(create_resource_standard_cypher, resource_id=resource_id, standard_id=standard_id)


  def data_resource_edit(data):
      """Update a ``data_resource`` node and rebuild its tag/metadata links.

      Args:
          data: Mapping describing the edit. ``"id"`` (required) is the node
              id. ``"tag"``, when truthy, replaces the ``label``
              relationship. ``"parsed_data"``, when non-empty, is a list of
              dicts used to rebuild the ``contain``, ``master`` and
              ``clean_resource`` relationships. Every other key whose value
              is not ``None`` is written as a node property.

      Returns:
          The updated node's properties as a dict, with the node id under
          ``"id"``.

      Raises:
          ValueError: If ``"id"`` is missing or no such ``data_resource``
              node exists. Any other exception is logged and re-raised.
      """
      try:
          resource_id = data.get("id")
          if not resource_id:
              raise ValueError("缺少资源ID")

          with neo4j_driver.get_session() as session:
              # Properties to write: everything except the id and the
              # relationship payload, skipping unset values.
              update_fields = {
                  key: value
                  for key, value in data.items()
                  if key not in ("id", "parsed_data") and value is not None
              }

              if "describe" in data:
                  logger.info(f"编辑资源,describe字段将被更新为: {data.get('describe')}")
              else:
                  logger.info("编辑资源,describe字段不在更新数据中")

              update_fields["updateTime"] = get_formatted_time()
              rid = int(resource_id)

              # The properties travel as a single map parameter. Caller keys
              # are therefore never spliced into the Cypher text and cannot
              # collide with the keyword arguments of session.run().
              cypher = """
              MATCH (n:data_resource)
              WHERE id(n) = $resource_id
              SET n += $props
              RETURN n
              """
              result = session.run(cypher, resource_id=rid, props=update_fields)
              updated_node = result.single()
              if not updated_node:
                  raise ValueError("资源不存在")

              logger.info(f"更新后的节点数据,describe字段: {updated_node['n'].get('describe')}")

              tag_id = data.get("tag")
              if tag_id:
                  _replace_resource_tag(session, rid, tag_id)

              parsed_data = data.get("parsed_data", [])
              if parsed_data:
                  _rebuild_resource_meta_links(session, rid, parsed_data)

              node_data = dict(updated_node["n"])
              node_data["id"] = updated_node["n"].id
              logger.info(f"data_resource_edit返回数据,describe字段: {node_data.get('describe')}")
              return node_data
      except Exception as e:
          logger.error(f"编辑数据资源失败: {str(e)}")
          raise
  def handle_data_source(data_source):
      """Resolve a data-source description to an existing ``DataSource`` node.

      The node is looked up by ``en_name``; this function does not create
      nodes. As a side effect, a missing or empty ``"name"`` entry in
      ``data_source`` is filled in with the ``en_name`` value.

      Args:
          data_source: Mapping that must contain ``"en_name"``.

      Returns:
          The ``en_name`` of the matching ``DataSource`` node, or ``None``
          when ``data_source`` has no ``en_name``.

      Raises:
          RuntimeError: If no matching node exists or the lookup fails; the
              original exception is attached as the cause.
      """
      try:
          # Validate before opening a session: nothing can be looked up
          # without the unique English name.
          ds_en_name = data_source.get("en_name")
          if not ds_en_name:
              logger.error("数据源缺少必要的en_name属性")
              return None

          # Fall back to en_name as the display name.
          if not data_source.get("name"):
              data_source["name"] = ds_en_name

          with neo4j_driver.get_session() as session:
              existing_cypher = """
              MATCH (ds:DataSource {en_name: $en_name})
              RETURN ds
              """
              existing_record = session.run(existing_cypher, en_name=ds_en_name).single()

          if existing_record:
              existing_data_source = dict(existing_record["ds"])
              logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
              return existing_data_source.get("en_name")

          raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
      except Exception as e:
          logger.error(f"处理数据源失败: {str(e)}")
          raise RuntimeError(f"处理数据源失败: {str(e)}") from e