# resource.py

import json
import re
import logging
from py2neo import Relationship
import pandas as pd
from app.services.neo4j_driver import neo4j_driver
from app.core.graph.graph_operations import create_or_get_node, relationship_exists, get_node, connect_graph
import time
from datetime import datetime
from app.core.meta_data import get_formatted_time, translate_and_parse
from app import db
from sqlalchemy import text

logger = logging.getLogger("app")

def serialize_neo4j_object(obj):
    """
    Convert a Neo4j object into a JSON-serializable value.

    Args:
        obj: Neo4j node or property value

    Returns:
        The serialized value
    """
    if hasattr(obj, 'year'):  # DateTime object
        # Convert the Neo4j DateTime to a string
        return obj.strftime("%Y-%m-%d %H:%M:%S") if hasattr(obj, 'strftime') else str(obj)
    elif hasattr(obj, '__dict__'):  # complex object
        return str(obj)
    else:
        return obj

def serialize_node_properties(node):
    """
    Serialize a Neo4j node's properties into a JSON-serializable dict.

    Args:
        node: Neo4j node object

    Returns:
        dict: the serialized properties
    """
    properties = {}
    for key, value in dict(node).items():
        properties[key] = serialize_neo4j_object(value)
    return properties

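# Usage sketch (illustrative only, not taken from the original call sites): once a
# node has been fetched, its properties can be made JSON-safe before returning them
# from an API handler. The label and id values below are made-up examples.
#
#   node = get_node_by_id("DataResource", 123)
#   if node:
#       payload = serialize_node_properties(node)
#       payload["id"] = node.id
#       print(json.dumps(payload, ensure_ascii=False))
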
def get_formatted_time():
    """Return the current time as a formatted string.

    Note: this local definition shadows the get_formatted_time imported from
    app.core.meta_data at the top of the module.
    """
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

def get_node_by_id(label, id):
    """Get a node with the given label by its ID."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure the id is an integer
            try:
                id_int = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
            result = session.run(cypher, id=id_int)
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None

def get_node_by_id_no_label(id):
    """Get a node by its ID without restricting the label."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure the id is an integer
            try:
                id_int = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
            result = session.run(cypher, id=id_int)
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None

def delete_relationships(start_node, rel_type=None, end_node=None):
    """Delete relationships starting from the given node."""
    try:
        with neo4j_driver.get_session() as session:
            if rel_type and end_node:
                cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
                cypher = cypher.replace("{rel_type}", rel_type)
                session.run(cypher, start_id=start_node.id, end_id=end_node.id)
            elif rel_type:
                cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
                cypher = cypher.replace("{rel_type}", rel_type)
                session.run(cypher, start_id=start_node.id)
            else:
                cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
                session.run(cypher, start_id=start_node.id)
            return True
    except Exception as e:
        logger.error(f"删除关系失败: {str(e)}")
        return False

def update_or_create_node(label, **properties):
    """Update an existing node or create a new one."""
    try:
        with neo4j_driver.get_session() as session:
            node_id = properties.pop('id', None)
            if node_id:
                # Update the existing node
                set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
                cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
                result = session.run(cypher, id=int(node_id), **properties)
            else:
                # Create a new node
                props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
                cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
                result = session.run(cypher, **properties)
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"更新或创建节点失败: {str(e)}")
        return None

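# Usage sketch (hypothetical label and property values): passing an `id` keyword
# updates the existing node, while omitting it creates a new one.
#
#   created = update_or_create_node("DataLabel", name="敏感数据", en_name="sensitive_data")
#   if created:
#       updated = update_or_create_node("DataLabel", id=created.id, en_name="sensitive")
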
# Data resource / metadata: node creation and relationship handling
def handle_node(receiver, head_data, data_source=None, resource_type=None):
    """Create the data resource node and build its relationships."""
    try:
        # Set the type property based on resource_type
        if resource_type == 'ddl':
            type_value = 'ddl'
        else:
            type_value = 'structure'

        # Attributes to set on the node
        update_attributes = {
            'en_name': receiver.get('en_name', receiver.get('name', '')),
            'time': get_formatted_time(),
            'type': type_value  # different type value depending on the resource type
        }

        # Log whether the describe field is present in the creation payload
        if "describe" in receiver:
            logger.info(f"创建资源,describe字段将被设置为: {receiver.get('describe')}")
        else:
            logger.info("创建资源,describe字段不在创建数据中")

        if 'additional_info' in receiver:
            del receiver['additional_info']

        # Remove the data_source property from receiver so a complex object is not stored as a node property
        if 'data_source' in receiver:
            del receiver['data_source']

        tag_list = receiver.get('tag')
        receiver.update(update_attributes)

        # Create or fetch the DataResource node
        with neo4j_driver.get_session() as session:
            props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
            cypher = f"""
            MERGE (n:DataResource {{name: $name}})
            ON CREATE SET n = {{{props_str}}}
            ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
            RETURN n
            """
            result = session.run(cypher, **receiver)
            data_resource_node = result.single()["n"]
            resource_id = data_resource_node.id  # numeric ID from the id attribute

            # Log the node data after creation
            logger.info(f"创建后的节点数据,describe字段: {data_resource_node.get('describe')}")

            # Handle the label relationship
            if tag_list:
                tag_node = get_node_by_id('DataLabel', tag_list)
                if tag_node:
                    # Check whether the relationship already exists
                    rel_check = """
                    MATCH (a:DataResource)-[r:label]->(b:DataLabel)
                    WHERE id(a) = $resource_id AND id(b) = $tag_id
                    RETURN r
                    """
                    rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id)  # numeric ids

                    # Create the relationship only if it does not exist yet
                    if not rel_result.single():
                        rel_create = """
                        MATCH (a:DataResource), (b:DataLabel)
                        WHERE id(a) = $resource_id AND id(b) = $tag_id
                        CREATE (a)-[r:label]->(b)
                        RETURN r
                        """
                        session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)

            # Handle the header data (metadata / fields)
            if head_data:
                for item in head_data:
                    # Create the metadata node
                    meta_cypher = """
                    MERGE (m:DataMeta {name: $name})
                    ON CREATE SET m.en_name = $en_name,
                                  m.create_time = $create_time,
                                  m.data_type = $type,
                                  m.status = 'true'
                    ON MATCH SET m.data_type = $type,
                                 m.status = 'true'
                    RETURN m
                    """
                    create_time = get_formatted_time()
                    meta_result = session.run(
                        meta_cypher,
                        name=item['name'],
                        en_name=item['en_name'],
                        create_time=create_time,
                        type=item['data_type']  # the item's data_type becomes the data_type property
                    )
                    meta_record = meta_result.single()

                    if meta_record and meta_record["m"]:
                        meta_node = meta_record["m"]
                        meta_id = meta_node.id  # numeric ID

                        # Confirm the node was created or fetched, and log its ID
                        logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")

                        # Confirm the data resource node can still be looked up
                        check_resource_cypher = """
                        MATCH (n:DataResource)
                        WHERE id(n) = $resource_id
                        RETURN n
                        """
                        check_resource = session.run(check_resource_cypher, resource_id=resource_id)
                        if check_resource.single():
                            logger.info(f"找到数据资源节点: ID={resource_id}")
                        else:
                            logger.error(f"无法找到数据资源节点: ID={resource_id}")
                            continue

                        # Create the contain relationship
                        rel_cypher = """
                        MATCH (a:DataResource), (m:DataMeta)
                        WHERE id(a) = $resource_id AND id(m) = $meta_id
                        MERGE (a)-[r:contain]->(m)
                        RETURN r
                        """
                        rel_result = session.run(
                            rel_cypher,
                            resource_id=resource_id,
                            meta_id=meta_id
                        )
                        rel_record = rel_result.single()

                        if rel_record:
                            logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
                        else:
                            logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
                    else:
                        logger.error(f"未能创建或获取元数据节点: {item['name']}")

            # Handle the data source relationship
            if data_source and resource_type == 'ddl':
                try:
                    # Create or fetch the data source node
                    # data_source_en_name = handle_data_source(data_source)
                    data_source_en_name = data_source['en_name']

                    # Create the relationship between the data resource and the data source
                    if data_source_en_name:
                        # Create the originates_from relationship
                        rel_data_source_cypher = """
                        MATCH (a:DataResource), (b:DataSource)
                        WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
                        MERGE (a)-[r:originates_from]->(b)
                        RETURN r
                        """
                        rel_result = session.run(
                            rel_data_source_cypher,
                            resource_id=resource_id,
                            ds_en_name=data_source_en_name
                        )
                        rel_record = rel_result.single()

                        if rel_record:
                            logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
                        else:
                            # Log this as a serious error
                            error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {data_source_en_name}"
                            logger.error(error_msg)

                            # Check whether the data source node exists
                            check_ds_cypher = "MATCH (b:DataSource) WHERE b.en_name = $ds_en_name RETURN b"
                            check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
                            if not check_ds_result.single():
                                logger.error(f"数据源节点不存在: en_name={data_source_en_name}")

                            # Check whether the data resource node exists
                            check_res_cypher = "MATCH (a:DataResource) WHERE id(a) = $resource_id RETURN a"
                            check_res_result = session.run(check_res_cypher, resource_id=resource_id)
                            if not check_res_result.single():
                                logger.error(f"数据资源节点不存在: id={resource_id}")

                            # A serious error like this should raise
                            raise RuntimeError(error_msg)
                except Exception as e:
                    logger.error(f"处理数据源关系失败: {str(e)}")
                    raise RuntimeError(f"处理数据源关系失败: {str(e)}")

            return resource_id
    except Exception as e:
        logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
        raise

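# Payload sketch for handle_node (field values are illustrative, not taken from the
# original code): `receiver` carries the DataResource properties, `head_data` lists
# the fields that become DataMeta nodes, and `data_source` is only consulted when
# resource_type == 'ddl'. The tag value is the numeric id of an existing DataLabel.
#
#   receiver = {"name": "用户表", "en_name": "users", "tag": 42, "describe": "示例资源"}
#   head_data = [{"name": "用户ID", "en_name": "user_id", "data_type": "INTEGER"}]
#   data_source = {"en_name": "ods_mysql"}
#   resource_id = handle_node(receiver, head_data, data_source, resource_type="ddl")
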
def handle_id_resource(resource_id):
    """Fetch a single data resource by ID."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return None

            # Look up the node by numeric ID
            cypher = """
            MATCH (n:DataResource)
            WHERE id(n) = $resource_id
            RETURN n
            """
            result = session.run(cypher, resource_id=resource_id_int)
            record = result.single()
            if not record:
                logger.error(f"未找到资源,ID: {resource_id_int}")
                return None

            # Build the response payload
            logger.info(f"record: {record}")
            data_resource = serialize_node_properties(record["n"])
            logger.info(f"data_resource: {data_resource}")
            logger.info(f"describe field in node: {record['n'].get('describe')}")

            # Make sure the describe field is carried over, even when it is null
            if 'describe' in record["n"]:
                data_resource["describe"] = record["n"].get('describe')
                logger.info(f"设置describe字段: {data_resource['describe']}")

            data_resource["id"] = record["n"].id

            # Query the associated label
            tag_cypher = """
            MATCH (n:DataResource)-[r:label]->(t:DataLabel)
            WHERE id(n) = $resource_id
            RETURN t
            """
            tag_result = session.run(tag_cypher, resource_id=resource_id_int)
            tag_record = tag_result.single()

            # Fill in the label info
            if tag_record:
                tag = {
                    "name": tag_record["t"].get("name"),
                    "id": tag_record["t"].id
                }
            else:
                tag = {
                    "name": None,
                    "id": None
                }
            data_resource["tag"] = tag

            # Query the associated metadata - supports both DataMeta and Metadata labels
            meta_cypher = """
            MATCH (n:DataResource)-[:contain]->(m)
            WHERE id(n) = $resource_id
            AND (m:DataMeta OR m:Metadata)
            RETURN m
            """
            meta_result = session.run(meta_cypher, resource_id=resource_id_int)

            parsed_data = []
            for meta_record in meta_result:
                meta = serialize_node_properties(meta_record["m"])
                meta_data = {
                    "id": meta_record["m"].id,
                    "name": meta.get("name"),
                    "en_name": meta.get("en_name"),
                    "data_type": meta.get("data_type"),
                    "data_standard": {
                        "name": None,
                        "id": None
                    }
                }
                parsed_data.append(meta_data)
            data_resource["parsed_data"] = parsed_data

            # Make sure every required field has a default value
            required_fields = {
                "leader": "",
                "organization": "",
                "name": "",
                "en_name": "",
                "data_sensitivity": "",
                "storage_location": "/",
                "time": "",
                "type": "",
                "category": "",
                "url": "",
                "frequency": "",
                "status": True,
                "keywords": [],
                "describe": ""
            }
            for field, default_value in required_fields.items():
                if field not in data_resource or data_resource[field] is None:
                    data_resource[field] = default_value

            logger.info(f"成功获取资源详情,ID: {resource_id_int}, describe: {data_resource.get('describe')}")
            return data_resource
    except Exception as e:
        logger.error(f"处理单个数据资源查询失败: {str(e)}")
        return None

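# Return-shape sketch (all values are placeholders): handle_id_resource returns a
# dict that is ready for JSON serialization, roughly of this form:
#
#   {
#       "id": 123, "name": "...", "en_name": "...", "describe": "...",
#       "tag": {"name": "...", "id": 45},
#       "parsed_data": [
#           {"id": 67, "name": "...", "en_name": "...", "data_type": "VARCHAR",
#            "data_standard": {"name": None, "id": None}}
#       ],
#       # plus the defaults filled in from required_fields above
#   }
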
def id_resource_graph(resource_id):
    """Get the graph around a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Query the data resource node and its relationships
            cypher = """
            MATCH (n:DataResource)-[r]-(m)
            WHERE id(n) = $resource_id
            RETURN n, r, m
            """
            result = session.run(cypher, resource_id=int(resource_id))

            # Collect nodes and relationships
            nodes = {}
            relationships = []
            for record in result:
                # Source node
                source_node = serialize_node_properties(record["n"])
                source_node["id"] = record["n"].id
                nodes[source_node["id"]] = source_node

                # Target node
                target_node = serialize_node_properties(record["m"])
                target_node["id"] = record["m"].id
                nodes[target_node["id"]] = target_node

                # Relationship
                rel = record["r"]
                relationship = {
                    "id": rel.id,
                    "source": record["n"].id,
                    "target": record["m"].id,
                    "type": rel.type
                }
                relationships.append(relationship)

            return {
                "nodes": list(nodes.values()),
                "relationships": relationships
            }
    except Exception as e:
        logger.error(f"获取数据资源图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}

def resource_list(page, page_size, en_name_filter=None, name_filter=None,
                  type_filter='all', category_filter=None, tag_filter=None):
    """Get the paginated list of data resources."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the query conditions
            # NOTE: filter values are interpolated directly into the Cypher text;
            # query parameters would be safer if the filters can contain user input.
            match_clause = "MATCH (n:DataResource)"
            where_conditions = []

            if en_name_filter:
                where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
            if name_filter:
                where_conditions.append(f"n.name CONTAINS '{name_filter}'")
            if type_filter and type_filter != 'all':
                where_conditions.append(f"n.type = '{type_filter}'")
            if category_filter:
                where_conditions.append(f"n.category = '{category_filter}'")

            # Tag filtering needs an extra pattern in the MATCH clause
            if tag_filter:
                match_clause += "-[:label]->(t:DataLabel)"
                where_conditions.append(f"t.name = '{tag_filter}'")

            where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""

            # Count the total number of matches
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            count_result = session.run(count_cypher)
            total_count = count_result.single()["count"]

            # Paginated query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.time DESC
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher)

            # Format the results
            resources = []
            for record in result:
                node = serialize_node_properties(record["n"])
                node["id"] = record["n"].id

                # Query the associated label
                tag_cypher = """
                MATCH (n:DataResource)-[r:label]->(t:DataLabel)
                WHERE id(n) = $resource_id
                RETURN t
                """
                tag_result = session.run(tag_cypher, resource_id=node["id"])
                tag_record = tag_result.single()
                if tag_record:
                    tag = serialize_node_properties(tag_record["t"])
                    tag["id"] = tag_record["t"].id
                    node["tag_info"] = tag

                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return [], 0

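# Usage sketch (hypothetical filter values): fetch page 1 with 10 items per page,
# restricted to structure-type resources carrying a given label.
#
#   resources, total = resource_list(1, 10, type_filter="structure", tag_filter="核心数据")
#   for r in resources:
#       print(r["id"], r.get("name"), r.get("tag_info", {}).get("name"))
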
def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
                        name_filter=None, category_filter=None, tag_filter=None):
    """Get the metadata list associated with a specific data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return [], 0

            # Base match - supports both DataMeta and Metadata labels
            match_clause = """
            MATCH (n:DataResource)-[:contain]->(m)
            WHERE id(n) = $resource_id
            AND (m:DataMeta OR m:Metadata)
            """
            where_conditions = []

            if en_name_filter:
                where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
            if name_filter:
                where_conditions.append(f"m.name CONTAINS '{name_filter}'")
            if category_filter:
                where_conditions.append(f"m.category = '{category_filter}'")

            # Tag filtering needs an extra MATCH
            tag_match = ""
            if tag_filter:
                tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"

            where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""

            # Count the total number of matches
            count_cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN count(m) as count
            """
            count_params = {"resource_id": resource_id_int}
            if tag_filter:
                count_params["tag_filter"] = tag_filter

            count_result = session.run(count_cypher, **count_params)
            total_count = count_result.single()["count"]

            # Paginated query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN m
            ORDER BY m.name
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher, **count_params)

            # Format the results
            metadata_list = []
            for record in result:
                meta = serialize_node_properties(record["m"])
                meta["id"] = record["m"].id
                metadata_list.append(meta)

            logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
            return metadata_list, total_count
    except Exception as e:
        logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
        return [], 0

def resource_kinship_graph(resource_id, include_meta=True):
    """Get the kinship (direct relationship) graph of a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return {"nodes": [], "relationships": []}

            # Base query
            cypher_parts = [
                "MATCH (n:DataResource) WHERE id(n) = $resource_id",
                "OPTIONAL MATCH (n)-[:label]->(l:DataLabel)",
            ]

            # Optionally include metadata - supports both DataMeta and Metadata labels.
            # When metadata is excluded, return an empty list so the RETURN clause does
            # not reference an unbound variable.
            if include_meta:
                cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m) WHERE (m:DataMeta OR m:Metadata)")
                cypher_parts.append("RETURN n, l, collect(m) as metadata")
            else:
                cypher_parts.append("RETURN n, l, [] as metadata")

            cypher = "\n".join(cypher_parts)
            result = session.run(cypher, resource_id=resource_id_int)
            record = result.single()
            if not record:
                logger.error(f"未找到资源图谱数据,ID: {resource_id_int}")
                return {"nodes": [], "relationships": []}

            # Collect nodes and relationships
            nodes = {}
            relationships = []

            # Data resource node
            resource_node = serialize_node_properties(record["n"])
            resource_node["id"] = record["n"].id
            resource_node["labels"] = list(record["n"].labels)
            nodes[resource_node["id"]] = resource_node

            # Label node
            if record["l"]:
                label_node = serialize_node_properties(record["l"])
                label_node["id"] = record["l"].id
                label_node["labels"] = list(record["l"].labels)
                nodes[label_node["id"]] = label_node

                # Add the resource-label relationship
                relationships.append({
                    "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
                    "source": resource_node["id"],
                    "target": label_node["id"],
                    "type": "label"
                })

            # Metadata nodes
            if include_meta and record["metadata"]:
                for meta in record["metadata"]:
                    if meta:  # make sure the metadata node exists
                        meta_node = serialize_node_properties(meta)
                        meta_node["id"] = meta.id
                        meta_node["labels"] = list(meta.labels)
                        nodes[meta_node["id"]] = meta_node

                        # Add the resource-metadata relationship
                        relationships.append({
                            "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
                            "source": resource_node["id"],
                            "target": meta_node["id"],
                            "type": "contain"
                        })

            logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
            return {
                "nodes": list(nodes.values()),
                "relationships": relationships
            }
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}

def resource_impact_all_graph(resource_id, include_meta=True):
    """Get the impact graph of a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return {"nodes": [], "lines": []}

            # The include_meta flag decides how deep the traversal goes
            if include_meta:
                cypher = """
                MATCH path = (n:DataResource)-[*1..3]-(m)
                WHERE id(n) = $resource_id
                RETURN path
                """
            else:
                cypher = """
                MATCH path = (n:DataResource)-[*1..2]-(m)
                WHERE id(n) = $resource_id
                AND NOT (m:DataMeta) AND NOT (m:Metadata)
                RETURN path
                """

            result = session.run(cypher, resource_id=resource_id_int)

            # Collect nodes and relationships
            nodes = {}
            lines = {}

            for record in result:
                path = record["path"]

                # Every node on the path
                for node in path.nodes:
                    if node.id not in nodes:
                        node_dict = serialize_node_properties(node)
                        node_dict["id"] = str(node.id)
                        node_dict["node_type"] = list(node.labels)[0] if node.labels else ""
                        nodes[node.id] = node_dict

                # Every relationship on the path; a Neo4j path object exposes them via
                # its `relationships` attribute
                for rel in path.relationships:
                    if rel.id not in lines:
                        rel_dict = {
                            "id": str(rel.id),
                            "from": str(rel.start_node.id),
                            "to": str(rel.end_node.id),
                            "type": rel.type
                        }
                        lines[rel.id] = rel_dict

            logger.info(f"成功获取完整图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values())
            }
    except Exception as e:
        logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
        return {"nodes": [], "lines": []}

def clean_type(type_str):
    """Normalize a SQL type string."""
    # Extract the base type without length/precision info
    basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()

    # Drop a trailing VARYING suffix
    basic_type = re.sub(r'\s+VARYING$', '', basic_type)

    # Standardize common types
    type_mapping = {
        'INT': 'INTEGER',
        'INT4': 'INTEGER',
        'INT8': 'BIGINT',
        'SMALLINT': 'SMALLINT',
        'BIGINT': 'BIGINT',
        'FLOAT4': 'FLOAT',
        'FLOAT8': 'DOUBLE',
        'REAL': 'FLOAT',
        'DOUBLE PRECISION': 'DOUBLE',
        'NUMERIC': 'DECIMAL',
        'BOOL': 'BOOLEAN',
        'CHARACTER': 'CHAR',
        'CHAR VARYING': 'VARCHAR',
        'CHARACTER VARYING': 'VARCHAR',
        'TEXT': 'TEXT',
        'DATE': 'DATE',
        'TIME': 'TIME',
        'TIMESTAMP': 'TIMESTAMP',
        'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
        'BYTEA': 'BINARY',
        'JSON': 'JSON',
        'JSONB': 'JSONB',
        'UUID': 'UUID',
        'SERIAL': 'SERIAL',
        'SERIAL4': 'SERIAL',
        'SERIAL8': 'BIGSERIAL',
        'BIGSERIAL': 'BIGSERIAL'
    }

    # Look up the normalized type, falling back to the cleaned base type
    return type_mapping.get(basic_type, basic_type)


def clean_field_name(field_name):
    """Strip surrounding backticks and quotes from a field name."""
    return field_name.strip('`').strip('"').strip("'")

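# Examples of the normalization performed above (expected outputs, assuming the
# mapping table is unchanged):
#
#   clean_type("varchar(255)")   -> "VARCHAR"
#   clean_type("int4")           -> "INTEGER"
#   clean_type("numeric(10,2)")  -> "DECIMAL"
#   clean_type("timestamptz")    -> "TIMESTAMP WITH TIME ZONE"
#   clean_field_name('"user_id"') -> "user_id"
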
def select_create_ddl(sql_content):
    """Extract CREATE TABLE DDL statements from SQL content."""
    try:
        # Parse a complex SQL file and identify every CREATE TABLE statement together
        # with its associated comments: each block contains the main statement plus
        # the COMMENT statements that belong to it.
        # First, split the SQL content on semicolons.
        statements = []
        current_statement = ""
        in_string = False
        string_quote = None

        for char in sql_content:
            if char in ["'", '"']:
                if not in_string:
                    in_string = True
                    string_quote = char
                elif char == string_quote:
                    in_string = False
                    string_quote = None
                current_statement += char
            elif char == ';' and not in_string:
                current_statement += char
                if current_statement.strip():
                    statements.append(current_statement.strip())
                current_statement = ""
            else:
                current_statement += char

        if current_statement.strip():
            statements.append(current_statement.strip())

        # Find all CREATE TABLE statements and their associated comments
        create_table_statements = []
        create_index = -1
        in_table_block = False
        current_table = None
        current_block = ""

        for i, stmt in enumerate(statements):
            if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
                # If we are already inside a table block, save it first
                if in_table_block and current_block:
                    create_table_statements.append(current_block)

                # Start a new table block
                in_table_block = True
                current_block = stmt

                # Extract the table name
                table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
                if table_match:
                    current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
                    current_table = current_table.strip('"\'') if current_table else ""
            elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
                                     re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
                # Check whether the comment belongs to the current table
                if current_table:
                    # Table comment
                    if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
                        table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
                        if table_comment_match:
                            comment_table = table_comment_match.group(1).strip('"\'')
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # Comment for another table: the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
                    # Column comment
                    elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
                        column_comment_match = re.search(
                            r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
                            stmt,
                            re.IGNORECASE
                        )
                        if column_comment_match:
                            comment_table = column_comment_match.group(1)
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # Comment for another table: the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
            elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
                # A new CREATE statement (not a comment): save the current block and close it
                create_table_statements.append(current_block)
                in_table_block = False
                current_block = ""
                current_table = None

        # Add the last block
        if in_table_block and current_block:
            create_table_statements.append(current_block)

        # Log what was extracted
        logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
        for i, stmt in enumerate(create_table_statements):
            logger.debug(f"DDL语句 {i+1}: {stmt}")

        return create_table_statements
    except Exception as e:
        logger.error(f"提取DDL语句失败: {str(e)}")
        # logger.error(traceback.format_exc())
        return []

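# Usage sketch (made-up SQL): the splitter groups each CREATE TABLE statement with
# the COMMENT ON statements that reference the same table.
#
#   ddl = """
#   CREATE TABLE public.users (id serial PRIMARY KEY, name varchar(100));
#   COMMENT ON TABLE users IS '用户表';
#   COMMENT ON COLUMN users.name IS '用户姓名';
#   CREATE TABLE orders (id serial PRIMARY KEY);
#   """
#   blocks = select_create_ddl(ddl)
#   # -> two blocks: the users DDL plus its two comments, and the orders DDL
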
def table_sql(sql):
    """Parse a table-definition SQL statement; supports schema-qualified and unqualified names."""
    try:
        # Supported formats:
        # 1. CREATE TABLE tablename
        # 2. CREATE TABLE "tablename"
        # 3. CREATE TABLE 'tablename'
        # 4. CREATE TABLE schema.tablename
        # 5. CREATE TABLE "schema"."tablename"
        # 6. CREATE TABLE 'schema'.'tablename'

        # Match the table name, with or without quotes
        table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
        table_match = re.search(table_pattern, sql, re.IGNORECASE)
        if not table_match:
            logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
            return None

        # Extract the schema and table name
        schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
        table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
        if not table_name:
            logger.error("无法解析表名")
            return None
        logger.debug(f"解析到表名: {table_name}")

        # Extract the body of the CREATE TABLE statement (the content inside the parentheses)
        body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
        body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
        if not body_match:
            logger.error("无法提取表主体内容")
            return None

        body_text = body_match.group(1).strip()
        logger.debug(f"表定义主体部分: {body_text}")

        # Parse the field definitions
        fields = []

        # Split the field definitions, handling nested parentheses and quotes
        field_defs = []
        pos = 0
        in_parentheses = 0
        in_quotes = False
        quote_char = None

        for i, char in enumerate(body_text):
            if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
                in_quotes = not in_quotes
                if in_quotes:
                    quote_char = char
                else:
                    quote_char = None
            elif char == '(' and not in_quotes:
                in_parentheses += 1
            elif char == ')' and not in_quotes:
                in_parentheses -= 1
            elif char == ',' and in_parentheses == 0 and not in_quotes:
                field_defs.append(body_text[pos:i].strip())
                pos = i + 1

        # Add the last field definition
        if pos < len(body_text):
            field_defs.append(body_text[pos:].strip())

        logger.debug(f"解析出 {len(field_defs)} 个字段定义")

        # Handle each field definition
        for field_def in field_defs:
            # Skip constraint definitions
            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
                continue

            # Extract the field name and type
            field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
            field_match = re.search(field_pattern, field_def, re.IGNORECASE)

            if field_match:
                # Field name
                field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
                # Field type
                field_type = field_match.group(5).strip()
                # The type may still carry parentheses; keep only its first token
                type_base = re.split(r'\s+', field_type)[0]
                clean_type_value = clean_type(type_base)

                fields.append((field_name, clean_type_value))
                logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
            else:
                logger.warning(f"无法解析字段定义: {field_def}")

        # Extract the table comment
        table_comment = ""
        table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
        table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
        if table_comment_match:
            comment_table = table_comment_match.group(1)
            if comment_table.strip("'\"") == table_name.strip("'\""):
                table_comment = table_comment_match.group(2)
                logger.debug(f"找到表注释: {table_comment}")

        # Extract the column comments
        comments = {}
        column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
        for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
            comment_table = match.group(1)
            column_name = match.group(2)
            comment = match.group(3)

            # Only keep comments whose table name matches
            if comment_table.strip("'\"") == table_name.strip("'\""):
                comments[column_name] = comment
                logger.debug(f"找到列注释: {column_name} - {comment}")
            else:
                logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")

        # Log how fields and comments line up
        logger.debug("========字段和注释匹配情况========")
        field_names = [f[0] for f in fields]
        logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
        logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")

        # Build the metadata list
        meta_list = []
        for field_name, field_type in fields:
            chinese_name = comments.get(field_name, "")
            meta_list.append({
                "en_name": field_name,
                "data_type": field_type,
                "name": chinese_name if chinese_name else field_name
            })

        # Check whether the table already exists in the graph
        try:
            status = status_query([table_name])
        except Exception as e:
            logger.error(f"检查表存在状态失败: {str(e)}")
            status = [False]

        # Build the result
        result = {
            table_name: {
                "exist": status[0] if status else False,
                "meta": meta_list
            }
        }
        logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
        return result
    except Exception as e:
        logger.error(f"解析表定义SQL失败: {str(e)}")
        logger.error(f"异常详情: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return None

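# Usage sketch (made-up DDL, and assuming the table is not yet in the graph):
# table_sql returns a dict keyed by table name, with the existence flag from
# status_query and the parsed field list.
#
#   ddl = (
#       "CREATE TABLE users (id integer, name varchar(100));\n"
#       "COMMENT ON COLUMN users.name IS '用户姓名';"
#   )
#   parsed = table_sql(ddl)
#   # parsed -> {"users": {"exist": False,
#   #                      "meta": [{"en_name": "id", "data_type": "INTEGER", "name": "id"},
#   #                               {"en_name": "name", "data_type": "VARCHAR", "name": "用户姓名"}]}}
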
# Check whether the given English table names already exist in the graph
def status_query(key_list):
    # Each label gets its own pattern variable so that a name matching any one of
    # DataModel / DataResource / DataMetric counts as existing.
    query = """
    UNWIND $Key_list AS name
    OPTIONAL MATCH (a:DataModel {en_name: name})
    OPTIONAL MATCH (b:DataResource {en_name: name})
    OPTIONAL MATCH (c:DataMetric {en_name: name})
    WITH name, CASE
        WHEN a IS NOT NULL OR b IS NOT NULL OR c IS NOT NULL THEN True
        ELSE False
    END AS exist
    RETURN collect(exist) AS exist
    """
    with neo4j_driver.get_session() as session:
        result = session.run(query, Key_list=key_list)
        data = result.value()  # single row whose value is the collected list of flags
        # Unwrap so callers receive [True, False, ...] rather than [[True, False, ...]]
        return data[0] if data else []

def select_sql(sql_query):
    """Parse a SELECT query statement."""
    try:
        # Extract the SELECT clause
        select_pattern = r'SELECT\s+(.*?)\s+FROM'
        select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
        if not select_match:
            return None

        select_clause = select_match.group(1)

        # Split the fields, taking care not to split on commas inside function calls
        fields = []
        in_parenthesis = 0
        current_field = ""

        for char in select_clause:
            if char == '(':
                in_parenthesis += 1
                current_field += char
            elif char == ')':
                in_parenthesis -= 1
                current_field += char
            elif char == ',' and in_parenthesis == 0:
                fields.append(current_field.strip())
                current_field = ""
            else:
                current_field += char

        if current_field.strip():
            fields.append(current_field.strip())

        # Parse each field
        parsed_fields = []
        for field in fields:
            # Check for a field alias
            alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
            alias_match = re.search(alias_pattern, field)

            if alias_match:
                field_expr = alias_match.group(1).strip()
                field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
                parsed_fields.append({
                    "expression": field_expr,
                    "alias": field_alias
                })
            else:
                # No alias
                parsed_fields.append({
                    "expression": field.strip(),
                    "alias": None
                })

        # Extract the FROM clause and the table names
        from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
        from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)

        tables = []
        if from_match:
            from_clause = from_match.group(1).strip()

            # Match table names in the FROM clause; the trailing optional group consumes an alias
            table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+)))?'
            for match in re.finditer(table_pattern, from_clause):
                table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
                if table_name:
                    tables.append(table_name)

        return tables
    except Exception as e:
        logger.error(f"解析SELECT查询语句失败: {str(e)}")
        # logger.error(traceback.format_exc())
        return None

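# Usage sketch (made-up query): select_sql returns the list of table names found in
# the FROM clause; the parsed field list is built internally but not returned.
# Comma-separated FROM clauses parse cleanly; JOIN keywords may show up as extra
# entries because the table regex treats any bare identifier as a table name.
#
#   tables = select_sql(
#       "SELECT u.id, u.name AS user_name FROM users u, orders o WHERE u.id = o.user_id"
#   )
#   # -> ["users", "orders"]
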
def model_resource_list(page, page_size, name_filter=None):
    """Get the model resource list."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the query conditions
            match_clause = "MATCH (n:model_resource)"
            where_clause = ""
            if name_filter:
                where_clause = f" WHERE n.name CONTAINS '{name_filter}'"

            # Count the total number of matches
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            count_result = session.run(count_cypher)
            total_count = count_result.single()["count"]

            # Paginated query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.createTime DESC
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher)

            # Format the results
            resources = []
            for record in result:
                node = serialize_node_properties(record["n"])
                node["id"] = record["n"].id
                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return [], 0

def data_resource_edit(data):
    """Edit a data resource."""
    try:
        resource_id = data.get("id")
        if not resource_id:
            raise ValueError("缺少资源ID")

        with neo4j_driver.get_session() as session:
            # Collect the node properties to update
            update_fields = {}
            for key, value in data.items():
                if key != "id" and key != "parsed_data" and value is not None:
                    update_fields[key] = value

            # Log whether the describe field is part of the update payload
            if "describe" in data:
                logger.info(f"编辑资源,describe字段将被更新为: {data.get('describe')}")
            else:
                logger.info("编辑资源,describe字段不在更新数据中")

            # Add the update timestamp
            update_fields["updateTime"] = get_formatted_time()

            # Build the update statement
            set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
            cypher = f"""
            MATCH (n:DataResource)
            WHERE id(n) = $resource_id
            SET {set_clause}
            RETURN n
            """
            result = session.run(cypher, resource_id=int(resource_id), **update_fields)
            updated_node = result.single()
            if not updated_node:
                raise ValueError("资源不存在")

            # Log the updated node data
            logger.info(f"更新后的节点数据,describe字段: {updated_node['n'].get('describe')}")

            # Handle the label relationship
            tag_id = data.get("tag")
            if tag_id:
                # Remove the old label relationship
                delete_rel_cypher = """
                MATCH (n:DataResource)-[r:label]->()
                WHERE id(n) = $resource_id
                DELETE r
                """
                session.run(delete_rel_cypher, resource_id=int(resource_id))

                # Create the new label relationship
                create_rel_cypher = """
                MATCH (n:DataResource), (t:DataLabel)
                WHERE id(n) = $resource_id AND id(t) = $tag_id
                CREATE (n)-[r:label]->(t)
                RETURN r
                """
                session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))

            # Handle the metadata relationships
            parsed_data = data.get("parsed_data", [])
            if parsed_data:
                # Remove the old metadata relationships
                delete_meta_cypher = """
                MATCH (n:DataResource)-[r:contain]->()
                WHERE id(n) = $resource_id
                DELETE r
                """
                session.run(delete_meta_cypher, resource_id=int(resource_id))

                # Remove the old clean_resource relationships
                delete_clean_cypher = """
                MATCH (n:DataResource)-[r:clean_resource]->()
                WHERE id(n) = $resource_id
                DELETE r
                """
                session.run(delete_clean_cypher, resource_id=int(resource_id))

                # Create the new metadata relationships and related links
                for meta in parsed_data:
                    meta_id = meta.get("id")
                    if meta_id:
                        # Create the contain relationship
                        create_meta_cypher = """
                        MATCH (n:DataResource), (m:DataMeta)
                        WHERE id(n) = $resource_id AND id(m) = $meta_id
                        CREATE (n)-[r:contain]->(m)
                        RETURN r
                        """
                        session.run(create_meta_cypher, resource_id=int(resource_id), meta_id=int(meta_id))

                        # Handle the master data relationship
                        master_data = meta.get("master_data")
                        if master_data:
                            # Create the master relationship
                            create_master_cypher = """
                            MATCH (master), (meta:DataMeta)
                            WHERE id(master) = $master_id AND id(meta) = $meta_id
                            MERGE (master)-[r:master]->(meta)
                            RETURN r
                            """
                            session.run(create_master_cypher, master_id=int(master_data), meta_id=int(meta_id))

                        # Handle the data standard relationship
                        data_standard = meta.get("data_standard")
                        if data_standard and isinstance(data_standard, dict) and data_standard.get("id"):
                            standard_id = data_standard.get("id")

                            # Link the data standard to the metadata
                            create_standard_meta_cypher = """
                            MATCH (standard), (meta:DataMeta)
                            WHERE id(standard) = $standard_id AND id(meta) = $meta_id
                            MERGE (standard)-[r:clean_resource]->(meta)
                            RETURN r
                            """
                            session.run(create_standard_meta_cypher, standard_id=int(standard_id), meta_id=int(meta_id))

                            # Link the data resource to the data standard
                            create_resource_standard_cypher = """
                            MATCH (resource:DataResource), (standard)
                            WHERE id(resource) = $resource_id AND id(standard) = $standard_id
                            MERGE (resource)-[r:clean_resource]->(standard)
                            RETURN r
                            """
                            session.run(create_resource_standard_cypher, resource_id=int(resource_id), standard_id=int(standard_id))

            # Return the updated node
            node_data = serialize_node_properties(updated_node["n"])
            node_data["id"] = updated_node["n"].id

            # Log the describe field that will be returned
            logger.info(f"data_resource_edit返回数据,describe字段: {node_data.get('describe')}")
            return node_data
    except Exception as e:
        logger.error(f"编辑数据资源失败: {str(e)}")
        raise

def handle_data_source(data_source):
    """Handle data source info: create or fetch the data source node."""
    try:
        with neo4j_driver.get_session() as session:
            # The English name is the unique identifier
            ds_en_name = data_source.get("en_name")
            if not ds_en_name:
                logger.error("数据源缺少必要的en_name属性")
                return None

            # Fall back to en_name when no name is provided
            if "name" not in data_source or not data_source["name"]:
                data_source["name"] = ds_en_name

            # Check the required fields
            required_fields = ["type", "host", "port", "database", "username"]
            has_required_fields = all(data_source.get(field) for field in required_fields)

            # Look for an existing data source with the same en_name
            existing_cypher = """
            MATCH (ds:DataSource {en_name: $en_name})
            RETURN ds
            """
            existing_result = session.run(existing_cypher, en_name=ds_en_name)
            existing_record = existing_result.single()

            if existing_record:
                existing_data_source = serialize_node_properties(existing_record["ds"])
                logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
                return existing_data_source.get("en_name")
            else:
                # The data source does not exist: raise so the caller can handle it
                raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
    except Exception as e:
        logger.error(f"处理数据源失败: {str(e)}")
        raise RuntimeError(f"处理数据源失败: {str(e)}")