# resource.py
import json
import re
import logging
from py2neo import Relationship
import pandas as pd
from app.services.neo4j_driver import neo4j_driver
from app.core.graph.graph_operations import create_or_get_node, relationship_exists, get_node, connect_graph
import time
from datetime import datetime
from app.core.meta_data import get_formatted_time, translate_and_parse
from app import db
from sqlalchemy import text

logger = logging.getLogger("app")
def serialize_neo4j_object(obj):
    """
    Convert a Neo4j object into a JSON-serializable form.

    Args:
        obj: a Neo4j node or property value

    Returns:
        The serialized object.
    """
    if hasattr(obj, 'year'):  # DateTime object
        # Convert the Neo4j DateTime to a string
        return obj.strftime("%Y-%m-%d %H:%M:%S") if hasattr(obj, 'strftime') else str(obj)
    elif hasattr(obj, '__dict__'):  # complex object
        return str(obj)
    else:
        return obj
def serialize_node_properties(node):
    """
    Serialize a Neo4j node's properties into a JSON-friendly dict.

    Args:
        node: a Neo4j node object

    Returns:
        dict: the serialized property dict
    """
    properties = {}
    for key, value in dict(node).items():
        properties[key] = serialize_neo4j_object(value)
    return properties
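
# --- Illustrative sketch (added for documentation, not part of the original module) ---
# A minimal example of how the serializers above are expected to be used together:
# fetch a node and dump its properties as JSON. Assumes `neo4j_driver` is configured
# and that a DataResource node with the given ID exists; the helper name is made up.
def _example_serialize_resource_to_json(resource_id):
    """Hedged usage sketch: fetch a node by ID and serialize its properties to JSON."""
    node = get_node_by_id('DataResource', resource_id)  # defined later in this module
    if node is None:
        return None
    return json.dumps(serialize_node_properties(node), ensure_ascii=False)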
def get_formatted_time():
    """Return the current time as a formatted string.

    Note: this local definition shadows the get_formatted_time imported from
    app.core.meta_data above.
    """
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
def get_node_by_id(label, id):
    """Fetch a node with the given label by its internal ID."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure the id is an integer
            try:
                id_int = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
            result = session.run(cypher, {'id': id_int})  # type: ignore[arg-type]
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None
def get_node_by_id_no_label(id):
    """Fetch a node by its internal ID without restricting the label."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure the id is an integer
            try:
                id_int = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
            result = session.run(cypher, {'id': id_int})
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None
def delete_relationships(start_node, rel_type=None, end_node=None):
    """Delete relationships starting from the given node."""
    try:
        with neo4j_driver.get_session() as session:
            if rel_type and end_node:
                cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
                cypher = cypher.replace("{rel_type}", rel_type)
                session.run(cypher, {'start_id': start_node.id, 'end_id': end_node.id})  # type: ignore[arg-type]
            elif rel_type:
                cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
                cypher = cypher.replace("{rel_type}", rel_type)
                session.run(cypher, {'start_id': start_node.id})  # type: ignore[arg-type]
            else:
                cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
                session.run(cypher, {'start_id': start_node.id})
            return True
    except Exception as e:
        logger.error(f"删除关系失败: {str(e)}")
        return False
def update_or_create_node(label, **properties):
    """Update an existing node or create a new one."""
    try:
        with neo4j_driver.get_session() as session:
            node_id = properties.pop('id', None)
            if node_id:
                # Update the existing node
                set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
                cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
                params = {'id': int(node_id)}
                params.update(properties)
                result = session.run(cypher, params)  # type: ignore[arg-type]
            else:
                # Create a new node
                props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
                cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
                result = session.run(cypher, properties)  # type: ignore[arg-type]
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"更新或创建节点失败: {str(e)}")
        return None
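
# --- Illustrative sketch (added for documentation, not part of the original module) ---
# update_or_create_node() treats its keyword arguments as node properties and the
# optional `id` keyword as the internal Neo4j node ID. A rough usage sketch; the
# label and property names here are example values only.
def _example_update_or_create_label(name_zh, node_id=None):
    """Hedged usage sketch: create a DataLabel, or update it when an id is given."""
    props = {'name_zh': name_zh, 'create_time': get_formatted_time()}
    if node_id is not None:
        props['id'] = node_id  # triggers the MATCH ... SET branch instead of CREATE
    return update_or_create_node('DataLabel', **props)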
def handle_businessdomain_node(receiver, head_data, data_source=None, resource_type=None):
    """Create the BusinessDomain node and wire up its relationships."""
    try:
        # Validate required parameters
        if not resource_type:
            raise ValueError("resource_type参数不能为空")

        # Attributes to set on the node
        update_attributes = {
            'name_en': receiver.get('name_en', receiver.get('name_zh', '')),
            'create_time': get_formatted_time(),
            'type': resource_type
        }

        # Log whether a describe field is present
        if "describe" in receiver:
            logger.info(f"创建业务领域,describe字段将被设置为: {receiver.get('describe')}")
        else:
            logger.info("创建业务领域,describe字段不在创建数据中")

        # Strip attributes that should not be stored on the node
        receiver_copy = receiver.copy()
        if 'additional_info' in receiver_copy:
            del receiver_copy['additional_info']
        if 'data_source' in receiver_copy:
            del receiver_copy['data_source']

        tag_list = receiver_copy.get('tag')
        receiver_copy.update(update_attributes)

        # Create or fetch the BusinessDomain node
        with neo4j_driver.get_session() as session:
            props_str = ", ".join([f"{k}: ${k}" for k in receiver_copy.keys()])
            cypher = f"""
            MERGE (n:BusinessDomain {{name_zh: $name_zh}})
            ON CREATE SET n = {{{props_str}}}
            ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver_copy.keys()])}
            RETURN n
            """
            result = session.run(cypher, receiver_copy)  # type: ignore[arg-type]
            record = result.single()
            if not record:
                raise ValueError("Failed to create or get BusinessDomain node")

            business_domain_node = record["n"]
            domain_id = business_domain_node.id
            logger.info(f"创建BusinessDomain节点成功,ID={domain_id}, describe字段: {business_domain_node.get('describe')}")

            # Handle the tag relationship
            if tag_list:
                tag_node = get_node_by_id('DataLabel', tag_list)
                if tag_node:
                    rel_check = """
                    MATCH (a:BusinessDomain)-[r:LABEL]->(b:DataLabel)
                    WHERE id(a) = $domain_id AND id(b) = $tag_id
                    RETURN r
                    """
                    rel_result = session.run(rel_check, {'domain_id': domain_id, 'tag_id': tag_node.id})
                    if not rel_result.single():
                        rel_create = """
                        MATCH (a:BusinessDomain), (b:DataLabel)
                        WHERE id(a) = $domain_id AND id(b) = $tag_id
                        CREATE (a)-[r:LABEL]->(b)
                        RETURN r
                        """
                        session.run(rel_create, {'domain_id': domain_id, 'tag_id': tag_node.id})
                        logger.info(f"成功创建BusinessDomain与DataLabel的LABEL关系")

            # Handle head data (metadata fields)
            if head_data:
                for item in head_data:
                    meta_cypher = """
                    MERGE (m:DataMeta {name_zh: $name_zh})
                    ON CREATE SET m.name_en = $name_en,
                                  m.create_time = $create_time,
                                  m.data_type = $data_type,
                                  m.status = true
                    ON MATCH SET m.data_type = $data_type,
                                 m.status = true
                    RETURN m
                    """
                    create_time = get_formatted_time()
                    meta_result = session.run(meta_cypher, {
                        'name_zh': item['name_zh'],
                        'name_en': item['name_en'],
                        'create_time': create_time,
                        'data_type': item['data_type']
                    })
                    meta_record = meta_result.single()
                    if meta_record and meta_record["m"]:
                        meta_node = meta_record["m"]
                        meta_id = meta_node.id
                        logger.info(f"创建或获取到元数据节点: ID={meta_id}, name_zh={item['name_zh']}")

                        # Create the BusinessDomain -> DataMeta relationship
                        rel_cypher = """
                        MATCH (a:BusinessDomain), (m:DataMeta)
                        WHERE id(a) = $domain_id AND id(m) = $meta_id
                        MERGE (a)-[r:INCLUDES]->(m)
                        RETURN r
                        """
                        rel_result = session.run(rel_cypher, {
                            'domain_id': domain_id,
                            'meta_id': meta_id
                        })
                        rel_record = rel_result.single()
                        if rel_record:
                            logger.info(f"成功创建BusinessDomain与元数据的关系: {domain_id} -> {meta_id}")
                        else:
                            logger.warning(f"创建BusinessDomain与元数据的关系失败: {domain_id} -> {meta_id}")
                    else:
                        logger.error(f"未能创建或获取元数据节点: {item['name_zh']}")

            # Handle the data-source relationship
            if data_source:
                try:
                    data_source_id = None
                    data_source_name_en = None
                    if isinstance(data_source, (int, float)) or (isinstance(data_source, str) and data_source.isdigit()):
                        data_source_id = int(data_source)
                        logger.info(f"data_source 为节点ID: {data_source_id}")
                    elif isinstance(data_source, dict) and data_source.get('name_en'):
                        data_source_name_en = data_source['name_en']
                        logger.info(f"data_source 为字典,提取name_en: {data_source_name_en}")
                    elif isinstance(data_source, str):
                        data_source_name_en = data_source
                        logger.info(f"data_source 为字符串name_en: {data_source_name_en}")

                    if data_source_id is not None:
                        check_ds_cypher = "MATCH (b:DataSource) WHERE id(b) = $ds_id RETURN b"
                        check_ds_result = session.run(check_ds_cypher, {'ds_id': data_source_id})
                        if not check_ds_result.single():
                            logger.warning(f"数据源节点不存在: ID={data_source_id},跳过关系创建")
                        else:
                            rel_data_source_cypher = """
                            MATCH (a:BusinessDomain), (b:DataSource)
                            WHERE id(a) = $domain_id AND id(b) = $ds_id
                            MERGE (a)-[r:COME_FROM]->(b)
                            RETURN r
                            """
                            rel_result = session.run(rel_data_source_cypher, {
                                'domain_id': domain_id,
                                'ds_id': data_source_id
                            })
                            rel_record = rel_result.single()
                            if rel_record:
                                logger.info(f"已创建BusinessDomain与数据源的COME_FROM关系: domain_id={domain_id} -> data_source_id={data_source_id}")
                            else:
                                logger.warning(f"创建COME_FROM关系失败,但不中断主流程: {domain_id} -> {data_source_id}")
                    elif data_source_name_en:
                        check_ds_cypher = "MATCH (b:DataSource) WHERE b.name_en = $ds_name_en RETURN b"
                        check_ds_result = session.run(check_ds_cypher, {'ds_name_en': data_source_name_en})
                        if not check_ds_result.single():
                            logger.warning(f"数据源节点不存在: name_en={data_source_name_en},跳过关系创建")
                        else:
                            rel_data_source_cypher = """
                            MATCH (a:BusinessDomain), (b:DataSource)
                            WHERE id(a) = $domain_id AND b.name_en = $ds_name_en
                            MERGE (a)-[r:COME_FROM]->(b)
                            RETURN r
                            """
                            rel_result = session.run(rel_data_source_cypher, {
                                'domain_id': domain_id,
                                'ds_name_en': data_source_name_en
                            })
                            rel_record = rel_result.single()
                            if rel_record:
                                logger.info(f"已创建BusinessDomain与数据源的COME_FROM关系: domain_id={domain_id} -> name_en={data_source_name_en}")
                            else:
                                logger.warning(f"创建COME_FROM关系失败,但不中断主流程: {domain_id} -> {data_source_name_en}")
                    else:
                        logger.warning(f"data_source参数无效,无法识别格式: {data_source}")
                except Exception as e:
                    logger.error(f"处理数据源关系失败(不中断主流程): {str(e)}")

            # Create the BELONGS_TO relationship to the "数据资源" label
            try:
                data_model_label_cypher = """
                MATCH (label:DataLabel {name_zh: '数据资源'})
                RETURN label
                """
                label_result = session.run(data_model_label_cypher)
                label_record = label_result.single()
                if label_record:
                    data_model_label_id = label_record["label"].id
                    # Create the BELONGS_TO relationship
                    belongs_to_cypher = """
                    MATCH (domain:BusinessDomain), (label:DataLabel)
                    WHERE id(domain) = $domain_id AND id(label) = $label_id
                    MERGE (domain)-[r:BELONGS_TO]->(label)
                    RETURN r
                    """
                    belongs_result = session.run(belongs_to_cypher, {
                        'domain_id': domain_id,
                        'label_id': data_model_label_id
                    })
                    if belongs_result.single():
                        logger.info(f"成功创建BusinessDomain与'数据资源'标签的BELONGS_TO关系: {domain_id} -> {data_model_label_id}")
                    else:
                        logger.warning(f"创建BELONGS_TO关系失败: {domain_id} -> {data_model_label_id}")
                else:
                    logger.warning("未找到'数据资源'标签,跳过BELONGS_TO关系创建")
            except Exception as e:
                logger.error(f"创建与'数据资源'标签的关系失败: {str(e)}")

            return domain_id
    except Exception as e:
        logger.error(f"处理业务领域节点创建和关系建立失败: {str(e)}")
        raise
# DataResource and DataMeta relationship creation and lookup
def handle_node(receiver, head_data, data_source=None, resource_type=None):
    """Create the DataResource node and wire up its relationships."""
    try:
        # Validate required parameters
        if not resource_type:
            raise ValueError("resource_type参数不能为空")

        # Attributes to set on the node
        update_attributes = {
            'name_en': receiver.get('name_en', receiver.get('name_zh', '')),
            'create_time': get_formatted_time(),
            'type': resource_type  # use the resource type value passed in
        }

        # Log whether a describe field is present in the creation payload
        if "describe" in receiver:
            logger.info(f"创建资源,describe字段将被设置为: {receiver.get('describe')}")
        else:
            logger.info("创建资源,describe字段不在创建数据中")

        if 'additional_info' in receiver:
            del receiver['additional_info']
        # Remove data_source from receiver so a complex object is not stored as a node property
        if 'data_source' in receiver:
            del receiver['data_source']

        tag_list = receiver.get('tag')
        receiver.update(update_attributes)

        # Create or fetch the DataResource node
        with neo4j_driver.get_session() as session:
            props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
            cypher = f"""
            MERGE (n:DataResource {{name_zh: $name_zh}})
            ON CREATE SET n = {{{props_str}}}
            ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
            RETURN n
            """
            result = session.run(cypher, receiver)  # type: ignore[arg-type]
            record = result.single()
            if not record:
                raise ValueError("Failed to create or get DataResource node")

            data_resource_node = record["n"]
            resource_id = data_resource_node.id  # numeric internal ID

            # Log the node data after creation
            logger.info(f"创建后的节点数据,describe字段: {data_resource_node.get('describe')}")

            # Handle the tag relationship
            if tag_list:
                tag_node = get_node_by_id('DataLabel', tag_list)
                if tag_node:
                    # Check whether the relationship already exists
                    rel_check = """
                    MATCH (a:DataResource)-[r:LABEL]->(b:DataLabel)
                    WHERE id(a) = $resource_id AND id(b) = $tag_id
                    RETURN r
                    """
                    rel_result = session.run(rel_check, {'resource_id': resource_id, 'tag_id': tag_node.id})  # numeric ids
                    # Create it only when missing
                    if not rel_result.single():
                        rel_create = """
                        MATCH (a:DataResource), (b:DataLabel)
                        WHERE id(a) = $resource_id AND id(b) = $tag_id
                        CREATE (a)-[r:LABEL]->(b)
                        RETURN r
                        """
                        session.run(rel_create, {'resource_id': resource_id, 'tag_id': tag_node.id})

            # Handle head data (metadata fields)
            if head_data:
                for item in head_data:
                    # Create the metadata node
                    meta_cypher = """
                    MERGE (m:DataMeta {name_zh: $name_zh})
                    ON CREATE SET m.name_en = $name_en,
                                  m.create_time = $create_time,
                                  m.data_type = $data_type,
                                  m.status = true
                    ON MATCH SET m.data_type = $data_type,
                                 m.status = true
                    RETURN m
                    """
                    create_time = get_formatted_time()
                    meta_result = session.run(meta_cypher, {
                        'name_zh': item['name_zh'],
                        'name_en': item['name_en'],
                        'create_time': create_time,
                        'data_type': item['data_type']
                    })
                    meta_record = meta_result.single()
                    if meta_record and meta_record["m"]:
                        meta_node = meta_record["m"]
                        meta_id = meta_node.id  # numeric ID
                        logger.info(f"创建或获取到元数据节点: ID={meta_id}, name_zh={item['name_zh']}")

                        # Confirm the data resource node can still be found
                        check_resource_cypher = """
                        MATCH (n:DataResource)
                        WHERE id(n) = $resource_id
                        RETURN n
                        """
                        check_resource = session.run(check_resource_cypher, {'resource_id': resource_id})
                        if check_resource.single():
                            logger.info(f"找到数据资源节点: ID={resource_id}")
                        else:
                            logger.error(f"无法找到数据资源节点: ID={resource_id}")
                            continue

                        # Create the relationship
                        rel_cypher = """
                        MATCH (a:DataResource), (m:DataMeta)
                        WHERE id(a) = $resource_id AND id(m) = $meta_id
                        MERGE (a)-[r:INCLUDES]->(m)
                        RETURN r
                        """
                        rel_result = session.run(rel_cypher, {
                            'resource_id': resource_id,
                            'meta_id': meta_id
                        })
                        rel_record = rel_result.single()
                        if rel_record:
                            logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
                        else:
                            logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
                    else:
                        logger.error(f"未能创建或获取元数据节点: {item['name_zh']}")

            # Handle the data-source relationship (supported for all resource types)
            if data_source:
                try:
                    # Resolve the data source identifier (several formats are supported)
                    data_source_id = None
                    data_source_name_en = None
                    # 1. A number (node ID)
                    if isinstance(data_source, (int, float)) or (isinstance(data_source, str) and data_source.isdigit()):
                        data_source_id = int(data_source)
                        logger.info(f"data_source 为节点ID: {data_source_id}")
                    # 2. A dict containing name_en
                    elif isinstance(data_source, dict) and data_source.get('name_en'):
                        data_source_name_en = data_source['name_en']
                        logger.info(f"data_source 为字典,提取name_en: {data_source_name_en}")
                    # 3. A plain string (name_en)
                    elif isinstance(data_source, str):
                        data_source_name_en = data_source
                        logger.info(f"data_source 为字符串name_en: {data_source_name_en}")

                    # Create the DataResource -> DataSource relationship
                    if data_source_id is not None:
                        # Relate by node ID; first check that the data source node exists
                        check_ds_cypher = "MATCH (b:DataSource) WHERE id(b) = $ds_id RETURN b"
                        check_ds_result = session.run(check_ds_cypher, {'ds_id': data_source_id})
                        if not check_ds_result.single():
                            logger.warning(f"数据源节点不存在: ID={data_source_id},跳过关系创建")
                        else:
                            # Create the COME_FROM relationship
                            rel_data_source_cypher = """
                            MATCH (a:DataResource), (b:DataSource)
                            WHERE id(a) = $resource_id AND id(b) = $ds_id
                            MERGE (a)-[r:COME_FROM]->(b)
                            RETURN r
                            """
                            rel_result = session.run(rel_data_source_cypher, {
                                'resource_id': resource_id,
                                'ds_id': data_source_id
                            })
                            rel_record = rel_result.single()
                            if rel_record:
                                logger.info(f"已创建数据资源与数据源的COME_FROM关系: resource_id={resource_id} -> data_source_id={data_source_id}")
                            else:
                                logger.warning(f"创建COME_FROM关系失败,但不中断主流程: {resource_id} -> {data_source_id}")
                    elif data_source_name_en:
                        # Relate by name_en (kept for backward compatibility); first check that the node exists
                        check_ds_cypher = "MATCH (b:DataSource) WHERE b.name_en = $ds_name_en RETURN b"
                        check_ds_result = session.run(check_ds_cypher, {'ds_name_en': data_source_name_en})
                        if not check_ds_result.single():
                            logger.warning(f"数据源节点不存在: name_en={data_source_name_en},跳过关系创建")
                        else:
                            # Create the COME_FROM relationship
                            rel_data_source_cypher = """
                            MATCH (a:DataResource), (b:DataSource)
                            WHERE id(a) = $resource_id AND b.name_en = $ds_name_en
                            MERGE (a)-[r:COME_FROM]->(b)
                            RETURN r
                            """
                            rel_result = session.run(rel_data_source_cypher, {
                                'resource_id': resource_id,
                                'ds_name_en': data_source_name_en
                            })
                            rel_record = rel_result.single()
                            if rel_record:
                                logger.info(f"已创建数据资源与数据源的COME_FROM关系: resource_id={resource_id} -> name_en={data_source_name_en}")
                            else:
                                logger.warning(f"创建COME_FROM关系失败,但不中断主流程: {resource_id} -> {data_source_name_en}")
                    else:
                        logger.warning(f"data_source参数无效,无法识别格式: {data_source}")
                except Exception as e:
                    # A failed data-source relationship must not break the main flow
                    logger.error(f"处理数据源关系失败(不中断主流程): {str(e)}")
                    # No re-raise: the main flow continues

            # Create the BusinessDomain node and its relationships
            try:
                domain_id = handle_businessdomain_node(receiver, head_data, data_source, resource_type)
                logger.info(f"成功创建BusinessDomain节点,ID={domain_id}")
            except Exception as e:
                logger.error(f"创建BusinessDomain节点失败(不中断主流程): {str(e)}")
                # No re-raise: the main flow continues

            return resource_id
    except Exception as e:
        logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
        raise
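
# --- Illustrative sketch (added for documentation, not part of the original module) ---
# Rough outline of the payload shapes handle_node() expects, inferred from the code
# above: `receiver` carries the DataResource properties (name_zh is the MERGE key and
# `tag` is a DataLabel node ID), `head_data` lists the DataMeta fields, and
# `data_source` may be a node ID, a dict with name_en, or a name_en string.
# All literal values below are made-up examples.
def _example_create_resource():
    """Hedged usage sketch with example values only."""
    receiver = {
        'name_zh': '示例资源',
        'name_en': 'example_resource',
        'describe': 'example only',
        'tag': 123,  # DataLabel node ID (example value)
    }
    head_data = [
        {'name_zh': '字段一', 'name_en': 'field_one', 'data_type': 'VARCHAR'},
    ]
    # resource_type is required; 'structure' is an example value, not a documented constant
    return handle_node(receiver, head_data, data_source='example_ds', resource_type='structure')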
def handle_id_resource(resource_id):
    """Look up a single data resource by ID."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return None

            # Query by numeric ID
            cypher = """
            MATCH (n:DataResource)
            WHERE id(n) = $resource_id
            RETURN n
            """
            result = session.run(cypher, {'resource_id': resource_id_int})
            record = result.single()
            if not record:
                logger.error(f"未找到资源,ID: {resource_id_int}")
                return None

            # Build the response payload
            logger.info(f"record: {record}")
            data_resource = serialize_node_properties(record["n"])
            logger.info(f"data_resource: {data_resource}")
            logger.info(f"describe field in node: {record['n'].get('describe')}")

            # Keep the describe field even when it is null
            if 'describe' in record["n"]:
                data_resource["describe"] = record["n"].get('describe')
                logger.info(f"设置describe字段: {data_resource['describe']}")

            data_resource["id"] = record["n"].id

            # Query the associated label
            tag_cypher = """
            MATCH (n:DataResource)-[r:LABEL]->(t:DataLabel)
            WHERE id(n) = $resource_id
            RETURN t
            """
            tag_result = session.run(tag_cypher, {'resource_id': resource_id_int})
            tag_record = tag_result.single()

            # Attach the tag info
            if tag_record:
                tag = {
                    "name_zh": tag_record["t"].get("name_zh"),
                    "id": tag_record["t"].id
                }
            else:
                tag = {
                    "name_zh": None,
                    "id": None
                }
            data_resource["tag"] = tag

            # Query the associated data source (COME_FROM relationship)
            data_source_cypher = """
            MATCH (n:DataResource)-[r:COME_FROM]->(ds:DataSource)
            WHERE id(n) = $resource_id
            RETURN ds
            """
            data_source_result = session.run(data_source_cypher, {'resource_id': resource_id_int})
            data_source_record = data_source_result.single()

            # Attach the data source info
            if data_source_record:
                data_resource["data_source"] = data_source_record["ds"].id
                logger.info(f"找到关联的数据源,ID: {data_source_record['ds'].id}")
            else:
                data_resource["data_source"] = None
                logger.info(f"未找到关联的数据源")

            # Query the associated metadata (supports both the DataMeta and Metadata labels)
            meta_cypher = """
            MATCH (n:DataResource)-[:INCLUDES]->(m)
            WHERE id(n) = $resource_id
            AND (m:DataMeta OR m:Metadata)
            RETURN m
            """
            meta_result = session.run(meta_cypher, {'resource_id': resource_id_int})
            parsed_data = []
            for meta_record in meta_result:
                meta = serialize_node_properties(meta_record["m"])
                meta_data = {
                    "id": meta_record["m"].id,
                    "name_zh": meta.get("name_zh"),
                    "name_en": meta.get("name_en"),
                    "data_type": meta.get("data_type"),
                    "data_standard": {
                        "name_zh": None,
                        "id": None
                    }
                }
                parsed_data.append(meta_data)
            data_resource["parsed_data"] = parsed_data

            # Make sure every required field has a default value
            required_fields = {
                "leader": "",
                "organization": "",
                "name_zh": "",
                "name_en": "",
                "data_sensitivity": "",
                "storage_location": "/",
                "create_time": "",
                "type": "",
                "category": "",
                "url": "",
                "frequency": "",
                "status": True,
                "keywords": [],
                "describe": ""
            }
            for field, default_value in required_fields.items():
                if field not in data_resource or data_resource[field] is None:
                    data_resource[field] = default_value

            logger.info(f"成功获取资源详情,ID: {resource_id_int}, describe: {data_resource.get('describe')}")
            return data_resource
    except Exception as e:
        logger.error(f"处理单个数据资源查询失败: {str(e)}")
        return None
def id_resource_graph(resource_id):
    """Fetch the graph around a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Query the data resource node and its relationships
            cypher = """
            MATCH (n:DataResource)-[r]-(m)
            WHERE id(n) = $resource_id
            RETURN n, r, m
            """
            result = session.run(cypher, {'resource_id': int(resource_id)})

            # Collect nodes and relationships
            nodes = {}
            relationships = []
            for record in result:
                # Source node
                source_node = serialize_node_properties(record["n"])
                source_node["id"] = record["n"].id
                nodes[source_node["id"]] = source_node

                # Target node
                target_node = serialize_node_properties(record["m"])
                target_node["id"] = record["m"].id
                nodes[target_node["id"]] = target_node

                # Relationship
                rel = record["r"]
                relationship = {
                    "id": rel.id,
                    "source": record["n"].id,
                    "target": record["m"].id,
                    "type": rel.type
                }
                relationships.append(relationship)

            return {
                "nodes": list(nodes.values()),
                "relationships": relationships
            }
    except Exception as e:
        logger.error(f"获取数据资源图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}
def resource_list(page, page_size, name_en_filter=None, name_zh_filter=None,
                  type_filter='all', category_filter=None, tag_filter=None):
    """List data resources with optional filters and pagination."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the base filter conditions for the DataResource node.
            # Note: the filter values are interpolated directly into the Cypher text;
            # a parameterized alternative is sketched after this function.
            resource_conditions = []
            if name_en_filter:
                resource_conditions.append(f"n.name_en CONTAINS '{name_en_filter}'")
            if name_zh_filter:
                resource_conditions.append(f"n.name_zh CONTAINS '{name_zh_filter}'")
            if type_filter and type_filter != 'all':
                resource_conditions.append(f"n.type = '{type_filter}'")
            if category_filter:
                resource_conditions.append(f"n.category = '{category_filter}'")

            # Base WHERE clause
            resource_where = " AND ".join(resource_conditions) if resource_conditions else ""

            # Choose the query strategy depending on whether a tag filter is present
            if tag_filter:
                # With a tag filter: filter DataResource first, then join the label
                if resource_where:
                    # Count query
                    count_cypher = f"""
                    MATCH (n:DataResource)
                    WHERE {resource_where}
                    WITH n
                    MATCH (n)-[:LABEL]->(t:DataLabel)
                    WHERE t.name_zh = '{tag_filter}'
                    RETURN count(DISTINCT n) as count
                    """
                    # Paged query
                    skip = (page - 1) * page_size
                    cypher = f"""
                    MATCH (n:DataResource)
                    WHERE {resource_where}
                    WITH n
                    MATCH (n)-[:LABEL]->(t:DataLabel)
                    WHERE t.name_zh = '{tag_filter}'
                    RETURN DISTINCT n
                    ORDER BY n.create_time DESC
                    SKIP {skip} LIMIT {page_size}
                    """
                else:
                    # Tag filter only
                    count_cypher = f"""
                    MATCH (n:DataResource)-[:LABEL]->(t:DataLabel)
                    WHERE t.name_zh = '{tag_filter}'
                    RETURN count(DISTINCT n) as count
                    """
                    # Paged query
                    skip = (page - 1) * page_size
                    cypher = f"""
                    MATCH (n:DataResource)-[:LABEL]->(t:DataLabel)
                    WHERE t.name_zh = '{tag_filter}'
                    RETURN DISTINCT n
                    ORDER BY n.create_time DESC
                    SKIP {skip} LIMIT {page_size}
                    """
            else:
                # No tag filter: standard query
                if resource_where:
                    # Count query
                    count_cypher = f"""
                    MATCH (n:DataResource)
                    WHERE {resource_where}
                    RETURN count(n) as count
                    """
                    # Paged query
                    skip = (page - 1) * page_size
                    cypher = f"""
                    MATCH (n:DataResource)
                    WHERE {resource_where}
                    RETURN n
                    ORDER BY n.create_time DESC
                    SKIP {skip} LIMIT {page_size}
                    """
                else:
                    # No filters at all
                    count_cypher = "MATCH (n:DataResource) RETURN count(n) as count"
                    # Paged query
                    skip = (page - 1) * page_size
                    cypher = f"""
                    MATCH (n:DataResource)
                    RETURN n
                    ORDER BY n.create_time DESC
                    SKIP {skip} LIMIT {page_size}
                    """

            # Run the count query
            count_result = session.run(count_cypher)  # type: ignore[arg-type]
            count_record = count_result.single()
            total_count = count_record["count"] if count_record else 0

            # Run the paged query
            result = session.run(cypher)  # type: ignore[arg-type]

            # Format the results
            resources = []
            for record in result:
                node = serialize_node_properties(record["n"])
                node["id"] = record["n"].id

                # Query the associated label
                tag_cypher = """
                MATCH (n:DataResource)-[r:LABEL]->(t:DataLabel)
                WHERE id(n) = $resource_id
                RETURN t
                """
                tag_result = session.run(tag_cypher, {'resource_id': node["id"]})
                tag_record = tag_result.single()
                if tag_record:
                    tag = serialize_node_properties(tag_record["t"])
                    tag["id"] = tag_record["t"].id
                    node["tag_info"] = tag
                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return [], 0
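
# --- Illustrative sketch (added for documentation, not part of the original module) ---
# resource_list() above interpolates the filter values into the Cypher text. A
# parameterized variant (sketched here, not used by the module) would pass them via
# the parameter map instead, which sidesteps quoting and injection issues.
def _example_build_resource_filter(name_en_filter=None, name_zh_filter=None):
    """Hedged sketch: build a WHERE fragment plus a matching parameter dict."""
    conditions, params = [], {}
    if name_en_filter:
        conditions.append("n.name_en CONTAINS $name_en_filter")
        params['name_en_filter'] = name_en_filter
    if name_zh_filter:
        conditions.append("n.name_zh CONTAINS $name_zh_filter")
        params['name_zh_filter'] = name_zh_filter
    where = (" WHERE " + " AND ".join(conditions)) if conditions else ""
    # Caller would do: session.run(f"MATCH (n:DataResource){where} RETURN n", params)
    return where, params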
def id_data_search_list(resource_id, page, page_size, name_en_filter=None,
                        name_zh_filter=None, category_filter=None, tag_filter=None):
    """List the metadata associated with a specific data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return [], 0

            # Base match clause (supports both the DataMeta and Metadata labels)
            match_clause = """
            MATCH (n:DataResource)-[:INCLUDES]->(m)
            WHERE id(n) = $resource_id
            AND (m:DataMeta OR m:Metadata)
            """
            where_conditions = []
            if name_en_filter:
                where_conditions.append(f"m.name_en CONTAINS '{name_en_filter}'")
            if name_zh_filter:
                where_conditions.append(f"m.name_zh CONTAINS '{name_zh_filter}'")
            if category_filter:
                where_conditions.append(f"m.category = '{category_filter}'")

            # The tag filter needs an extra match
            tag_match = ""
            if tag_filter:
                tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name_zh = $tag_filter"

            where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""

            # Count query
            count_cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN count(m) as count
            """
            count_params = {"resource_id": resource_id_int}
            if tag_filter:
                count_params["tag_filter"] = tag_filter
            count_result = session.run(count_cypher, count_params)
            count_record = count_result.single()
            total_count = count_record["count"] if count_record else 0

            # Paged query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN m
            ORDER BY m.name_zh
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher, count_params)  # type: ignore[arg-type]

            # Format the results
            metadata_list = []
            for record in result:
                meta = serialize_node_properties(record["m"])
                meta["id"] = record["m"].id
                metadata_list.append(meta)

            logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
            return metadata_list, total_count
    except Exception as e:
        logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
        return [], 0
def resource_kinship_graph(resource_id, include_meta=True):
    """Fetch the kinship graph of a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return {"nodes": [], "relationships": []}

            # Base query
            cypher_parts = [
                "MATCH (n:DataResource) WHERE id(n) = $resource_id",
                "OPTIONAL MATCH (n)-[:LABEL]->(l:DataLabel)",
            ]

            # Optionally include metadata (supports both the DataMeta and Metadata labels);
            # the variable m only exists when include_meta is true, so the RETURN differs
            if include_meta:
                cypher_parts.append("OPTIONAL MATCH (n)-[:INCLUDES]->(m) WHERE (m:DataMeta OR m:Metadata)")
                cypher_parts.append("RETURN n, l, collect(m) as metadata")
            else:
                cypher_parts.append("RETURN n, l, [] as metadata")
            cypher = "\n".join(cypher_parts)

            result = session.run(cypher, {'resource_id': resource_id_int})  # type: ignore[arg-type]
            record = result.single()
            if not record:
                logger.error(f"未找到资源图谱数据,ID: {resource_id_int}")
                return {"nodes": [], "relationships": []}

            # Collect nodes and relationships
            nodes = {}
            relationships = []

            # Data resource node
            resource_node = serialize_node_properties(record["n"])
            resource_node["id"] = record["n"].id
            resource_node["node_type"] = list(record["n"].labels)
            nodes[resource_node["id"]] = resource_node

            # Label node
            if record["l"]:
                label_node = serialize_node_properties(record["l"])
                label_node["id"] = record["l"].id
                label_node["node_type"] = list(record["l"].labels)
                nodes[label_node["id"]] = label_node

                # Resource -> label relationship
                relationships.append({
                    "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
                    "from": resource_node["id"],
                    "to": label_node["id"],
                    "text": "label"
                })

            # Metadata nodes
            if include_meta and record["metadata"]:
                for meta in record["metadata"]:
                    if meta:  # skip null entries produced by collect()
                        meta_node = serialize_node_properties(meta)
                        meta_node["id"] = meta.id
                        meta_node["node_type"] = list(meta.labels)
                        nodes[meta_node["id"]] = meta_node

                        # Resource -> metadata relationship
                        relationships.append({
                            "id": f"rel-{resource_node['id']}-INCLUDES-{meta_node['id']}",
                            "from": resource_node["id"],
                            "to": meta_node["id"],
                            "text": "INCLUDES"
                        })

            logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
            return {
                "nodes": list(nodes.values()),
                "relationships": relationships
            }
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}
def resource_impact_all_graph(resource_id, include_meta=True):
    """Fetch the impact graph of a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return {"nodes": [], "lines": []}

            # The include_meta flag decides what is traversed; the depth is limited to one hop
            if include_meta:
                cypher = """
                MATCH path = (n:DataResource)-[*1..1]-(m)
                WHERE id(n) = $resource_id
                RETURN path
                """
            else:
                cypher = """
                MATCH path = (n:DataResource)-[*1..1]-(m)
                WHERE id(n) = $resource_id
                AND NOT (m:DataMeta) AND NOT (m:Metadata)
                RETURN path
                """
            result = session.run(cypher, {'resource_id': resource_id_int})

            # Collect nodes and relationships
            nodes = {}
            lines = {}
            for record in result:
                path = record["path"]

                # Every node on the path
                for node in path.nodes:
                    if node.id not in nodes:
                        node_dict = serialize_node_properties(node)
                        node_dict["id"] = str(node.id)
                        node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
                        nodes[node.id] = node_dict

                # Every relationship on the path (a Neo4j path object exposes them as `relationships`)
                for rel in path.relationships:
                    if rel.id not in lines:
                        rel_dict = {
                            "id": str(rel.id),
                            "from": str(rel.start_node.id),
                            "to": str(rel.end_node.id),
                            "text": rel.type
                        }
                        lines[rel.id] = rel_dict

            logger.info(f"成功获取完整图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }
    except Exception as e:
        logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
        return {"nodes": [], "lines": []}
def clean_type(type_str):
    """Normalize a SQL type string."""
    # Extract the base type, dropping any length or precision information
    basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()
    # Drop a trailing VARYING suffix
    basic_type = re.sub(r'\s+VARYING$', '', basic_type)
    # Standardize common types
    type_mapping = {
        'INT': 'INTEGER',
        'INT4': 'INTEGER',
        'INT8': 'BIGINT',
        'SMALLINT': 'SMALLINT',
        'BIGINT': 'BIGINT',
        'FLOAT4': 'FLOAT',
        'FLOAT8': 'DOUBLE',
        'REAL': 'FLOAT',
        'DOUBLE PRECISION': 'DOUBLE',
        'NUMERIC': 'DECIMAL',
        'BOOL': 'BOOLEAN',
        'CHARACTER': 'CHAR',
        'CHAR VARYING': 'VARCHAR',
        'CHARACTER VARYING': 'VARCHAR',
        'TEXT': 'TEXT',
        'DATE': 'DATE',
        'TIME': 'TIME',
        'TIMESTAMP': 'TIMESTAMP',
        'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
        'BYTEA': 'BINARY',
        'JSON': 'JSON',
        'JSONB': 'JSONB',
        'UUID': 'UUID',
        'SERIAL': 'SERIAL',
        'SERIAL4': 'SERIAL',
        'SERIAL8': 'BIGSERIAL',
        'BIGSERIAL': 'BIGSERIAL'
    }
    # Look up the standardized type, falling back to the cleaned string
    return type_mapping.get(basic_type, basic_type)
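
# Worked examples for clean_type() (added for documentation; values follow from the
# regex handling and the mapping above):
#   clean_type("varchar(255)")          -> "VARCHAR"
#   clean_type("int8")                  -> "BIGINT"
#   clean_type("numeric(10,2)")         -> "DECIMAL"
#   clean_type("timestamptz")           -> "TIMESTAMP WITH TIME ZONE"
#   clean_type("character varying(64)") -> "CHAR"   # VARYING is stripped before the lookup,
#                                                   # so the 'CHARACTER VARYING' entry is never hit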
def clean_field_name(field_name):
    """Strip surrounding backticks and quotes from a field name."""
    return field_name.strip('`').strip('"').strip("'")
def select_create_ddl(sql_content):
    """Extract CREATE TABLE DDL statements (plus their comments) from SQL content."""
    try:
        # Parse a potentially complex SQL file and identify every CREATE TABLE statement
        # together with the COMMENT statements that belong to it.
        # First, split the SQL content on semicolons (quote-aware).
        statements = []
        current_statement = ""
        in_string = False
        string_quote = None
        for char in sql_content:
            if char in ["'", '"']:
                if not in_string:
                    in_string = True
                    string_quote = char
                elif char == string_quote:
                    in_string = False
                    string_quote = None
                current_statement += char
            elif char == ';' and not in_string:
                current_statement += char
                if current_statement.strip():
                    statements.append(current_statement.strip())
                current_statement = ""
            else:
                current_statement += char
        if current_statement.strip():
            statements.append(current_statement.strip())

        # Find every CREATE TABLE statement and its associated comments
        create_table_statements = []
        create_index = -1
        in_table_block = False
        current_table = None
        current_block = ""

        for i, stmt in enumerate(statements):
            if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
                # If a table block is already open, save it first
                if in_table_block and current_block:
                    create_table_statements.append(current_block)
                # Start a new table block
                in_table_block = True
                current_block = stmt
                # Extract the table name
                table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
                if table_match:
                    current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
                    current_table = current_table.strip('"\'') if current_table else ""
            elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
                                     re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
                # Check whether the comment belongs to the current table
                if current_table:
                    # Table comment
                    if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
                        table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
                        if table_comment_match:
                            comment_table = table_comment_match.group(1).strip('"\'')
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # Comment for another table: the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
                    # Column comment
                    elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
                        column_comment_match = re.search(
                            r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
                            stmt,
                            re.IGNORECASE
                        )
                        if column_comment_match:
                            comment_table = column_comment_match.group(1)
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # Comment for another table: the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
            elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
                # A new CREATE statement (not a comment): save the current block and close it
                create_table_statements.append(current_block)
                in_table_block = False
                current_block = ""
                current_table = None

        # Append the last open block
        if in_table_block and current_block:
            create_table_statements.append(current_block)

        # Logging
        logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
        for i, stmt in enumerate(create_table_statements):
            logger.debug(f"DDL语句 {i+1}: {stmt}")

        return create_table_statements
    except Exception as e:
        logger.error(f"提取DDL语句失败: {str(e)}")
        # logger.error(traceback.format_exc())
        return []
def table_sql(sql):
    """Parse a table-definition SQL statement; supports schema-qualified and bare table names."""
    try:
        # Supported formats:
        # 1. CREATE TABLE tablename
        # 2. CREATE TABLE "tablename"
        # 3. CREATE TABLE 'tablename'
        # 4. CREATE TABLE schema.tablename
        # 5. CREATE TABLE "schema"."tablename"
        # 6. CREATE TABLE 'schema'.'tablename'
        # Match the table name, with or without quotes
        table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
        table_match = re.search(table_pattern, sql, re.IGNORECASE)
        if not table_match:
            logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
            return None

        # Resolve schema and table name
        schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
        table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
        if not table_name:
            logger.error("无法解析表名")
            return None
        logger.debug(f"解析到表名: {table_name}")

        # Extract the body of the CREATE TABLE statement (the content inside the parentheses)
        body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
        body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
        if not body_match:
            logger.error("无法提取表主体内容")
            return None
        body_text = body_match.group(1).strip()
        logger.debug(f"表定义主体部分: {body_text}")

        # Parse the field definitions
        fields = []
        # Split the field definitions, handling nested parentheses and quotes
        field_defs = []
        pos = 0
        in_parentheses = 0
        in_quotes = False
        quote_char = None
        for i, char in enumerate(body_text):
            if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
                in_quotes = not in_quotes
                if in_quotes:
                    quote_char = char
                else:
                    quote_char = None
            elif char == '(' and not in_quotes:
                in_parentheses += 1
            elif char == ')' and not in_quotes:
                in_parentheses -= 1
            elif char == ',' and in_parentheses == 0 and not in_quotes:
                field_defs.append(body_text[pos:i].strip())
                pos = i + 1
        # Append the last field definition
        if pos < len(body_text):
            field_defs.append(body_text[pos:].strip())
        logger.debug(f"解析出 {len(field_defs)} 个字段定义")

        # Process each field definition
        for field_def in field_defs:
            # Skip constraint definitions
            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
                continue
            # Extract the field name and type
            field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
            field_match = re.search(field_pattern, field_def, re.IGNORECASE)
            if field_match:
                # Field name
                field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
                # Type
                field_type = field_match.group(5).strip()
                # Handle possible parentheses in the type by taking the first token
                type_base = re.split(r'\s+', field_type)[0]
                clean_type_value = clean_type(type_base)
                fields.append((field_name, clean_type_value))
                logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
            else:
                logger.warning(f"无法解析字段定义: {field_def}")

        # Extract the table comment
        table_comment = ""
        table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
        table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
        if table_comment_match:
            comment_table = table_comment_match.group(1)
            if comment_table.strip("'\"") == table_name.strip("'\""):
                table_comment = table_comment_match.group(2)
                logger.debug(f"找到表注释: {table_comment}")

        # Extract the column comments
        comments = {}
        column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
        for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
            comment_table = match.group(1)
            column_name = match.group(2)
            comment = match.group(3)
            # Only keep comments whose table name matches
            if comment_table.strip("'\"") == table_name.strip("'\""):
                comments[column_name] = comment
                logger.debug(f"找到列注释: {column_name} - {comment}")
            else:
                logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")

        # Check how fields and comments match up
        logger.debug("========字段和注释匹配情况========")
        field_names = [f[0] for f in fields]
        logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
        logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")

        # Build the metadata list
        meta_list = []
        for field_name, field_type in fields:
            chinese_name = comments.get(field_name, "")
            meta_list.append({
                "name_en": field_name,
                "data_type": field_type,
                "name_zh": chinese_name if chinese_name else field_name
            })

        # Check whether the table already exists in the graph
        try:
            status = status_query([table_name])
        except Exception as e:
            logger.error(f"检查表存在状态失败: {str(e)}")
            status = [False]

        # Build the result
        result = {
            table_name: {
                "exist": status[0] if status else False,
                "meta": meta_list
            }
        }
        logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
        return result
    except Exception as e:
        logger.error(f"解析表定义SQL失败: {str(e)}")
        logger.error(f"异常详情: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return None
# Check whether the given English table names already exist in the graph
def status_query(key_list):
    query = """
    UNWIND $Key_list AS name
    OPTIONAL MATCH (n:DataModel {name_en: name})
    OPTIONAL MATCH (n:DataResource {name_en: name})
    OPTIONAL MATCH (n:DataMetric {name_en: name})
    WITH name, CASE
        WHEN n IS NOT NULL THEN True
        ELSE False
    END AS exist
    RETURN collect(exist) AS exist
    """
    with neo4j_driver.get_session() as session:
        result = session.run(query, {'Key_list': key_list})
        record = result.single()
        # One boolean per input name, in order
        return record["exist"] if record else []
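
# --- Illustrative sketch (added for documentation, not part of the original module) ---
# How the DDL helpers above are expected to compose: split a SQL script into
# CREATE TABLE blocks with select_create_ddl(), then parse each block with table_sql()
# into the {table_name: {"exist": ..., "meta": [...]}} shape. Assumes a reachable
# Neo4j instance, since table_sql() calls status_query(); the helper name is made up.
def _example_parse_ddl_script(sql_script):
    """Hedged usage sketch: parse every CREATE TABLE block in a SQL script."""
    parsed = {}
    for ddl in select_create_ddl(sql_script):
        result = table_sql(ddl)
        if result:
            parsed.update(result)
    return parsed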
def select_sql(sql_query):
    """Parse a SELECT query and return the tables it reads from."""
    try:
        # Extract the SELECT clause
        select_pattern = r'SELECT\s+(.*?)\s+FROM'
        select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
        if not select_match:
            return None
        select_clause = select_match.group(1)

        # Split the fields; track parentheses so commas inside function calls are not split on
        fields = []
        in_parenthesis = 0
        current_field = ""
        for char in select_clause:
            if char == '(':
                in_parenthesis += 1
                current_field += char
            elif char == ')':
                in_parenthesis -= 1
                current_field += char
            elif char == ',' and in_parenthesis == 0:
                fields.append(current_field.strip())
                current_field = ""
            else:
                current_field += char
        if current_field.strip():
            fields.append(current_field.strip())

        # Parse each field
        parsed_fields = []
        for field in fields:
            # Check for a field alias
            alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
            alias_match = re.search(alias_pattern, field)
            if alias_match:
                field_expr = alias_match.group(1).strip()
                field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
                parsed_fields.append({
                    "expression": field_expr,
                    "alias": field_alias
                })
            else:
                # No alias
                parsed_fields.append({
                    "expression": field.strip(),
                    "alias": None
                })

        # Extract the FROM clause and the table names
        from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
        from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
        tables = []
        if from_match:
            from_clause = from_match.group(1).strip()
            # Analyze the tables in the FROM clause (table name with an optional alias)
            table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+)))?'
            for match in re.finditer(table_pattern, from_clause):
                table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
                if table_name:
                    tables.append(table_name)
        return tables
    except Exception as e:
        logger.error(f"解析SELECT查询语句失败: {str(e)}")
        # logger.error(traceback.format_exc())
        return None
def model_resource_list(page, page_size, name_filter=None):
    """List model resources."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the query conditions
            match_clause = "MATCH (n:model_resource)"
            where_clause = ""
            if name_filter:
                where_clause = f" WHERE n.name_zh CONTAINS '{name_filter}'"

            # Count query
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            count_result = session.run(count_cypher)  # type: ignore[arg-type]
            count_record = count_result.single()
            total_count = count_record["count"] if count_record else 0

            # Paged query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.create_time DESC
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher)  # type: ignore[arg-type]

            # Format the results
            resources = []
            for record in result:
                node = serialize_node_properties(record["n"])
                node["id"] = record["n"].id
                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return [], 0
def data_resource_edit(data):
    """Edit a DataResource node and rebuild its relationships."""
    try:
        resource_id = data.get("id")
        if not resource_id:
            raise ValueError("缺少资源ID")

        with neo4j_driver.get_session() as session:
            # Collect the node properties to update (everything except id and parsed_data)
            update_fields = {}
            for key, value in data.items():
                if key != "id" and key != "parsed_data" and value is not None:
                    update_fields[key] = value

            # Log whether the describe field is part of the update payload
            if "describe" in data:
                logger.info(f"编辑资源,describe字段将被更新为: {data.get('describe')}")
            else:
                logger.info("编辑资源,describe字段不在更新数据中")

            # Refresh the timestamp (stored in create_time; no separate update_time field is kept)
            update_fields["create_time"] = get_formatted_time()

            # Build the SET clause; the timestamp guarantees at least one field to update
            if update_fields:
                set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
                cypher = f"""
                MATCH (n:DataResource)
                WHERE id(n) = $resource_id
                SET {set_clause}
                RETURN n
                """
                params = {'resource_id': int(resource_id)}
                params.update(update_fields)
                result = session.run(cypher, params)  # type: ignore[arg-type]
            else:
                # Nothing to update; just fetch the node
                cypher = """
                MATCH (n:DataResource)
                WHERE id(n) = $resource_id
                RETURN n
                """
                result = session.run(cypher, {'resource_id': int(resource_id)})

            updated_node = result.single()
            if not updated_node:
                raise ValueError("资源不存在")

            # Log the describe field on the updated node
            logger.info(f"更新后的节点数据,describe字段: {updated_node['n'].get('describe')}")
            # Handle the label relationship
            tag_id = data.get("tag")
            if tag_id:
                # Remove the old label relationship
                delete_rel_cypher = """
                MATCH (n:DataResource)-[r:LABEL]->()
                WHERE id(n) = $resource_id
                DELETE r
                """
                session.run(delete_rel_cypher, {'resource_id': int(resource_id)})

                # Create the new label relationship
                create_rel_cypher = """
                MATCH (n:DataResource), (t:DataLabel)
                WHERE id(n) = $resource_id AND id(t) = $tag_id
                CREATE (n)-[r:LABEL]->(t)
                RETURN r
                """
                session.run(create_rel_cypher, {'resource_id': int(resource_id), 'tag_id': int(tag_id)})

            # Handle metadata relationships
            parsed_data = data.get("parsed_data", [])

            # Always delete the old metadata and clean-resource relationships first,
            # regardless of whether parsed_data is empty
            delete_meta_cypher = """
            MATCH (n:DataResource)-[r:INCLUDES]->()
            WHERE id(n) = $resource_id
            DELETE r
            """
            session.run(delete_meta_cypher, {'resource_id': int(resource_id)})

            delete_clean_cypher = """
            MATCH (n:DataResource)-[r:clean_resource]->()
            WHERE id(n) = $resource_id
            DELETE r
            """
            session.run(delete_clean_cypher, {'resource_id': int(resource_id)})

            # Only rebuild metadata relationships when parsed_data is non-empty
            if parsed_data:
                # Preprocess parsed_data so that every metadata entry carries a valid ID
                for meta in parsed_data:
                    meta_id = meta.get("id")
                    meta_name = meta.get("name_zh")

                    if not meta_id and meta_name:
                        # No ID but a name_zh: look for an existing DataMeta node with that name first
                        find_meta_cypher = """
                        MATCH (m:DataMeta {name_zh: $meta_name})
                        RETURN m
                        """
                        find_result = session.run(find_meta_cypher, {'meta_name': meta_name})
                        existing_meta = find_result.single()

                        if existing_meta:
                            # Found: reuse the existing node's ID
                            meta_id = existing_meta["m"].id
                            meta["id"] = meta_id
                            logger.info(f"找到现有的DataMeta节点: {meta_name}, ID: {meta_id}")
                        else:
                            # Not found: create a new DataMeta node
                            create_meta_cypher = """
                            CREATE (m:DataMeta {
                                name_zh: $name_zh,
                                name_en: $name_en,
                                data_type: $data_type,
                                create_time: $create_time
                            })
                            RETURN m
                            """
                            create_time = get_formatted_time()
                            new_meta_result = session.run(create_meta_cypher, {
                                'name_zh': meta_name,
                                'name_en': meta.get("name_en", meta_name),
                                'data_type': meta.get("data_type", "varchar(255)"),
                                'create_time': create_time
                            })
                            new_meta = new_meta_result.single()
                            if new_meta:
                                meta_id = new_meta["m"].id
                                meta["id"] = meta_id
                                logger.info(f"创建新的DataMeta节点: {meta_name}, ID: {meta_id}")
                            else:
                                logger.error(f"创建DataMeta节点失败: {meta_name}")
                                continue
                    elif not meta_id:
                        logger.warning(f"跳过没有ID和name的metadata: {meta}")
                        continue
                # Create the new metadata relationships and the related relationships
                for meta in parsed_data:
                    meta_id = meta.get("id")
                    if meta_id:
                        # Link the resource to the metadata node
                        create_meta_cypher = """
                        MATCH (n:DataResource), (m:DataMeta)
                        WHERE id(n) = $resource_id AND id(m) = $meta_id
                        CREATE (n)-[r:INCLUDES]->(m)
                        RETURN r
                        """
                        session.run(create_meta_cypher, {'resource_id': int(resource_id), 'meta_id': int(meta_id)})

                        # Handle the master-data relationship
                        master_data = meta.get("master_data")
                        if master_data:
                            create_master_cypher = """
                            MATCH (master), (meta:DataMeta)
                            WHERE id(master) = $master_id AND id(meta) = $meta_id
                            MERGE (master)-[r:master]->(meta)
                            RETURN r
                            """
                            session.run(create_master_cypher, {'master_id': int(master_data), 'meta_id': int(meta_id)})

                        # Handle the data-standard relationship
                        data_standard = meta.get("data_standard")
                        if data_standard and isinstance(data_standard, dict):
                            standard_id = data_standard.get("id")
                            if standard_id:
                                # Link the data standard to the metadata node
                                create_standard_meta_cypher = """
                                MATCH (standard), (meta:DataMeta)
                                WHERE id(standard) = $standard_id AND id(meta) = $meta_id
                                MERGE (standard)-[r:clean_resource]->(meta)
                                RETURN r
                                """
                                session.run(create_standard_meta_cypher, {'standard_id': int(standard_id), 'meta_id': int(meta_id)})

                                # Link the data resource to the data standard
                                create_resource_standard_cypher = """
                                MATCH (resource:DataResource), (standard)
                                WHERE id(resource) = $resource_id AND id(standard) = $standard_id
                                MERGE (resource)-[r:clean_resource]->(standard)
                                RETURN r
                                """
                                session.run(create_resource_standard_cypher, {'resource_id': int(resource_id), 'standard_id': int(standard_id)})
            else:
                logger.info("parsed_data为空,只删除旧的元数据关系,不创建新的关系")

            # Return the updated node
            node_data = serialize_node_properties(updated_node["n"])
            node_data["id"] = updated_node["n"].id

            logger.info(f"data_resource_edit返回数据,describe字段: {node_data.get('describe')}")
            return node_data
    except Exception as e:
        logger.error(f"编辑数据资源失败: {str(e)}")
        raise

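# Illustrative payload sketch (hypothetical IDs and values): the dict shape that
# data_resource_edit reads above — "id", optional "tag", optional "parsed_data" with
# per-field id/name_zh/name_en/data_type/master_data/data_standard; any other non-None
# key becomes a node property. Calling it requires a live Neo4j connection and real node IDs.
def _example_edit_data_resource():
    payload = {
        "id": 123,                    # id(n) of an existing DataResource node
        "describe": "updated description",
        "tag": 456,                   # id of a DataLabel node to relink via :LABEL
        "parsed_data": [
            {
                "name_zh": "订单编号",          # looked up or created as a DataMeta node
                "name_en": "order_no",
                "data_type": "varchar(64)",
                "master_data": None,          # optional id of a master-data node
                "data_standard": None,        # optional {"id": ...} of a data-standard node
            }
        ],
    }
    return data_resource_edit(payload)
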
def handle_data_source(data_source):
    """Resolve data source info: look up an existing DataSource node by name_en and return its name_en."""
    try:
        with neo4j_driver.get_session() as session:
            # name_en is the unique identifier of a data source
            ds_name_en = data_source.get("name_en")
            if not ds_name_en:
                logger.error("数据源缺少必要的name_en属性")
                return None

            # Fall back to name_en when no name_zh is provided
            if "name_zh" not in data_source or not data_source["name_zh"]:
                data_source["name_zh"] = ds_name_en

            # Check the required connection fields (the result is currently informational only)
            required_fields = ["type", "host", "port", "database", "username"]
            has_required_fields = all(data_source.get(field) for field in required_fields)

            # Look for an existing data source with the same name_en
            existing_cypher = """
            MATCH (ds:DataSource {name_en: $name_en})
            RETURN ds
            """
            existing_result = session.run(existing_cypher, {'name_en': ds_name_en})
            existing_record = existing_result.single()

            if existing_record:
                existing_data_source = serialize_node_properties(existing_record["ds"])
                logger.info(f"根据名称找到现有数据源: {existing_data_source.get('name_en')}")
                return existing_data_source.get("name_en")
            else:
                # The data source does not exist; fail fast
                raise ValueError(f"未找到名称为 {ds_name_en} 的数据源,请先创建该数据源或提供完整的数据源信息")
    except Exception as e:
        logger.error(f"处理数据源失败: {str(e)}")
        raise RuntimeError(f"处理数据源失败: {str(e)}")

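# Illustrative usage sketch (hypothetical connection values): the dict shape that
# handle_data_source reads above. Only name_en is required for the lookup; the
# connection fields are listed in required_fields but their check is informational.
def _example_handle_data_source():
    data_source = {
        "name_en": "ods_mysql_main",   # hypothetical data source name
        "name_zh": "",                 # falls back to name_en when empty
        "type": "mysql",
        "host": "127.0.0.1",
        "port": 3306,
        "database": "ods",
        "username": "reader",
    }
    # Returns the name_en of the existing DataSource node; raises if it does not exist.
    return handle_data_source(data_source)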