resource.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233
  1. import json
  2. import re
  3. import logging
  4. from py2neo import Relationship
  5. import pandas as pd
  6. from app.services.neo4j_driver import neo4j_driver
  7. from app.services.package_function import create_or_get_node, relationship_exists, get_node
  8. import time
# Module logger. It uses the shared "app" logger name rather than __name__,
# presumably so records go through the application's central logging
# configuration -- confirm where its handlers are set up.
logger = logging.getLogger("app")
  10. def get_formatted_time():
  11. """获取格式化的当前时间"""
  12. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  13. def get_node_by_id(label, id):
  14. """根据ID获取指定标签的节点"""
  15. try:
  16. with neo4j_driver.get_session() as session:
  17. # 确保id为整数
  18. try:
  19. id_int = int(id)
  20. except (ValueError, TypeError):
  21. logger.error(f"节点ID不是有效的整数: {id}")
  22. return None
  23. cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
  24. result = session.run(cypher, id=id_int)
  25. record = result.single()
  26. return record["n"] if record else None
  27. except Exception as e:
  28. logger.error(f"根据ID获取节点失败: {str(e)}")
  29. return None
  30. def get_node_by_id_no_label(id):
  31. """根据ID获取节点,不限制标签"""
  32. try:
  33. with neo4j_driver.get_session() as session:
  34. # 确保id为整数
  35. try:
  36. id_int = int(id)
  37. except (ValueError, TypeError):
  38. logger.error(f"节点ID不是有效的整数: {id}")
  39. return None
  40. cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
  41. result = session.run(cypher, id=id_int)
  42. record = result.single()
  43. return record["n"] if record else None
  44. except Exception as e:
  45. logger.error(f"根据ID获取节点失败: {str(e)}")
  46. return None
  47. def delete_relationships(start_node, rel_type=None, end_node=None):
  48. """删除关系"""
  49. try:
  50. with neo4j_driver.get_session() as session:
  51. if rel_type and end_node:
  52. cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
  53. cypher = cypher.replace("{rel_type}", rel_type)
  54. session.run(cypher, start_id=start_node.id, end_id=end_node.id)
  55. elif rel_type:
  56. cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
  57. cypher = cypher.replace("{rel_type}", rel_type)
  58. session.run(cypher, start_id=start_node.id)
  59. else:
  60. cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
  61. session.run(cypher, start_id=start_node.id)
  62. return True
  63. except Exception as e:
  64. logger.error(f"删除关系失败: {str(e)}")
  65. return False
  66. def update_or_create_node(label, **properties):
  67. """更新或创建节点"""
  68. try:
  69. with neo4j_driver.get_session() as session:
  70. node_id = properties.pop('id', None)
  71. if node_id:
  72. # 更新现有节点
  73. set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
  74. cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
  75. result = session.run(cypher, id=int(node_id), **properties)
  76. else:
  77. # 创建新节点
  78. props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
  79. cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
  80. result = session.run(cypher, **properties)
  81. record = result.single()
  82. return record["n"] if record else None
  83. except Exception as e:
  84. logger.error(f"更新或创建节点失败: {str(e)}")
  85. return None
  86. # 数据资源-元数据 关系节点创建、查询
  87. def handle_node(receiver, head_data, data_source=None, resource_type=None):
  88. """处理数据资源节点创建和关系建立"""
  89. try:
  90. # 根据resource_type设置type属性的值
  91. if resource_type == 'ddl':
  92. type_value = 'ddl'
  93. else:
  94. type_value = 'structure'
  95. # 更新属性
  96. update_attributes = {
  97. 'en_name': receiver.get('en_name', receiver.get('name', '')),
  98. 'time': get_formatted_time(),
  99. 'type': type_value # 根据资源类型设置不同的type值
  100. }
  101. if 'additional_info' in receiver:
  102. del receiver['additional_info']
  103. # 从receiver中移除data_source属性,避免将复杂对象作为节点属性
  104. if 'data_source' in receiver:
  105. del receiver['data_source']
  106. tag_list = receiver.get('tag')
  107. receiver.update(update_attributes)
  108. # 创建或获取 data_resource 节点
  109. with neo4j_driver.get_session() as session:
  110. props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
  111. cypher = f"""
  112. MERGE (n:data_resource {{name: $name}})
  113. ON CREATE SET n = {{{props_str}}}
  114. ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
  115. RETURN n
  116. """
  117. result = session.run(cypher, **receiver)
  118. data_resource_node = result.single()["n"]
  119. resource_id = data_resource_node.id # 使用id属性获取数值ID
  120. # 处理标签关系
  121. if tag_list:
  122. tag_node = get_node_by_id('data_label', tag_list)
  123. if tag_node:
  124. # 检查关系是否存在
  125. rel_check = """
  126. MATCH (a:data_resource)-[r:label]->(b:data_label)
  127. WHERE id(a) = $resource_id AND id(b) = $tag_id
  128. RETURN r
  129. """
  130. rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id) # 使用数值id
  131. # 如果关系不存在则创建
  132. if not rel_result.single():
  133. rel_create = """
  134. MATCH (a:data_resource), (b:data_label)
  135. WHERE id(a) = $resource_id AND id(b) = $tag_id
  136. CREATE (a)-[r:label]->(b)
  137. RETURN r
  138. """
  139. session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
  140. # 处理头部数据(元数据,字段)
  141. if head_data:
  142. for item in head_data:
  143. # 创建元数据节点
  144. meta_cypher = """
  145. MERGE (m:meta_data {name: $name})
  146. ON CREATE SET m.en_name = $en_name,
  147. m.create_time = $create_time,
  148. m.data_type = $type,
  149. m.status = 'true'
  150. ON MATCH SET m.data_type = $type,
  151. m.status = 'true'
  152. RETURN m
  153. """
  154. create_time = get_formatted_time()
  155. meta_result = session.run(
  156. meta_cypher,
  157. name=item['name'],
  158. en_name=item['en_name'],
  159. create_time=create_time,
  160. type=item['data_type'] # 使用data_type作为data_type属性
  161. )
  162. meta_record = meta_result.single()
  163. if meta_record and meta_record["m"]:
  164. meta_node = meta_record["m"]
  165. meta_id = meta_node.id # 使用数值ID
  166. # 打印日志确认节点创建成功和ID
  167. logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")
  168. # 确认数据资源节点是否可以正确查询到
  169. check_resource_cypher = """
  170. MATCH (n:data_resource)
  171. WHERE id(n) = $resource_id
  172. RETURN n
  173. """
  174. check_resource = session.run(check_resource_cypher, resource_id=resource_id)
  175. if check_resource.single():
  176. logger.info(f"找到数据资源节点: ID={resource_id}")
  177. else:
  178. logger.error(f"无法找到数据资源节点: ID={resource_id}")
  179. continue
  180. # 创建关系
  181. rel_cypher = """
  182. MATCH (a:data_resource), (m:meta_data)
  183. WHERE id(a) = $resource_id AND id(m) = $meta_id
  184. MERGE (a)-[r:contain]->(m)
  185. RETURN r
  186. """
  187. rel_result = session.run(
  188. rel_cypher,
  189. resource_id=resource_id,
  190. meta_id=meta_id
  191. )
  192. rel_record = rel_result.single()
  193. if rel_record:
  194. logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
  195. else:
  196. logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
  197. else:
  198. logger.error(f"未能创建或获取元数据节点: {item['name']}")
  199. # 处理数据源关系
  200. if data_source and resource_type == 'ddl':
  201. try:
  202. # 创建或获取数据源节点
  203. # data_source_en_name = handle_data_source(data_source)
  204. data_source_en_name = data_source['en_name']
  205. # 创建数据资源与数据源的关系
  206. if data_source_en_name:
  207. # 创建 originates_from 关系
  208. rel_data_source_cypher = """
  209. MATCH (a:data_resource), (b:DataSource)
  210. WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
  211. MERGE (a)-[r:originates_from]->(b)
  212. RETURN r
  213. """
  214. rel_result = session.run(
  215. rel_data_source_cypher,
  216. resource_id=resource_id,
  217. ds_en_name=data_source_en_name
  218. )
  219. rel_record = rel_result.single()
  220. if rel_record:
  221. logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
  222. else:
  223. # 添加严重错误日志
  224. error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {data_source_en_name}"
  225. logger.error(error_msg)
  226. # 检查数据源节点是否存在
  227. check_ds_cypher = "MATCH (b:DataSource) WHERE b.en_name = $ds_en_name RETURN b"
  228. check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
  229. if not check_ds_result.single():
  230. logger.error(f"数据源节点不存在: en_name={data_source_en_name}")
  231. # 检查数据资源节点是否存在
  232. check_res_cypher = "MATCH (a:data_resource) WHERE id(a) = $resource_id RETURN a"
  233. check_res_result = session.run(check_res_cypher, resource_id=resource_id)
  234. if not check_res_result.single():
  235. logger.error(f"数据资源节点不存在: id={resource_id}")
  236. # 严重错误应该抛出异常
  237. raise RuntimeError(error_msg)
  238. except Exception as e:
  239. logger.error(f"处理数据源关系失败: {str(e)}")
  240. raise RuntimeError(f"处理数据源关系失败: {str(e)}")
  241. return resource_id
  242. except Exception as e:
  243. logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
  244. raise
  245. def handle_id_resource(resource_id):
  246. """处理单个数据资源查询"""
  247. try:
  248. with neo4j_driver.get_session() as session:
  249. # 确保resource_id为整数
  250. try:
  251. resource_id_int = int(resource_id)
  252. except (ValueError, TypeError):
  253. logger.error(f"资源ID不是有效的整数: {resource_id}")
  254. return None
  255. # 使用数值ID查询
  256. cypher = """
  257. MATCH (n:data_resource)
  258. WHERE id(n) = $resource_id
  259. RETURN n
  260. """
  261. result = session.run(cypher, resource_id=resource_id_int)
  262. record = result.single()
  263. if not record:
  264. logger.error(f"未找到资源,ID: {resource_id_int}")
  265. return None
  266. # 构建返回数据
  267. data_resource = dict(record["n"])
  268. data_resource["id"] = record["n"].id
  269. # 查询关联的标签
  270. tag_cypher = """
  271. MATCH (n:data_resource)-[r:label]->(t:data_label)
  272. WHERE id(n) = $resource_id
  273. RETURN t
  274. """
  275. tag_result = session.run(tag_cypher, resource_id=resource_id_int)
  276. tag_record = tag_result.single()
  277. if tag_record:
  278. tag = dict(tag_record["t"])
  279. tag["id"] = tag_record["t"].id
  280. data_resource["tag_info"] = tag
  281. # 查询关联的元数据 - 支持meta_data和Metadata两种标签
  282. meta_cypher = """
  283. MATCH (n:data_resource)-[:contain]->(m)
  284. WHERE id(n) = $resource_id
  285. AND (m:meta_data OR m:Metadata)
  286. RETURN m
  287. """
  288. meta_result = session.run(meta_cypher, resource_id=resource_id_int)
  289. meta_list = []
  290. for meta_record in meta_result:
  291. meta = dict(meta_record["m"])
  292. meta["id"] = meta_record["m"].id
  293. meta_list.append(meta)
  294. data_resource["meta_list"] = meta_list
  295. logger.info(f"成功获取资源详情,ID: {resource_id_int}")
  296. return data_resource
  297. except Exception as e:
  298. logger.error(f"处理单个数据资源查询失败: {str(e)}")
  299. return None
  300. def id_resource_graph(resource_id):
  301. """获取数据资源图谱"""
  302. try:
  303. with neo4j_driver.get_session() as session:
  304. # 查询数据资源节点及其关系
  305. cypher = """
  306. MATCH (n:data_resource)-[r]-(m)
  307. WHERE id(n) = $resource_id
  308. RETURN n, r, m
  309. """
  310. result = session.run(cypher, resource_id=int(resource_id))
  311. # 收集节点和关系
  312. nodes = {}
  313. relationships = []
  314. for record in result:
  315. # 处理源节点
  316. source_node = dict(record["n"])
  317. source_node["id"] = record["n"].id
  318. nodes[source_node["id"]] = source_node
  319. # 处理目标节点
  320. target_node = dict(record["m"])
  321. target_node["id"] = record["m"].id
  322. nodes[target_node["id"]] = target_node
  323. # 处理关系
  324. rel = record["r"]
  325. relationship = {
  326. "id": rel.id,
  327. "source": record["n"].id,
  328. "target": record["m"].id,
  329. "type": rel.type
  330. }
  331. relationships.append(relationship)
  332. return {
  333. "nodes": list(nodes.values()),
  334. "relationships": relationships
  335. }
  336. except Exception as e:
  337. logger.error(f"获取数据资源图谱失败: {str(e)}")
  338. return {"nodes": [], "relationships": []}
  339. def resource_list(page, page_size, en_name_filter=None, name_filter=None,
  340. type_filter='all', category_filter=None, tag_filter=None):
  341. """获取数据资源列表"""
  342. try:
  343. with neo4j_driver.get_session() as session:
  344. # 构建查询条件
  345. match_clause = "MATCH (n:data_resource)"
  346. where_conditions = []
  347. if en_name_filter:
  348. where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
  349. if name_filter:
  350. where_conditions.append(f"n.name CONTAINS '{name_filter}'")
  351. if type_filter and type_filter != 'all':
  352. where_conditions.append(f"n.type = '{type_filter}'")
  353. if category_filter:
  354. where_conditions.append(f"n.category = '{category_filter}'")
  355. # 标签过滤需要额外的匹配
  356. if tag_filter:
  357. match_clause += "-[:label]->(t:data_label)"
  358. where_conditions.append(f"t.name = '{tag_filter}'")
  359. where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
  360. # 计算总数
  361. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  362. count_result = session.run(count_cypher)
  363. total_count = count_result.single()["count"]
  364. # 分页查询
  365. skip = (page - 1) * page_size
  366. cypher = f"""
  367. {match_clause}{where_clause}
  368. RETURN n
  369. ORDER BY n.time DESC
  370. SKIP {skip} LIMIT {page_size}
  371. """
  372. result = session.run(cypher)
  373. # 格式化结果
  374. resources = []
  375. for record in result:
  376. node = dict(record["n"])
  377. node["id"] = record["n"].id
  378. # 查询关联的标签
  379. tag_cypher = """
  380. MATCH (n:data_resource)-[r:label]->(t:data_label)
  381. WHERE id(n) = $resource_id
  382. RETURN t
  383. """
  384. tag_result = session.run(tag_cypher, resource_id=node["id"])
  385. tag_record = tag_result.single()
  386. if tag_record:
  387. tag = dict(tag_record["t"])
  388. tag["id"] = tag_record["t"].id
  389. node["tag_info"] = tag
  390. resources.append(node)
  391. return resources, total_count
  392. except Exception as e:
  393. logger.error(f"获取数据资源列表失败: {str(e)}")
  394. return [], 0
  395. def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
  396. name_filter=None, category_filter=None, tag_filter=None):
  397. """获取特定数据资源关联的元数据列表"""
  398. try:
  399. with neo4j_driver.get_session() as session:
  400. # 确保resource_id为整数
  401. try:
  402. resource_id_int = int(resource_id)
  403. except (ValueError, TypeError):
  404. logger.error(f"资源ID不是有效的整数: {resource_id}")
  405. return [], 0
  406. # 基本匹配语句 - 支持meta_data和Metadata标签
  407. match_clause = """
  408. MATCH (n:data_resource)-[:contain]->(m)
  409. WHERE id(n) = $resource_id
  410. AND (m:meta_data OR m:Metadata)
  411. """
  412. where_conditions = []
  413. if en_name_filter:
  414. where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
  415. if name_filter:
  416. where_conditions.append(f"m.name CONTAINS '{name_filter}'")
  417. if category_filter:
  418. where_conditions.append(f"m.category = '{category_filter}'")
  419. # 标签过滤需要额外的匹配
  420. tag_match = ""
  421. if tag_filter:
  422. tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
  423. where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
  424. # 计算总数
  425. count_cypher = f"""
  426. {match_clause}{where_clause}
  427. {tag_match}
  428. RETURN count(m) as count
  429. """
  430. count_params = {"resource_id": resource_id_int}
  431. if tag_filter:
  432. count_params["tag_filter"] = tag_filter
  433. count_result = session.run(count_cypher, **count_params)
  434. total_count = count_result.single()["count"]
  435. # 分页查询
  436. skip = (page - 1) * page_size
  437. cypher = f"""
  438. {match_clause}{where_clause}
  439. {tag_match}
  440. RETURN m
  441. ORDER BY m.name
  442. SKIP {skip} LIMIT {page_size}
  443. """
  444. result = session.run(cypher, **count_params)
  445. # 格式化结果
  446. metadata_list = []
  447. for record in result:
  448. meta = dict(record["m"])
  449. meta["id"] = record["m"].id
  450. metadata_list.append(meta)
  451. logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
  452. return metadata_list, total_count
  453. except Exception as e:
  454. logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
  455. return [], 0
  456. def resource_kinship_graph(resource_id, include_meta=True):
  457. """获取数据资源亲缘关系图谱"""
  458. try:
  459. with neo4j_driver.get_session() as session:
  460. # 确保resource_id为整数
  461. try:
  462. resource_id_int = int(resource_id)
  463. except (ValueError, TypeError):
  464. logger.error(f"资源ID不是有效的整数: {resource_id}")
  465. return {"nodes": [], "relationships": []}
  466. # 基本查询
  467. cypher_parts = [
  468. f"MATCH (n:data_resource) WHERE id(n) = $resource_id",
  469. "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
  470. ]
  471. # 是否包含元数据 - 支持meta_data和Metadata两种标签
  472. if include_meta:
  473. cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m) WHERE (m:meta_data OR m:Metadata)")
  474. cypher_parts.append("RETURN n, l, collect(m) as metadata")
  475. cypher = "\n".join(cypher_parts)
  476. result = session.run(cypher, resource_id=resource_id_int)
  477. record = result.single()
  478. if not record:
  479. logger.error(f"未找到资源图谱数据,ID: {resource_id_int}")
  480. return {"nodes": [], "relationships": []}
  481. # 收集节点和关系
  482. nodes = {}
  483. relationships = []
  484. # 处理数据资源节点
  485. resource_node = dict(record["n"])
  486. resource_node["id"] = record["n"].id
  487. resource_node["labels"] = list(record["n"].labels)
  488. nodes[resource_node["id"]] = resource_node
  489. # 处理标签节点
  490. if record["l"]:
  491. label_node = dict(record["l"])
  492. label_node["id"] = record["l"].id
  493. label_node["labels"] = list(record["l"].labels)
  494. nodes[label_node["id"]] = label_node
  495. # 添加资源-标签关系
  496. relationships.append({
  497. "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
  498. "source": resource_node["id"],
  499. "target": label_node["id"],
  500. "type": "label"
  501. })
  502. # 处理元数据节点
  503. if include_meta and record["metadata"]:
  504. for meta in record["metadata"]:
  505. if meta: # 检查元数据节点是否存在
  506. meta_node = dict(meta)
  507. meta_node["id"] = meta.id
  508. meta_node["labels"] = list(meta.labels)
  509. nodes[meta_node["id"]] = meta_node
  510. # 添加资源-元数据关系
  511. relationships.append({
  512. "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
  513. "source": resource_node["id"],
  514. "target": meta_node["id"],
  515. "type": "contain"
  516. })
  517. logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  518. return {
  519. "nodes": list(nodes.values()),
  520. "relationships": relationships
  521. }
  522. except Exception as e:
  523. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  524. return {"nodes": [], "relationships": []}
  525. def resource_impact_all_graph(resource_id, include_meta=True):
  526. """获取数据资源影响关系图谱"""
  527. try:
  528. with neo4j_driver.get_session() as session:
  529. # 确保resource_id为整数
  530. try:
  531. resource_id_int = int(resource_id)
  532. except (ValueError, TypeError):
  533. logger.error(f"资源ID不是有效的整数: {resource_id}")
  534. return {"nodes": [], "relationships": []}
  535. # 根据meta参数决定查询深度
  536. if include_meta:
  537. cypher = """
  538. MATCH path = (n:data_resource)-[*1..3]-(m)
  539. WHERE id(n) = $resource_id
  540. RETURN path
  541. """
  542. else:
  543. cypher = """
  544. MATCH path = (n:data_resource)-[*1..2]-(m)
  545. WHERE id(n) = $resource_id
  546. AND NOT (m:meta_data) AND NOT (m:Metadata)
  547. RETURN path
  548. """
  549. result = session.run(cypher, resource_id=resource_id_int)
  550. # 收集节点和关系
  551. nodes = {}
  552. relationships = {}
  553. for record in result:
  554. path = record["path"]
  555. # 处理路径中的所有节点
  556. for node in path.nodes:
  557. if node.id not in nodes:
  558. node_dict = dict(node)
  559. node_dict["id"] = node.id
  560. node_dict["labels"] = list(node.labels)
  561. nodes[node.id] = node_dict
  562. # 处理路径中的所有关系
  563. for rel in path.relationships:
  564. if rel.id not in relationships:
  565. rel_dict = {
  566. "id": rel.id,
  567. "source": rel.start_node.id,
  568. "target": rel.end_node.id,
  569. "type": rel.type
  570. }
  571. relationships[rel.id] = rel_dict
  572. logger.info(f"成功获取完整图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  573. return {
  574. "nodes": list(nodes.values()),
  575. "relationships": list(relationships.values())
  576. }
  577. except Exception as e:
  578. logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
  579. return {"nodes": [], "relationships": []}
  580. def clean_type(type_str):
  581. """清洗SQL类型字符串"""
  582. # 提取基本类型,不包括长度或精度信息
  583. basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()
  584. # 移除 VARYING 这样的后缀
  585. basic_type = re.sub(r'\s+VARYING$', '', basic_type)
  586. # 标准化常见类型
  587. type_mapping = {
  588. 'INT': 'INTEGER',
  589. 'INT4': 'INTEGER',
  590. 'INT8': 'BIGINT',
  591. 'SMALLINT': 'SMALLINT',
  592. 'BIGINT': 'BIGINT',
  593. 'FLOAT4': 'FLOAT',
  594. 'FLOAT8': 'DOUBLE',
  595. 'REAL': 'FLOAT',
  596. 'DOUBLE PRECISION': 'DOUBLE',
  597. 'NUMERIC': 'DECIMAL',
  598. 'BOOL': 'BOOLEAN',
  599. 'CHARACTER': 'CHAR',
  600. 'CHAR VARYING': 'VARCHAR',
  601. 'CHARACTER VARYING': 'VARCHAR',
  602. 'TEXT': 'TEXT',
  603. 'DATE': 'DATE',
  604. 'TIME': 'TIME',
  605. 'TIMESTAMP': 'TIMESTAMP',
  606. 'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
  607. 'BYTEA': 'BINARY',
  608. 'JSON': 'JSON',
  609. 'JSONB': 'JSONB',
  610. 'UUID': 'UUID',
  611. 'SERIAL': 'SERIAL',
  612. 'SERIAL4': 'SERIAL',
  613. 'SERIAL8': 'BIGSERIAL',
  614. 'BIGSERIAL': 'BIGSERIAL'
  615. }
  616. # 尝试从映射表中获取标准化的类型
  617. return type_mapping.get(basic_type, basic_type)
  618. def clean_field_name(field_name):
  619. """清洗字段名"""
  620. return field_name.strip('`').strip('"').strip("'")
  621. def select_create_ddl(sql_content):
  622. """从SQL内容中提取创建表的DDL语句"""
  623. try:
  624. # 解析复杂的SQL文件,识别所有的CREATE TABLE语句及其关联的注释
  625. # 找到所有以CREATE TABLE开头的语句块,每个语句块包含主语句和相关的注释
  626. # 首先,分割 SQL 内容按分号
  627. statements = []
  628. current_statement = ""
  629. in_string = False
  630. string_quote = None
  631. for char in sql_content:
  632. if char in ["'", '"']:
  633. if not in_string:
  634. in_string = True
  635. string_quote = char
  636. elif char == string_quote:
  637. in_string = False
  638. string_quote = None
  639. current_statement += char
  640. elif char == ';' and not in_string:
  641. current_statement += char
  642. if current_statement.strip():
  643. statements.append(current_statement.strip())
  644. current_statement = ""
  645. else:
  646. current_statement += char
  647. if current_statement.strip():
  648. statements.append(current_statement.strip())
  649. # 找出所有的CREATE TABLE语句和关联的注释
  650. create_table_statements = []
  651. create_index = -1
  652. in_table_block = False
  653. current_table = None
  654. current_block = ""
  655. for i, stmt in enumerate(statements):
  656. if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
  657. # 如果已经在处理表,先保存当前块
  658. if in_table_block and current_block:
  659. create_table_statements.append(current_block)
  660. # 开始新的表块
  661. in_table_block = True
  662. current_block = stmt
  663. # 提取表名
  664. table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
  665. if table_match:
  666. current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
  667. current_table = current_table.strip('"\'') if current_table else ""
  668. elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
  669. re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
  670. # 检查注释是否属于当前表
  671. if current_table:
  672. # 表注释处理
  673. if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
  674. table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
  675. if table_comment_match:
  676. comment_table = table_comment_match.group(1).strip('"\'')
  677. if comment_table == current_table:
  678. current_block += " " + stmt
  679. else:
  680. # 这是另一个表的注释,当前表的DDL到此结束
  681. create_table_statements.append(current_block)
  682. in_table_block = False
  683. current_block = ""
  684. current_table = None
  685. # 列注释处理
  686. elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
  687. column_comment_match = re.search(
  688. r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
  689. stmt,
  690. re.IGNORECASE
  691. )
  692. if column_comment_match:
  693. comment_table = column_comment_match.group(1)
  694. if comment_table == current_table:
  695. current_block += " " + stmt
  696. else:
  697. # 这是另一个表的注释,当前表的DDL到此结束
  698. create_table_statements.append(current_block)
  699. in_table_block = False
  700. current_block = ""
  701. current_table = None
  702. elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
  703. # 如果遇到新的CREATE语句(不是注释),保存当前块并结束
  704. create_table_statements.append(current_block)
  705. in_table_block = False
  706. current_block = ""
  707. current_table = None
  708. # 添加最后一个块
  709. if in_table_block and current_block:
  710. create_table_statements.append(current_block)
  711. # 日志记录
  712. logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
  713. for i, stmt in enumerate(create_table_statements):
  714. logger.debug(f"DDL语句 {i+1}: {stmt}")
  715. return create_table_statements
  716. except Exception as e:
  717. logger.error(f"提取DDL语句失败: {str(e)}")
  718. # logger.error(traceback.format_exc())
  719. return []
  720. def table_sql(sql):
  721. """解析表定义SQL,支持带schema和不带schema两种格式"""
  722. try:
  723. # 支持以下格式:
  724. # 1. CREATE TABLE tablename
  725. # 2. CREATE TABLE "tablename"
  726. # 3. CREATE TABLE 'tablename'
  727. # 4. CREATE TABLE schema.tablename
  728. # 5. CREATE TABLE "schema"."tablename"
  729. # 6. CREATE TABLE 'schema'.'tablename'
  730. # 匹配表名,支持带引号和不带引号的情况
  731. table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
  732. table_match = re.search(table_pattern, sql, re.IGNORECASE)
  733. if not table_match:
  734. logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
  735. return None
  736. # 获取表名
  737. schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
  738. table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
  739. if not table_name:
  740. logger.error("无法解析表名")
  741. return None
  742. logger.debug(f"解析到表名: {table_name}")
  743. # 提取CREATE TABLE语句的主体部分(括号内的内容)
  744. body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
  745. body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
  746. if not body_match:
  747. logger.error("无法提取表主体内容")
  748. return None
  749. body_text = body_match.group(1).strip()
  750. logger.debug(f"表定义主体部分: {body_text}")
  751. # 解析字段定义
  752. fields = []
  753. # 分割字段定义,处理括号嵌套和引号
  754. field_defs = []
  755. pos = 0
  756. in_parentheses = 0
  757. in_quotes = False
  758. quote_char = None
  759. for i, char in enumerate(body_text):
  760. if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
  761. in_quotes = not in_quotes
  762. if in_quotes:
  763. quote_char = char
  764. else:
  765. quote_char = None
  766. elif char == '(' and not in_quotes:
  767. in_parentheses += 1
  768. elif char == ')' and not in_quotes:
  769. in_parentheses -= 1
  770. elif char == ',' and in_parentheses == 0 and not in_quotes:
  771. field_defs.append(body_text[pos:i].strip())
  772. pos = i + 1
  773. # 添加最后一个字段定义
  774. if pos < len(body_text):
  775. field_defs.append(body_text[pos:].strip())
  776. logger.debug(f"解析出 {len(field_defs)} 个字段定义")
  777. # 处理每个字段定义
  778. for field_def in field_defs:
  779. # 跳过约束定义
  780. if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
  781. continue
  782. # 提取字段名和类型
  783. field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
  784. field_match = re.search(field_pattern, field_def, re.IGNORECASE)
  785. if field_match:
  786. # 提取字段名
  787. field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
  788. # 提取类型
  789. field_type = field_match.group(5).strip()
  790. # 处理类型中可能的括号
  791. type_base = re.split(r'\s+', field_type)[0]
  792. clean_type_value = clean_type(type_base)
  793. fields.append((field_name, clean_type_value))
  794. logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
  795. else:
  796. logger.warning(f"无法解析字段定义: {field_def}")
  797. # 提取表注释
  798. table_comment = ""
  799. table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
  800. table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
  801. if table_comment_match:
  802. comment_table = table_comment_match.group(1)
  803. if comment_table.strip("'\"") == table_name.strip("'\""):
  804. table_comment = table_comment_match.group(2)
  805. logger.debug(f"找到表注释: {table_comment}")
  806. # 提取列注释
  807. comments = {}
  808. column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
  809. for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
  810. comment_table = match.group(1)
  811. column_name = match.group(2)
  812. comment = match.group(3)
  813. # 检查表名是否匹配
  814. if comment_table.strip("'\"") == table_name.strip("'\""):
  815. comments[column_name] = comment
  816. logger.debug(f"找到列注释: {column_name} - {comment}")
  817. else:
  818. logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")
  819. # 检查字段和注释匹配情况
  820. logger.debug("========字段和注释匹配情况========")
  821. field_names = [f[0] for f in fields]
  822. logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
  823. logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")
  824. # 构建返回结果
  825. meta_list = []
  826. for field_name, field_type in fields:
  827. chinese_name = comments.get(field_name, "")
  828. meta_list.append({
  829. "en_name": field_name,
  830. "data_type": field_type,
  831. "name": chinese_name if chinese_name else field_name
  832. })
  833. # 检查表是否存在
  834. try:
  835. status = status_query([table_name])
  836. except Exception as e:
  837. logger.error(f"检查表存在状态失败: {str(e)}")
  838. status = [False]
  839. # 构建返回结果
  840. result = {
  841. table_name: {
  842. "exist": status[0] if status else False,
  843. "meta": meta_list
  844. }
  845. }
  846. logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
  847. return result
  848. except Exception as e:
  849. logger.error(f"解析表定义SQL失败: {str(e)}")
  850. logger.error(f"异常详情: {e}")
  851. import traceback
  852. logger.error(traceback.format_exc())
  853. return None
  854. # 判断英文表名是否在图谱中存在
  855. def status_query(key_list):
  856. query = """
  857. unwind $Key_list as name
  858. OPTIONAL MATCH (n:data_model {en_name: name})
  859. OPTIONAL MATCH (n:data_resource {en_name: name})
  860. OPTIONAL MATCH (n:data_metric {en_name: name})
  861. WITH name, CASE
  862. WHEN n IS NOT NULL THEN True
  863. ELSE False
  864. END AS exist
  865. return collect(exist)AS exist
  866. """
  867. with neo4j_driver.get_session() as session:
  868. result = session.run(query, Key_list=key_list)
  869. data = result.value() # 获取单个值
  870. return data
  871. def select_sql(sql_query):
  872. """解析SELECT查询语句"""
  873. try:
  874. # 提取SELECT子句
  875. select_pattern = r'SELECT\s+(.*?)\s+FROM'
  876. select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  877. if not select_match:
  878. return None
  879. select_clause = select_match.group(1)
  880. # 分割字段
  881. fields = []
  882. # 处理字段列表,避免在函数调用中的逗号导致错误分割
  883. in_parenthesis = 0
  884. current_field = ""
  885. for char in select_clause:
  886. if char == '(':
  887. in_parenthesis += 1
  888. current_field += char
  889. elif char == ')':
  890. in_parenthesis -= 1
  891. current_field += char
  892. elif char == ',' and in_parenthesis == 0:
  893. fields.append(current_field.strip())
  894. current_field = ""
  895. else:
  896. current_field += char
  897. if current_field.strip():
  898. fields.append(current_field.strip())
  899. # 解析每个字段
  900. parsed_fields = []
  901. for field in fields:
  902. # 检查是否有字段别名
  903. alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
  904. alias_match = re.search(alias_pattern, field)
  905. if alias_match:
  906. field_expr = alias_match.group(1).strip()
  907. field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
  908. parsed_fields.append({
  909. "expression": field_expr,
  910. "alias": field_alias
  911. })
  912. else:
  913. # 没有别名的情况
  914. parsed_fields.append({
  915. "expression": field.strip(),
  916. "alias": None
  917. })
  918. # 提取FROM子句和表名
  919. from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
  920. from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  921. tables = []
  922. if from_match:
  923. from_clause = from_match.group(1).strip()
  924. # 分析FROM子句中的表
  925. table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))?'
  926. for match in re.finditer(table_pattern, from_clause):
  927. table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
  928. if table_name:
  929. tables.append(table_name)
  930. return tables
  931. except Exception as e:
  932. logger.error(f"解析SELECT查询语句失败: {str(e)}")
  933. # logger.error(traceback.format_exc())
  934. return None
  935. def model_resource_list(page, page_size, name_filter=None):
  936. """获取模型资源列表"""
  937. try:
  938. with neo4j_driver.get_session() as session:
  939. # 构建查询条件
  940. match_clause = "MATCH (n:model_resource)"
  941. where_clause = ""
  942. if name_filter:
  943. where_clause = f" WHERE n.name CONTAINS '{name_filter}'"
  944. # 计算总数
  945. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  946. count_result = session.run(count_cypher)
  947. total_count = count_result.single()["count"]
  948. # 分页查询
  949. skip = (page - 1) * page_size
  950. cypher = f"""
  951. {match_clause}{where_clause}
  952. RETURN n
  953. ORDER BY n.createTime DESC
  954. SKIP {skip} LIMIT {page_size}
  955. """
  956. result = session.run(cypher)
  957. # 格式化结果
  958. resources = []
  959. for record in result:
  960. node = dict(record["n"])
  961. node["id"] = record["n"].id
  962. resources.append(node)
  963. return resources, total_count
  964. except Exception as e:
  965. logger.error(f"获取模型资源列表失败: {str(e)}")
  966. return [], 0
  967. def data_resource_edit(data):
  968. """编辑数据资源"""
  969. try:
  970. resource_id = data.get("id")
  971. if not resource_id:
  972. raise ValueError("缺少资源ID")
  973. with neo4j_driver.get_session() as session:
  974. # 更新节点属性
  975. update_fields = {}
  976. for key, value in data.items():
  977. if key != "id" and key != "tag":
  978. update_fields[key] = value
  979. # 添加更新时间
  980. update_fields["updateTime"] = get_formatted_time()
  981. # 构建更新语句
  982. set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
  983. cypher = f"""
  984. MATCH (n:data_resource)
  985. WHERE id(n) = $resource_id
  986. SET {set_clause}
  987. RETURN n
  988. """
  989. result = session.run(cypher, resource_id=int(resource_id), **update_fields)
  990. updated_node = result.single()
  991. if not updated_node:
  992. raise ValueError("资源不存在")
  993. # 处理标签关系
  994. tag_id = data.get("tag")
  995. if tag_id:
  996. # 删除旧的标签关系
  997. delete_rel_cypher = """
  998. MATCH (n:data_resource)-[r:label]->()
  999. WHERE id(n) = $resource_id
  1000. DELETE r
  1001. """
  1002. session.run(delete_rel_cypher, resource_id=int(resource_id))
  1003. # 创建新的标签关系
  1004. create_rel_cypher = """
  1005. MATCH (n:data_resource), (t:data_label)
  1006. WHERE id(n) = $resource_id AND id(t) = $tag_id
  1007. CREATE (n)-[r:label]->(t)
  1008. RETURN r
  1009. """
  1010. session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
  1011. # 返回更新后的节点
  1012. return dict(updated_node["n"])
  1013. except Exception as e:
  1014. logger.error(f"编辑数据资源失败: {str(e)}")
  1015. raise
  1016. def handle_data_source(data_source):
  1017. """处理数据源信息,创建或获取数据源节点"""
  1018. try:
  1019. with neo4j_driver.get_session() as session:
  1020. # 获取英文名称作为唯一标识
  1021. ds_en_name = data_source.get("en_name")
  1022. if not ds_en_name:
  1023. logger.error("数据源缺少必要的en_name属性")
  1024. return None
  1025. # 如果没有设置name,使用en_name作为name
  1026. if "name" not in data_source or not data_source["name"]:
  1027. data_source["name"] = ds_en_name
  1028. # 检查必填字段
  1029. required_fields = ["type", "host", "port", "database", "username"]
  1030. has_required_fields = all(data_source.get(field) for field in required_fields)
  1031. # 查询是否已存在相同en_name的数据源
  1032. existing_cypher = """
  1033. MATCH (ds:DataSource {en_name: $en_name})
  1034. RETURN ds
  1035. """
  1036. existing_result = session.run(existing_cypher, en_name=ds_en_name)
  1037. existing_record = existing_result.single()
  1038. if existing_record:
  1039. existing_data_source = dict(existing_record["ds"])
  1040. logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
  1041. return existing_data_source.get("en_name")
  1042. else:
  1043. # 数据源不存在,抛出异常
  1044. raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
  1045. except Exception as e:
  1046. logger.error(f"处理数据源失败: {str(e)}")
  1047. raise RuntimeError(f"处理数据源失败: {str(e)}")