resource.py 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298
  1. import json
  2. import re
  3. import logging
  4. from py2neo import Relationship
  5. import pandas as pd
  6. from app.services.neo4j_driver import neo4j_driver
  7. from app.services.package_function import create_or_get_node, relationship_exists, get_node
# Logger used by the functions in this module; registered under the shared name "app".
logger = logging.getLogger("app")
  9. def get_formatted_time():
  10. """获取格式化的当前时间"""
  11. import time
  12. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  13. def get_node_by_id(label, id):
  14. """根据ID获取指定标签的节点"""
  15. try:
  16. with neo4j_driver.get_session() as session:
  17. # 确保id为整数
  18. try:
  19. id_int = int(id)
  20. except (ValueError, TypeError):
  21. logger.error(f"节点ID不是有效的整数: {id}")
  22. return None
  23. cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
  24. result = session.run(cypher, id=id_int)
  25. record = result.single()
  26. return record["n"] if record else None
  27. except Exception as e:
  28. logger.error(f"根据ID获取节点失败: {str(e)}")
  29. return None
  30. def get_node_by_id_no_label(id):
  31. """根据ID获取节点,不限制标签"""
  32. try:
  33. with neo4j_driver.get_session() as session:
  34. # 确保id为整数
  35. try:
  36. id_int = int(id)
  37. except (ValueError, TypeError):
  38. logger.error(f"节点ID不是有效的整数: {id}")
  39. return None
  40. cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
  41. result = session.run(cypher, id=id_int)
  42. record = result.single()
  43. return record["n"] if record else None
  44. except Exception as e:
  45. logger.error(f"根据ID获取节点失败: {str(e)}")
  46. return None
  47. def delete_relationships(start_node, rel_type=None, end_node=None):
  48. """删除关系"""
  49. try:
  50. with neo4j_driver.get_session() as session:
  51. if rel_type and end_node:
  52. cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
  53. cypher = cypher.replace("{rel_type}", rel_type)
  54. session.run(cypher, start_id=start_node.id, end_id=end_node.id)
  55. elif rel_type:
  56. cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
  57. cypher = cypher.replace("{rel_type}", rel_type)
  58. session.run(cypher, start_id=start_node.id)
  59. else:
  60. cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
  61. session.run(cypher, start_id=start_node.id)
  62. return True
  63. except Exception as e:
  64. logger.error(f"删除关系失败: {str(e)}")
  65. return False
  66. def update_or_create_node(label, **properties):
  67. """更新或创建节点"""
  68. try:
  69. with neo4j_driver.get_session() as session:
  70. node_id = properties.pop('id', None)
  71. if node_id:
  72. # 更新现有节点
  73. set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
  74. cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
  75. result = session.run(cypher, id=int(node_id), **properties)
  76. else:
  77. # 创建新节点
  78. props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
  79. cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
  80. result = session.run(cypher, **properties)
  81. record = result.single()
  82. return record["n"] if record else None
  83. except Exception as e:
  84. logger.error(f"更新或创建节点失败: {str(e)}")
  85. return None
  86. # 数据资源-元数据 关系节点创建、查询
  87. def handle_node(receiver, head_data, data_source=None, resource_type=None):
  88. """处理数据资源节点创建和关系建立"""
  89. try:
  90. # 根据resource_type设置type属性的值
  91. if resource_type == 'ddl':
  92. type_value = 'ddl'
  93. else:
  94. type_value = 'structure'
  95. # 更新属性
  96. update_attributes = {
  97. 'en_name': receiver['en_name'],
  98. 'time': get_formatted_time(),
  99. 'type': type_value # 根据资源类型设置不同的type值
  100. }
  101. if 'additional_info' in receiver:
  102. del receiver['additional_info']
  103. # 从receiver中移除data_source属性,避免将复杂对象作为节点属性
  104. if 'data_source' in receiver:
  105. del receiver['data_source']
  106. tag_list = receiver.get('tag')
  107. receiver.update(update_attributes)
  108. # 创建或获取 data_resource 节点
  109. with neo4j_driver.get_session() as session:
  110. props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
  111. cypher = f"""
  112. MERGE (n:data_resource {{name: $name}})
  113. ON CREATE SET n = {{{props_str}}}
  114. ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
  115. RETURN n
  116. """
  117. result = session.run(cypher, **receiver)
  118. data_resource_node = result.single()["n"]
  119. resource_id = data_resource_node.id # 使用id属性获取数值ID
  120. # 处理标签关系
  121. if tag_list:
  122. tag_node = get_node_by_id('data_label', tag_list)
  123. if tag_node:
  124. # 检查关系是否存在
  125. rel_check = """
  126. MATCH (a:data_resource)-[r:label]->(b:data_label)
  127. WHERE id(a) = $resource_id AND id(b) = $tag_id
  128. RETURN r
  129. """
  130. rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id) # 使用数值id
  131. # 如果关系不存在则创建
  132. if not rel_result.single():
  133. rel_create = """
  134. MATCH (a:data_resource), (b:data_label)
  135. WHERE id(a) = $resource_id AND id(b) = $tag_id
  136. CREATE (a)-[r:label]->(b)
  137. RETURN r
  138. """
  139. session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
  140. # 处理头部数据(元数据,字段)
  141. if head_data:
  142. for item in head_data:
  143. # 创建元数据节点
  144. meta_cypher = """
  145. MERGE (m:meta_data {name: $name})
  146. ON CREATE SET m.en_name = $en_name,
  147. m.create_time = $create_time,
  148. m.data_type = $type,
  149. m.status = 'true'
  150. ON MATCH SET m.data_type = $type,
  151. m.status = 'true'
  152. RETURN m
  153. """
  154. create_time = get_formatted_time()
  155. meta_result = session.run(
  156. meta_cypher,
  157. name=item['name'],
  158. en_name=item['en_name'],
  159. create_time=create_time,
  160. type=item['data_type'] # 使用data_type作为data_type属性
  161. )
  162. meta_record = meta_result.single()
  163. if meta_record and meta_record["m"]:
  164. meta_node = meta_record["m"]
  165. meta_id = meta_node.id # 使用数值ID
  166. # 打印日志确认节点创建成功和ID
  167. logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")
  168. # 确认数据资源节点是否可以正确查询到
  169. check_resource_cypher = """
  170. MATCH (n:data_resource)
  171. WHERE id(n) = $resource_id
  172. RETURN n
  173. """
  174. check_resource = session.run(check_resource_cypher, resource_id=resource_id)
  175. if check_resource.single():
  176. logger.info(f"找到数据资源节点: ID={resource_id}")
  177. else:
  178. logger.error(f"无法找到数据资源节点: ID={resource_id}")
  179. continue
  180. # 创建关系
  181. rel_cypher = """
  182. MATCH (a:data_resource), (m:meta_data)
  183. WHERE id(a) = $resource_id AND id(m) = $meta_id
  184. MERGE (a)-[r:contain]->(m)
  185. RETURN r
  186. """
  187. rel_result = session.run(
  188. rel_cypher,
  189. resource_id=resource_id,
  190. meta_id=meta_id
  191. )
  192. rel_record = rel_result.single()
  193. if rel_record:
  194. logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
  195. else:
  196. logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
  197. else:
  198. logger.error(f"未能创建或获取元数据节点: {item['name']}")
  199. # 处理数据源关系
  200. if data_source:
  201. try:
  202. # 创建或获取数据源节点
  203. data_source_en_name = handle_data_source(data_source)
  204. # 创建数据资源与数据源的关系
  205. if data_source_en_name:
  206. # 创建 isbelongto 关系
  207. rel_data_source_cypher = """
  208. MATCH (a:data_resource), (b:data_source)
  209. WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
  210. MERGE (a)-[r:isbelongto]->(b)
  211. RETURN r
  212. """
  213. session.run(
  214. rel_data_source_cypher,
  215. resource_id=resource_id,
  216. ds_en_name=data_source_en_name
  217. )
  218. logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
  219. except Exception as e:
  220. logger.error(f"处理数据源关系失败: {str(e)}")
  221. raise RuntimeError(f"处理数据源关系失败: {str(e)}")
  222. return resource_id
  223. except Exception as e:
  224. logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
  225. raise
  226. def handle_id_resource(resource_id):
  227. """处理单个数据资源查询"""
  228. try:
  229. with neo4j_driver.get_session() as session:
  230. # 确保resource_id为整数
  231. try:
  232. resource_id_int = int(resource_id)
  233. except (ValueError, TypeError):
  234. logger.error(f"资源ID不是有效的整数: {resource_id}")
  235. return None
  236. # 使用数值ID查询
  237. cypher = """
  238. MATCH (n:data_resource)
  239. WHERE id(n) = $resource_id
  240. RETURN n
  241. """
  242. result = session.run(cypher, resource_id=resource_id_int)
  243. record = result.single()
  244. if not record:
  245. logger.error(f"未找到资源,ID: {resource_id_int}")
  246. return None
  247. # 构建返回数据
  248. data_resource = dict(record["n"])
  249. data_resource["id"] = record["n"].id
  250. # 查询关联的标签
  251. tag_cypher = """
  252. MATCH (n:data_resource)-[:label]->(t:data_label)
  253. WHERE id(n) = $resource_id
  254. RETURN t
  255. """
  256. tag_result = session.run(tag_cypher, resource_id=resource_id_int)
  257. tag_record = tag_result.single()
  258. if tag_record:
  259. tag = dict(tag_record["t"])
  260. tag["id"] = tag_record["t"].id
  261. data_resource["tag_info"] = tag
  262. # 查询关联的元数据 - 支持meta_data和Metadata两种标签
  263. meta_cypher = """
  264. MATCH (n:data_resource)-[:contain]->(m)
  265. WHERE id(n) = $resource_id
  266. AND (m:meta_data OR m:Metadata)
  267. RETURN m
  268. """
  269. meta_result = session.run(meta_cypher, resource_id=resource_id_int)
  270. meta_list = []
  271. for meta_record in meta_result:
  272. meta = dict(meta_record["m"])
  273. meta["id"] = meta_record["m"].id
  274. meta_list.append(meta)
  275. data_resource["meta_list"] = meta_list
  276. logger.info(f"成功获取资源详情,ID: {resource_id_int}")
  277. return data_resource
  278. except Exception as e:
  279. logger.error(f"处理单个数据资源查询失败: {str(e)}")
  280. return None
  281. def id_resource_graph(resource_id):
  282. """获取数据资源图谱"""
  283. try:
  284. with neo4j_driver.get_session() as session:
  285. # 查询数据资源节点及其关系
  286. cypher = """
  287. MATCH (n:data_resource)-[r]-(m)
  288. WHERE id(n) = $resource_id
  289. RETURN n, r, m
  290. """
  291. result = session.run(cypher, resource_id=int(resource_id))
  292. # 收集节点和关系
  293. nodes = {}
  294. relationships = []
  295. for record in result:
  296. # 处理源节点
  297. source_node = dict(record["n"])
  298. source_node["id"] = record["n"].id
  299. nodes[source_node["id"]] = source_node
  300. # 处理目标节点
  301. target_node = dict(record["m"])
  302. target_node["id"] = record["m"].id
  303. nodes[target_node["id"]] = target_node
  304. # 处理关系
  305. rel = record["r"]
  306. relationship = {
  307. "id": rel.id,
  308. "source": record["n"].id,
  309. "target": record["m"].id,
  310. "type": rel.type
  311. }
  312. relationships.append(relationship)
  313. return {
  314. "nodes": list(nodes.values()),
  315. "relationships": relationships
  316. }
  317. except Exception as e:
  318. logger.error(f"获取数据资源图谱失败: {str(e)}")
  319. return {"nodes": [], "relationships": []}
  320. def resource_list(page, page_size, en_name_filter=None, name_filter=None,
  321. type_filter='all', category_filter=None, tag_filter=None):
  322. """获取数据资源列表"""
  323. try:
  324. with neo4j_driver.get_session() as session:
  325. # 构建查询条件
  326. match_clause = "MATCH (n:data_resource)"
  327. where_conditions = []
  328. if en_name_filter:
  329. where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
  330. if name_filter:
  331. where_conditions.append(f"n.name CONTAINS '{name_filter}'")
  332. if type_filter and type_filter != 'all':
  333. where_conditions.append(f"n.type = '{type_filter}'")
  334. if category_filter:
  335. where_conditions.append(f"n.category = '{category_filter}'")
  336. # 标签过滤需要额外的匹配
  337. if tag_filter:
  338. match_clause += "-[:label]->(t:data_label)"
  339. where_conditions.append(f"t.name = '{tag_filter}'")
  340. where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
  341. # 计算总数
  342. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  343. count_result = session.run(count_cypher)
  344. total_count = count_result.single()["count"]
  345. # 分页查询
  346. skip = (page - 1) * page_size
  347. cypher = f"""
  348. {match_clause}{where_clause}
  349. RETURN n
  350. ORDER BY n.time DESC
  351. SKIP {skip} LIMIT {page_size}
  352. """
  353. result = session.run(cypher)
  354. # 格式化结果
  355. resources = []
  356. for record in result:
  357. node = dict(record["n"])
  358. node["id"] = record["n"].id
  359. # 查询关联的标签
  360. tag_cypher = """
  361. MATCH (n:data_resource)-[:label]->(t:data_label)
  362. WHERE id(n) = $resource_id
  363. RETURN t
  364. """
  365. tag_result = session.run(tag_cypher, resource_id=node["id"])
  366. tag_record = tag_result.single()
  367. if tag_record:
  368. tag = dict(tag_record["t"])
  369. tag["id"] = tag_record["t"].id
  370. node["tag_info"] = tag
  371. resources.append(node)
  372. return resources, total_count
  373. except Exception as e:
  374. logger.error(f"获取数据资源列表失败: {str(e)}")
  375. return [], 0
  376. def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
  377. name_filter=None, category_filter=None, tag_filter=None):
  378. """获取特定数据资源关联的元数据列表"""
  379. try:
  380. with neo4j_driver.get_session() as session:
  381. # 确保resource_id为整数
  382. try:
  383. resource_id_int = int(resource_id)
  384. except (ValueError, TypeError):
  385. logger.error(f"资源ID不是有效的整数: {resource_id}")
  386. return [], 0
  387. # 基本匹配语句 - 支持meta_data和Metadata标签
  388. match_clause = """
  389. MATCH (n:data_resource)-[:contain]->(m)
  390. WHERE id(n) = $resource_id
  391. AND (m:meta_data OR m:Metadata)
  392. """
  393. where_conditions = []
  394. if en_name_filter:
  395. where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
  396. if name_filter:
  397. where_conditions.append(f"m.name CONTAINS '{name_filter}'")
  398. if category_filter:
  399. where_conditions.append(f"m.category = '{category_filter}'")
  400. # 标签过滤需要额外的匹配
  401. tag_match = ""
  402. if tag_filter:
  403. tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
  404. where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
  405. # 计算总数
  406. count_cypher = f"""
  407. {match_clause}{where_clause}
  408. {tag_match}
  409. RETURN count(m) as count
  410. """
  411. count_params = {"resource_id": resource_id_int}
  412. if tag_filter:
  413. count_params["tag_filter"] = tag_filter
  414. count_result = session.run(count_cypher, **count_params)
  415. total_count = count_result.single()["count"]
  416. # 分页查询
  417. skip = (page - 1) * page_size
  418. cypher = f"""
  419. {match_clause}{where_clause}
  420. {tag_match}
  421. RETURN m
  422. ORDER BY m.name
  423. SKIP {skip} LIMIT {page_size}
  424. """
  425. result = session.run(cypher, **count_params)
  426. # 格式化结果
  427. metadata_list = []
  428. for record in result:
  429. meta = dict(record["m"])
  430. meta["id"] = record["m"].id
  431. metadata_list.append(meta)
  432. logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
  433. return metadata_list, total_count
  434. except Exception as e:
  435. logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
  436. return [], 0
  437. def resource_kinship_graph(resource_id, include_meta=True):
  438. """获取数据资源亲缘关系图谱"""
  439. try:
  440. with neo4j_driver.get_session() as session:
  441. # 确保resource_id为整数
  442. try:
  443. resource_id_int = int(resource_id)
  444. except (ValueError, TypeError):
  445. logger.error(f"资源ID不是有效的整数: {resource_id}")
  446. return {"nodes": [], "relationships": []}
  447. # 基本查询
  448. cypher_parts = [
  449. f"MATCH (n:data_resource) WHERE id(n) = $resource_id",
  450. "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
  451. ]
  452. # 是否包含元数据 - 支持meta_data和Metadata两种标签
  453. if include_meta:
  454. cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m) WHERE (m:meta_data OR m:Metadata)")
  455. cypher_parts.append("RETURN n, l, collect(m) as metadata")
  456. cypher = "\n".join(cypher_parts)
  457. result = session.run(cypher, resource_id=resource_id_int)
  458. record = result.single()
  459. if not record:
  460. logger.error(f"未找到资源图谱数据,ID: {resource_id_int}")
  461. return {"nodes": [], "relationships": []}
  462. # 收集节点和关系
  463. nodes = {}
  464. relationships = []
  465. # 处理数据资源节点
  466. resource_node = dict(record["n"])
  467. resource_node["id"] = record["n"].id
  468. resource_node["labels"] = list(record["n"].labels)
  469. nodes[resource_node["id"]] = resource_node
  470. # 处理标签节点
  471. if record["l"]:
  472. label_node = dict(record["l"])
  473. label_node["id"] = record["l"].id
  474. label_node["labels"] = list(record["l"].labels)
  475. nodes[label_node["id"]] = label_node
  476. # 添加资源-标签关系
  477. relationships.append({
  478. "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
  479. "source": resource_node["id"],
  480. "target": label_node["id"],
  481. "type": "label"
  482. })
  483. # 处理元数据节点
  484. if include_meta and record["metadata"]:
  485. for meta in record["metadata"]:
  486. if meta: # 检查元数据节点是否存在
  487. meta_node = dict(meta)
  488. meta_node["id"] = meta.id
  489. meta_node["labels"] = list(meta.labels)
  490. nodes[meta_node["id"]] = meta_node
  491. # 添加资源-元数据关系
  492. relationships.append({
  493. "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
  494. "source": resource_node["id"],
  495. "target": meta_node["id"],
  496. "type": "contain"
  497. })
  498. logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  499. return {
  500. "nodes": list(nodes.values()),
  501. "relationships": relationships
  502. }
  503. except Exception as e:
  504. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  505. return {"nodes": [], "relationships": []}
  506. def resource_impact_all_graph(resource_id, include_meta=True):
  507. """获取数据资源影响关系图谱"""
  508. try:
  509. with neo4j_driver.get_session() as session:
  510. # 确保resource_id为整数
  511. try:
  512. resource_id_int = int(resource_id)
  513. except (ValueError, TypeError):
  514. logger.error(f"资源ID不是有效的整数: {resource_id}")
  515. return {"nodes": [], "relationships": []}
  516. # 根据meta参数决定查询深度
  517. if include_meta:
  518. cypher = """
  519. MATCH path = (n:data_resource)-[*1..3]-(m)
  520. WHERE id(n) = $resource_id
  521. RETURN path
  522. """
  523. else:
  524. cypher = """
  525. MATCH path = (n:data_resource)-[*1..2]-(m)
  526. WHERE id(n) = $resource_id
  527. AND NOT (m:meta_data) AND NOT (m:Metadata)
  528. RETURN path
  529. """
  530. result = session.run(cypher, resource_id=resource_id_int)
  531. # 收集节点和关系
  532. nodes = {}
  533. relationships = {}
  534. for record in result:
  535. path = record["path"]
  536. # 处理路径中的所有节点
  537. for node in path.nodes:
  538. if node.id not in nodes:
  539. node_dict = dict(node)
  540. node_dict["id"] = node.id
  541. node_dict["labels"] = list(node.labels)
  542. nodes[node.id] = node_dict
  543. # 处理路径中的所有关系
  544. for rel in path.relationships:
  545. if rel.id not in relationships:
  546. rel_dict = {
  547. "id": rel.id,
  548. "source": rel.start_node.id,
  549. "target": rel.end_node.id,
  550. "type": rel.type
  551. }
  552. relationships[rel.id] = rel_dict
  553. logger.info(f"成功获取完整图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  554. return {
  555. "nodes": list(nodes.values()),
  556. "relationships": list(relationships.values())
  557. }
  558. except Exception as e:
  559. logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
  560. return {"nodes": [], "relationships": []}
  561. def clean_type(type_str):
  562. """清洗SQL类型字符串"""
  563. # 提取基本类型,不包括长度或精度信息
  564. basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()
  565. # 移除 VARYING 这样的后缀
  566. basic_type = re.sub(r'\s+VARYING$', '', basic_type)
  567. # 标准化常见类型
  568. type_mapping = {
  569. 'INT': 'INTEGER',
  570. 'INT4': 'INTEGER',
  571. 'INT8': 'BIGINT',
  572. 'SMALLINT': 'SMALLINT',
  573. 'BIGINT': 'BIGINT',
  574. 'FLOAT4': 'FLOAT',
  575. 'FLOAT8': 'DOUBLE',
  576. 'REAL': 'FLOAT',
  577. 'DOUBLE PRECISION': 'DOUBLE',
  578. 'NUMERIC': 'DECIMAL',
  579. 'BOOL': 'BOOLEAN',
  580. 'CHARACTER': 'CHAR',
  581. 'CHAR VARYING': 'VARCHAR',
  582. 'CHARACTER VARYING': 'VARCHAR',
  583. 'TEXT': 'TEXT',
  584. 'DATE': 'DATE',
  585. 'TIME': 'TIME',
  586. 'TIMESTAMP': 'TIMESTAMP',
  587. 'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
  588. 'BYTEA': 'BINARY',
  589. 'JSON': 'JSON',
  590. 'JSONB': 'JSONB',
  591. 'UUID': 'UUID',
  592. 'SERIAL': 'SERIAL',
  593. 'SERIAL4': 'SERIAL',
  594. 'SERIAL8': 'BIGSERIAL',
  595. 'BIGSERIAL': 'BIGSERIAL'
  596. }
  597. # 尝试从映射表中获取标准化的类型
  598. return type_mapping.get(basic_type, basic_type)
  599. def clean_field_name(field_name):
  600. """清洗字段名"""
  601. return field_name.strip('`').strip('"').strip("'")
  602. def select_create_ddl(sql_content):
  603. """从SQL内容中提取创建表的DDL语句"""
  604. try:
  605. # 解析复杂的SQL文件,识别所有的CREATE TABLE语句及其关联的注释
  606. # 找到所有以CREATE TABLE开头的语句块,每个语句块包含主语句和相关的注释
  607. # 首先,分割 SQL 内容按分号
  608. statements = []
  609. current_statement = ""
  610. in_string = False
  611. string_quote = None
  612. for char in sql_content:
  613. if char in ["'", '"']:
  614. if not in_string:
  615. in_string = True
  616. string_quote = char
  617. elif char == string_quote:
  618. in_string = False
  619. string_quote = None
  620. current_statement += char
  621. elif char == ';' and not in_string:
  622. current_statement += char
  623. if current_statement.strip():
  624. statements.append(current_statement.strip())
  625. current_statement = ""
  626. else:
  627. current_statement += char
  628. if current_statement.strip():
  629. statements.append(current_statement.strip())
  630. # 找出所有的CREATE TABLE语句和关联的注释
  631. create_table_statements = []
  632. create_index = -1
  633. in_table_block = False
  634. current_table = None
  635. current_block = ""
  636. for i, stmt in enumerate(statements):
  637. if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
  638. # 如果已经在处理表,先保存当前块
  639. if in_table_block and current_block:
  640. create_table_statements.append(current_block)
  641. # 开始新的表块
  642. in_table_block = True
  643. current_block = stmt
  644. # 提取表名
  645. table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
  646. if table_match:
  647. current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
  648. current_table = current_table.strip('"\'') if current_table else ""
  649. elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
  650. re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
  651. # 检查注释是否属于当前表
  652. if current_table:
  653. # 表注释处理
  654. if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
  655. table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
  656. if table_comment_match:
  657. comment_table = table_comment_match.group(1).strip('"\'')
  658. if comment_table == current_table:
  659. current_block += " " + stmt
  660. else:
  661. # 这是另一个表的注释,当前表的DDL到此结束
  662. create_table_statements.append(current_block)
  663. in_table_block = False
  664. current_block = ""
  665. current_table = None
  666. # 列注释处理
  667. elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
  668. column_comment_match = re.search(
  669. r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
  670. stmt,
  671. re.IGNORECASE
  672. )
  673. if column_comment_match:
  674. comment_table = column_comment_match.group(1)
  675. if comment_table == current_table:
  676. current_block += " " + stmt
  677. else:
  678. # 这是另一个表的注释,当前表的DDL到此结束
  679. create_table_statements.append(current_block)
  680. in_table_block = False
  681. current_block = ""
  682. current_table = None
  683. elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
  684. # 如果遇到新的CREATE语句(不是注释),保存当前块并结束
  685. create_table_statements.append(current_block)
  686. in_table_block = False
  687. current_block = ""
  688. current_table = None
  689. # 添加最后一个块
  690. if in_table_block and current_block:
  691. create_table_statements.append(current_block)
  692. # 日志记录
  693. logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
  694. for i, stmt in enumerate(create_table_statements):
  695. logger.debug(f"DDL语句 {i+1}: {stmt}")
  696. return create_table_statements
  697. except Exception as e:
  698. logger.error(f"提取DDL语句失败: {str(e)}")
  699. # logger.error(traceback.format_exc())
  700. return []
  701. def table_sql(sql):
  702. """解析表定义SQL,支持带schema和不带schema两种格式"""
  703. try:
  704. # 支持以下格式:
  705. # 1. CREATE TABLE tablename
  706. # 2. CREATE TABLE "tablename"
  707. # 3. CREATE TABLE 'tablename'
  708. # 4. CREATE TABLE schema.tablename
  709. # 5. CREATE TABLE "schema"."tablename"
  710. # 6. CREATE TABLE 'schema'.'tablename'
  711. # 匹配表名,支持带引号和不带引号的情况
  712. table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
  713. table_match = re.search(table_pattern, sql, re.IGNORECASE)
  714. if not table_match:
  715. logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
  716. return None
  717. # 获取表名
  718. schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
  719. table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
  720. if not table_name:
  721. logger.error("无法解析表名")
  722. return None
  723. logger.debug(f"解析到表名: {table_name}")
  724. # 提取CREATE TABLE语句的主体部分(括号内的内容)
  725. body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
  726. body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
  727. if not body_match:
  728. logger.error("无法提取表主体内容")
  729. return None
  730. body_text = body_match.group(1).strip()
  731. logger.debug(f"表定义主体部分: {body_text}")
  732. # 解析字段定义
  733. fields = []
  734. # 分割字段定义,处理括号嵌套和引号
  735. field_defs = []
  736. pos = 0
  737. in_parentheses = 0
  738. in_quotes = False
  739. quote_char = None
  740. for i, char in enumerate(body_text):
  741. if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
  742. in_quotes = not in_quotes
  743. if in_quotes:
  744. quote_char = char
  745. else:
  746. quote_char = None
  747. elif char == '(' and not in_quotes:
  748. in_parentheses += 1
  749. elif char == ')' and not in_quotes:
  750. in_parentheses -= 1
  751. elif char == ',' and in_parentheses == 0 and not in_quotes:
  752. field_defs.append(body_text[pos:i].strip())
  753. pos = i + 1
  754. # 添加最后一个字段定义
  755. if pos < len(body_text):
  756. field_defs.append(body_text[pos:].strip())
  757. logger.debug(f"解析出 {len(field_defs)} 个字段定义")
  758. # 处理每个字段定义
  759. for field_def in field_defs:
  760. # 跳过约束定义
  761. if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
  762. continue
  763. # 提取字段名和类型
  764. field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
  765. field_match = re.search(field_pattern, field_def, re.IGNORECASE)
  766. if field_match:
  767. # 提取字段名
  768. field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
  769. # 提取类型
  770. field_type = field_match.group(5).strip()
  771. # 处理类型中可能的括号
  772. type_base = re.split(r'\s+', field_type)[0]
  773. clean_type_value = clean_type(type_base)
  774. fields.append((field_name, clean_type_value))
  775. logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
  776. else:
  777. logger.warning(f"无法解析字段定义: {field_def}")
  778. # 提取表注释
  779. table_comment = ""
  780. table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
  781. table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
  782. if table_comment_match:
  783. comment_table = table_comment_match.group(1)
  784. if comment_table.strip("'\"") == table_name.strip("'\""):
  785. table_comment = table_comment_match.group(2)
  786. logger.debug(f"找到表注释: {table_comment}")
  787. # 提取列注释
  788. comments = {}
  789. column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
  790. for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
  791. comment_table = match.group(1)
  792. column_name = match.group(2)
  793. comment = match.group(3)
  794. # 检查表名是否匹配
  795. if comment_table.strip("'\"") == table_name.strip("'\""):
  796. comments[column_name] = comment
  797. logger.debug(f"找到列注释: {column_name} - {comment}")
  798. else:
  799. logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")
  800. # 检查字段和注释匹配情况
  801. logger.debug("========字段和注释匹配情况========")
  802. field_names = [f[0] for f in fields]
  803. logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
  804. logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")
  805. # 构建返回结果
  806. meta_list = []
  807. for field_name, field_type in fields:
  808. chinese_name = comments.get(field_name, "")
  809. meta_list.append({
  810. "en_name": field_name,
  811. "data_type": field_type,
  812. "name": chinese_name if chinese_name else field_name
  813. })
  814. # 检查表是否存在
  815. try:
  816. status = status_query([table_name])
  817. except Exception as e:
  818. logger.error(f"检查表存在状态失败: {str(e)}")
  819. status = [False]
  820. # 构建返回结果
  821. result = {
  822. table_name: {
  823. "exist": status[0] if status else False,
  824. "meta": meta_list
  825. }
  826. }
  827. logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
  828. return result
  829. except Exception as e:
  830. logger.error(f"解析表定义SQL失败: {str(e)}")
  831. logger.error(f"异常详情: {e}")
  832. import traceback
  833. logger.error(traceback.format_exc())
  834. return None
  835. # 判断英文表名是否在图谱中存在
  836. def status_query(key_list):
  837. query = """
  838. unwind $Key_list as name
  839. OPTIONAL MATCH (n:data_model {en_name: name})
  840. OPTIONAL MATCH (n:data_resource {en_name: name})
  841. OPTIONAL MATCH (n:data_metric {en_name: name})
  842. WITH name, CASE
  843. WHEN n IS NOT NULL THEN True
  844. ELSE False
  845. END AS exist
  846. return collect(exist)AS exist
  847. """
  848. with neo4j_driver.get_session() as session:
  849. result = session.run(query, Key_list=key_list)
  850. data = result.value() # 获取单个值
  851. return data
  852. def select_sql(sql_query):
  853. """解析SELECT查询语句"""
  854. try:
  855. # 提取SELECT子句
  856. select_pattern = r'SELECT\s+(.*?)\s+FROM'
  857. select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  858. if not select_match:
  859. return None
  860. select_clause = select_match.group(1)
  861. # 分割字段
  862. fields = []
  863. # 处理字段列表,避免在函数调用中的逗号导致错误分割
  864. in_parenthesis = 0
  865. current_field = ""
  866. for char in select_clause:
  867. if char == '(':
  868. in_parenthesis += 1
  869. current_field += char
  870. elif char == ')':
  871. in_parenthesis -= 1
  872. current_field += char
  873. elif char == ',' and in_parenthesis == 0:
  874. fields.append(current_field.strip())
  875. current_field = ""
  876. else:
  877. current_field += char
  878. if current_field.strip():
  879. fields.append(current_field.strip())
  880. # 解析每个字段
  881. parsed_fields = []
  882. for field in fields:
  883. # 检查是否有字段别名
  884. alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
  885. alias_match = re.search(alias_pattern, field)
  886. if alias_match:
  887. field_expr = alias_match.group(1).strip()
  888. field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
  889. parsed_fields.append({
  890. "expression": field_expr,
  891. "alias": field_alias
  892. })
  893. else:
  894. # 没有别名的情况
  895. parsed_fields.append({
  896. "expression": field.strip(),
  897. "alias": None
  898. })
  899. # 提取FROM子句和表名
  900. from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
  901. from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  902. tables = []
  903. if from_match:
  904. from_clause = from_match.group(1).strip()
  905. # 分析FROM子句中的表
  906. table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))?'
  907. for match in re.finditer(table_pattern, from_clause):
  908. table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
  909. if table_name:
  910. tables.append(table_name)
  911. return tables
  912. except Exception as e:
  913. logger.error(f"解析SELECT查询语句失败: {str(e)}")
  914. # logger.error(traceback.format_exc())
  915. return None
  916. def model_resource_list(page, page_size, name_filter=None):
  917. """获取模型资源列表"""
  918. try:
  919. with neo4j_driver.get_session() as session:
  920. # 构建查询条件
  921. match_clause = "MATCH (n:model_resource)"
  922. where_clause = ""
  923. if name_filter:
  924. where_clause = f" WHERE n.name CONTAINS '{name_filter}'"
  925. # 计算总数
  926. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  927. count_result = session.run(count_cypher)
  928. total_count = count_result.single()["count"]
  929. # 分页查询
  930. skip = (page - 1) * page_size
  931. cypher = f"""
  932. {match_clause}{where_clause}
  933. RETURN n
  934. ORDER BY n.createTime DESC
  935. SKIP {skip} LIMIT {page_size}
  936. """
  937. result = session.run(cypher)
  938. # 格式化结果
  939. resources = []
  940. for record in result:
  941. node = dict(record["n"])
  942. node["id"] = record["n"].id
  943. resources.append(node)
  944. return resources, total_count
  945. except Exception as e:
  946. logger.error(f"获取模型资源列表失败: {str(e)}")
  947. return [], 0
  948. def data_resource_edit(data):
  949. """编辑数据资源"""
  950. try:
  951. resource_id = data.get("id")
  952. if not resource_id:
  953. raise ValueError("缺少资源ID")
  954. with neo4j_driver.get_session() as session:
  955. # 更新节点属性
  956. update_fields = {}
  957. for key, value in data.items():
  958. if key != "id" and key != "tag":
  959. update_fields[key] = value
  960. # 添加更新时间
  961. update_fields["updateTime"] = get_formatted_time()
  962. # 构建更新语句
  963. set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
  964. cypher = f"""
  965. MATCH (n:data_resource)
  966. WHERE id(n) = $resource_id
  967. SET {set_clause}
  968. RETURN n
  969. """
  970. result = session.run(cypher, resource_id=int(resource_id), **update_fields)
  971. updated_node = result.single()
  972. if not updated_node:
  973. raise ValueError("资源不存在")
  974. # 处理标签关系
  975. tag_id = data.get("tag")
  976. if tag_id:
  977. # 删除旧的标签关系
  978. delete_rel_cypher = """
  979. MATCH (n:data_resource)-[r:label]->()
  980. WHERE id(n) = $resource_id
  981. DELETE r
  982. """
  983. session.run(delete_rel_cypher, resource_id=int(resource_id))
  984. # 创建新的标签关系
  985. create_rel_cypher = """
  986. MATCH (n:data_resource), (t:data_label)
  987. WHERE id(n) = $resource_id AND id(t) = $tag_id
  988. CREATE (n)-[r:label]->(t)
  989. RETURN r
  990. """
  991. session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
  992. # 返回更新后的节点
  993. return dict(updated_node["n"])
  994. except Exception as e:
  995. logger.error(f"编辑数据资源失败: {str(e)}")
  996. raise
  997. def handle_data_source(data_source):
  998. """处理数据源的检查和创建
  999. 参数:
  1000. data_source: 包含数据源信息的字典,支持两种情况:
  1001. 1. 简单情况(只需查询已有数据源),只要包含en_name即可:
  1002. {
  1003. "en_name": "10-52-31-104_5432_inventory"
  1004. }
  1005. {
  1006. "en_name": "10-52-31-104_5432_inventory",
  1007. "name": "教师数据库"
  1008. }
  1009. 2. 完整的数据源信息(用于创建新数据源):
  1010. {
  1011. "en_name": "10-52-31-104_5432_inventory",
  1012. "name": "教师数据库",
  1013. "type": "postgresql",
  1014. "host": "10.52.31.104",
  1015. "port": 5432,
  1016. "database": "inventory_management",
  1017. "username": "app_user",
  1018. "password": "pG$ecur3P@ss789"
  1019. }
  1020. 返回:
  1021. 成功时返回数据源的名称,失败时抛出异常
  1022. """
  1023. try:
  1024. # 获取数据源名称
  1025. ds_en_name = data_source.get("en_name")
  1026. if not ds_en_name:
  1027. raise ValueError("数据源信息不完整,缺少名称")
  1028. # 检查是否为简单查询模式
  1029. # 判断数据源是否包含创建新数据源所需的必要字段
  1030. required_fields = ["type", "host", "port", "database", "username"]
  1031. has_required_fields = all(data_source.get(field) for field in required_fields)
  1032. with neo4j_driver.get_session() as session:
  1033. if not has_required_fields:
  1034. # 简单查询模式:只通过en_name查找已有数据源
  1035. logger.info(f"简单数据源查询模式,查找en_name为: {ds_en_name}")
  1036. check_name_cypher = """
  1037. MATCH (ds:data_source {en_name: $en_name})
  1038. RETURN ds
  1039. """
  1040. check_result = session.run(check_name_cypher, en_name=ds_en_name)
  1041. existing_record = check_result.single()
  1042. if existing_record:
  1043. # 数据源已存在,返回其名称
  1044. existing_data_source = dict(existing_record["ds"])
  1045. logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
  1046. return existing_data_source.get("en_name")
  1047. else:
  1048. # 数据源不存在,抛出异常
  1049. raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
  1050. else:
  1051. # 完整的数据源信息模式:创建或获取数据源
  1052. # 确保name不为空,如果为空则使用en_name
  1053. if "name" not in data_source or not data_source["name"]:
  1054. data_source["name"] = ds_en_name
  1055. logger.info(f"数据源name为空,使用en_name作为替代: {ds_en_name}")
  1056. # 检查是否已存在相同数据源(除名称和密码外属性相同)
  1057. check_cypher = """
  1058. MATCH (ds:data_source)
  1059. WHERE ds.type = $type AND
  1060. ds.host = $host AND
  1061. ds.port = $port AND
  1062. ds.database = $database AND
  1063. ds.username = $username
  1064. RETURN ds
  1065. """
  1066. connection_info = {
  1067. "type": data_source.get("type", "").lower(),
  1068. "host": data_source.get("host"),
  1069. "port": data_source.get("port"),
  1070. "database": data_source.get("database"), # 支持新旧字段名
  1071. "username": data_source.get("username"), # 支持新旧字段名
  1072. "password": data_source.get("password") # 支持新旧字段名
  1073. }
  1074. check_result = session.run(
  1075. check_cypher,
  1076. type=connection_info["type"],
  1077. host=connection_info["host"],
  1078. port=connection_info["port"],
  1079. database=connection_info["database"],
  1080. username=connection_info["username"]
  1081. )
  1082. existing_record = check_result.single()
  1083. if existing_record:
  1084. # 数据源已存在,返回其名称
  1085. existing_data_source = dict(existing_record["ds"])
  1086. logger.info(f"找到现有数据源: {existing_data_source.get('en_name')}")
  1087. return existing_data_source.get("en_name")
  1088. # 数据源不存在,创建新节点
  1089. create_cypher = """
  1090. CREATE (ds:data_source $properties)
  1091. RETURN ds
  1092. """
  1093. # 添加创建时间
  1094. data_source["createTime"] = get_formatted_time()
  1095. create_result = session.run(create_cypher, properties=connection_info)
  1096. created_record = create_result.single()
  1097. if not created_record:
  1098. raise RuntimeError("创建数据源节点失败")
  1099. new_data_source = dict(created_record["ds"])
  1100. logger.info(f"创建新数据源: {new_data_source.get('en_name')}")
  1101. return new_data_source.get("en_name")
  1102. except Exception as e:
  1103. logger.error(f"处理数据源失败: {str(e)}")
  1104. raise RuntimeError(f"处理数据源失败: {str(e)}")