resource.py

import json
import re
import logging
from py2neo import Relationship
import pandas as pd
from app.services.neo4j_driver import neo4j_driver
from app.services.package_function import create_or_get_node, relationship_exists, get_node

logger = logging.getLogger("app")


def get_formatted_time():
    """Return the current time as a formatted string."""
    import time
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def get_node_by_id(label, id):
    """Get a node with the given label by ID."""
    try:
        with neo4j_driver.get_session() as session:
            cypher = f"MATCH (n:{label}) WHERE elementId(n) = $id RETURN n"
            result = session.run(cypher, id=int(id))
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"Failed to get node by ID: {str(e)}")
        return None


def get_node_by_id_no_label(id):
    """Get a node by ID without restricting the label."""
    try:
        with neo4j_driver.get_session() as session:
            cypher = "MATCH (n) WHERE elementId(n) = $id RETURN n"
            result = session.run(cypher, id=int(id))
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"Failed to get node by ID: {str(e)}")
        return None


def delete_relationships(start_node, rel_type=None, end_node=None):
    """Delete relationships starting from the given node."""
    try:
        with neo4j_driver.get_session() as session:
            if rel_type and end_node:
                cypher = f"MATCH (a)-[r:`{rel_type}`]->(b) WHERE elementId(a) = $start_id AND elementId(b) = $end_id DELETE r"
                session.run(cypher, start_id=start_node.id, end_id=end_node.id)
            elif rel_type:
                cypher = f"MATCH (a)-[r:`{rel_type}`]->() WHERE elementId(a) = $start_id DELETE r"
                session.run(cypher, start_id=start_node.id)
            else:
                cypher = "MATCH (a)-[r]->() WHERE elementId(a) = $start_id DELETE r"
                session.run(cypher, start_id=start_node.id)
            return True
    except Exception as e:
        logger.error(f"Failed to delete relationships: {str(e)}")
        return False


def update_or_create_node(label, **properties):
    """Update an existing node or create a new one."""
    try:
        with neo4j_driver.get_session() as session:
            node_id = properties.pop('id', None)
            if node_id:
                # Update the existing node
                set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
                cypher = f"MATCH (n:{label}) WHERE elementId(n) = $id SET {set_clause} RETURN n"
                result = session.run(cypher, id=int(node_id), **properties)
            else:
                # Create a new node
                props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
                cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
                result = session.run(cypher, **properties)
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"Failed to update or create node: {str(e)}")
        return None
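

# Illustrative sketch (not invoked anywhere in this module): how the two code
# paths of update_or_create_node() are selected. The label and property values
# below are made-up examples, not data used by the application.
def _demo_update_or_create_node():
    # Without an 'id' property a CREATE statement is issued.
    created = update_or_create_node("data_label", name="Demo label", en_name="demo_label")
    # With an 'id' property the node is looked up and updated via SET.
    if created is not None:
        update_or_create_node("data_label", id=created.id, en_name="demo_label_v2")
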

# Data resource <-> metadata: node creation and relationship queries
def handle_node(receiver, head_data, data_resource, data_source=None):
    """Create the data resource node and build its relationships."""
    try:
        # Attributes to update
        update_attributes = {
            'en_name': data_resource['en_name'],
            'time': get_formatted_time(),
            'type': 'structure'  # structured files carry no type of their own
        }
        if 'additional_info' in receiver:
            del receiver['additional_info']
        # Remove the data_source attribute from the receiver so that a complex
        # object is not stored as a node property
        if 'data_source' in receiver:
            del receiver['data_source']
        tag_list = receiver.get('tag')
        receiver.update(update_attributes)

        # Create or get the data_resource node
        with neo4j_driver.get_session() as session:
            props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
            cypher = f"""
            MERGE (n:data_resource {{name: $name}})
            ON CREATE SET n = {{{props_str}}}
            ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
            RETURN n
            """
            result = session.run(cypher, **receiver)
            data_resource_node = result.single()["n"]
            resource_id = data_resource_node.element_id  # use element_id to get the full ID

            # Handle the tag relationship
            if tag_list:
                tag_node = get_node_by_id('data_label', tag_list)
                if tag_node:
                    # Check whether the relationship already exists
                    rel_check = """
                    MATCH (a:data_resource)-[r:label]->(b:data_label)
                    WHERE elementId(a) = $resource_id AND elementId(b) = $tag_id
                    RETURN r
                    """
                    rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.element_id)  # use element_id
                    # Create the relationship if it does not exist
                    if not rel_result.single():
                        rel_create = """
                        MATCH (a:data_resource), (b:data_label)
                        WHERE elementId(a) = $resource_id AND elementId(b) = $tag_id
                        CREATE (a)-[r:label]->(b)
                        RETURN r
                        """
                        session.run(rel_create, resource_id=resource_id, tag_id=tag_node.element_id)

            # Handle the head data (metadata fields)
            if head_data:
                for item in head_data:
                    # Create the metadata node
                    meta_cypher = """
                    MERGE (m:Metadata {name: $name})
                    ON CREATE SET m.en_name = $en_name,
                                  m.createTime = $create_time,
                                  m.type = $type
                    ON MATCH SET m.type = $type
                    RETURN m
                    """
                    create_time = get_formatted_time()
                    meta_result = session.run(
                        meta_cypher,
                        name=item['name'],
                        en_name=item['en_name'],
                        create_time=create_time,
                        type=item['data_type']  # use data_type as the type property
                    )
                    meta_record = meta_result.single()

                    if meta_record and meta_record["m"]:
                        meta_node = meta_record["m"]
                        meta_id = meta_node.element_id
                        # Log that the node was created or fetched, and its ID
                        logger.info(f"Created or fetched metadata node: ID={meta_id}, name={item['name']}")

                        # Verify that the data resource node can still be found
                        check_resource_cypher = """
                        MATCH (n:data_resource)
                        WHERE elementId(n) = $resource_id
                        RETURN n
                        """
                        check_resource = session.run(check_resource_cypher, resource_id=resource_id)
                        if check_resource.single():
                            logger.info(f"Found data resource node: ID={resource_id}")
                        else:
                            logger.error(f"Data resource node not found: ID={resource_id}")
                            continue

                        # Create the relationship
                        rel_cypher = """
                        MATCH (a:data_resource), (m:Metadata)
                        WHERE elementId(a) = $resource_id AND elementId(m) = $meta_id
                        MERGE (a)-[r:contain]->(m)
                        RETURN r
                        """
                        rel_result = session.run(
                            rel_cypher,
                            resource_id=resource_id,
                            meta_id=meta_id
                        )
                        rel_record = rel_result.single()
                        if rel_record:
                            logger.info(f"Created data resource -> metadata relationship: {resource_id} -> {meta_id}")
                        else:
                            logger.warning(f"Failed to create data resource -> metadata relationship: {resource_id} -> {meta_id}")
                    else:
                        logger.error(f"Failed to create or fetch metadata node: {item['name']}")

            # Handle the data source relationship
            if data_source:
                try:
                    # Create or get the data source node
                    data_source_en_name = handle_data_source(data_source)
                    # Create the relationship between data resource and data source
                    if data_source_en_name:
                        # Create the isbelongto relationship
                        rel_data_source_cypher = """
                        MATCH (a:data_resource), (b:data_source)
                        WHERE elementId(a) = $resource_id AND b.en_name = $ds_en_name
                        MERGE (a)-[r:isbelongto]->(b)
                        RETURN r
                        """
                        session.run(
                            rel_data_source_cypher,
                            resource_id=resource_id,
                            ds_en_name=data_source_en_name
                        )
                        logger.info(f"Created data resource -> data source relationship: {resource_id} -> {data_source_en_name}")
                except Exception as e:
                    logger.error(f"Failed to handle the data source relationship: {str(e)}")
                    raise RuntimeError(f"Failed to handle the data source relationship: {str(e)}")

            return resource_id
    except Exception as e:
        logger.error(f"Failed to create data resource node and relationships: {str(e)}")
        raise
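

# Illustrative sketch (hypothetical payload, not taken from the application):
# the shape of the arguments handle_node() expects. 'receiver' carries the node
# properties, 'head_data' the parsed column metadata, and 'data_source' an
# optional dict accepted by handle_data_source(). The payload is only built
# here, not sent to the database.
def _demo_handle_node_payload():
    receiver = {
        "name": "Demo resource",
        "en_name": "demo_resource",
        "tag": "<data_label id>",  # hypothetical placeholder for a data_label node ID
        "category": "structure",
    }
    head_data = [
        {"name": "User ID", "en_name": "user_id", "data_type": "INTEGER"},
        {"name": "Created at", "en_name": "created_at", "data_type": "TIMESTAMP"},
    ]
    data_resource = {"en_name": "demo_resource"}
    # handle_node(receiver, head_data, data_resource) would MERGE the
    # data_resource node and attach its label/contain relationships.
    return receiver, head_data, data_resource
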

def handle_id_resource(resource_id):
    """Query a single data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Query the data resource node
            cypher = """
            MATCH (n:data_resource)
            WHERE elementId(n) = $resource_id
            RETURN n
            """
            result = session.run(cypher, resource_id=int(resource_id))
            record = result.single()
            if not record:
                return None
            data_resource = dict(record["n"])
            data_resource["id"] = record["n"].id

            # Query the associated tag
            tag_cypher = """
            MATCH (n:data_resource)-[:label]->(t:data_label)
            WHERE elementId(n) = $resource_id
            RETURN t
            """
            tag_result = session.run(tag_cypher, resource_id=int(resource_id))
            tag_record = tag_result.single()
            if tag_record:
                tag = dict(tag_record["t"])
                tag["id"] = tag_record["t"].id
                data_resource["tag_info"] = tag

            # Query the associated metadata
            meta_cypher = """
            MATCH (n:data_resource)-[:contain]->(m:Metadata)
            WHERE elementId(n) = $resource_id
            RETURN m
            """
            meta_result = session.run(meta_cypher, resource_id=int(resource_id))
            meta_list = []
            for meta_record in meta_result:
                meta = dict(meta_record["m"])
                meta["id"] = meta_record["m"].id
                meta_list.append(meta)
            data_resource["meta_list"] = meta_list

            return data_resource
    except Exception as e:
        logger.error(f"Failed to query the data resource: {str(e)}")
        return None


def id_resource_graph(resource_id):
    """Get the graph of a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Query the data resource node and its relationships
            cypher = """
            MATCH (n:data_resource)-[r]-(m)
            WHERE elementId(n) = $resource_id
            RETURN n, r, m
            """
            result = session.run(cypher, resource_id=int(resource_id))

            # Collect nodes and relationships
            nodes = {}
            relationships = []
            for record in result:
                # Source node
                source_node = dict(record["n"])
                source_node["id"] = record["n"].id
                nodes[source_node["id"]] = source_node
                # Target node
                target_node = dict(record["m"])
                target_node["id"] = record["m"].id
                nodes[target_node["id"]] = target_node
                # Relationship
                rel = record["r"]
                relationship = {
                    "id": rel.id,
                    "source": record["n"].id,
                    "target": record["m"].id,
                    "type": rel.type
                }
                relationships.append(relationship)

            return {
                "nodes": list(nodes.values()),
                "relationships": relationships
            }
    except Exception as e:
        logger.error(f"Failed to get the data resource graph: {str(e)}")
        return {"nodes": [], "relationships": []}


def resource_list(page, page_size, en_name_filter=None, name_filter=None,
                  type_filter='all', category_filter=None, tag_filter=None):
    """Get the paginated list of data resources."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the query conditions
            match_clause = "MATCH (n:data_resource)"
            where_conditions = []
            if en_name_filter:
                where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
            if name_filter:
                where_conditions.append(f"n.name CONTAINS '{name_filter}'")
            if type_filter and type_filter != 'all':
                where_conditions.append(f"n.type = '{type_filter}'")
            if category_filter:
                where_conditions.append(f"n.category = '{category_filter}'")
            # Tag filtering requires an extra match
            if tag_filter:
                match_clause += "-[:label]->(t:data_label)"
                where_conditions.append(f"t.name = '{tag_filter}'")
            where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""

            # Count the total
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            count_result = session.run(count_cypher)
            total_count = count_result.single()["count"]

            # Paginated query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.time DESC
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher)

            # Format the results
            resources = []
            for record in result:
                node = dict(record["n"])
                node["id"] = record["n"].id
                # Query the associated tag
                tag_cypher = """
                MATCH (n:data_resource)-[:label]->(t:data_label)
                WHERE elementId(n) = $resource_id
                RETURN t
                """
                tag_result = session.run(tag_cypher, resource_id=node["id"])
                tag_record = tag_result.single()
                if tag_record:
                    tag = dict(tag_record["t"])
                    tag["id"] = tag_record["t"].id
                    node["tag_info"] = tag
                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"Failed to get the data resource list: {str(e)}")
        return [], 0
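

# Illustrative sketch (hypothetical filter values): how a caller might page
# through data resources with resource_list(). Note that filter strings are
# interpolated into the Cypher text above, so callers are expected to pass
# trusted values.
def _demo_resource_list_paging(page_size=20):
    page = 1
    while True:
        resources, total = resource_list(page, page_size)
        if not resources:
            break
        yield from resources
        if page * page_size >= total:
            break
        page += 1
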

def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
                        name_filter=None, category_filter=None, tag_filter=None):
    """Get the metadata list associated with a specific data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Base match clause
            match_clause = """
            MATCH (n:data_resource)-[:contain]->(m:Metadata)
            WHERE elementId(n) = $resource_id
            """
            where_conditions = []
            if en_name_filter:
                where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
            if name_filter:
                where_conditions.append(f"m.name CONTAINS '{name_filter}'")
            if category_filter:
                where_conditions.append(f"m.category = '{category_filter}'")
            # Tag filtering requires an extra match
            tag_match = ""
            if tag_filter:
                tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
            where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""

            # Count the total
            count_cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN count(m) as count
            """
            count_params = {"resource_id": int(resource_id)}
            if tag_filter:
                count_params["tag_filter"] = tag_filter
            count_result = session.run(count_cypher, **count_params)
            total_count = count_result.single()["count"]

            # Paginated query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN m
            ORDER BY m.name
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher, **count_params)

            # Format the results
            metadata_list = []
            for record in result:
                node = dict(record["m"])
                node["id"] = record["m"].id
                metadata_list.append(node)

            return metadata_list, total_count
    except Exception as e:
        logger.error(f"Failed to get the metadata list for the data resource: {str(e)}")
        return [], 0


def resource_kinship_graph(resource_id, include_meta=True):
    """Get the kinship graph of a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Base query
            cypher_parts = [
                "MATCH (n:data_resource) WHERE elementId(n) = $resource_id",
                "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
            ]
            # Whether to include metadata
            if include_meta:
                cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m:Metadata)")
                cypher_parts.append("RETURN n, l, collect(m) as metadata")
            else:
                cypher_parts.append("RETURN n, l, [] as metadata")
            cypher = "\n".join(cypher_parts)

            result = session.run(cypher, resource_id=int(resource_id))
            record = result.single()
            if not record:
                return {"nodes": [], "relationships": []}

            # Collect nodes and relationships
            nodes = {}
            relationships = []

            # Data resource node
            resource_node = dict(record["n"])
            resource_node["id"] = record["n"].id
            resource_node["labels"] = list(record["n"].labels)
            nodes[resource_node["id"]] = resource_node

            # Tag node
            if record["l"]:
                label_node = dict(record["l"])
                label_node["id"] = record["l"].id
                label_node["labels"] = list(record["l"].labels)
                nodes[label_node["id"]] = label_node
                # Add the resource -> tag relationship
                relationships.append({
                    "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
                    "source": resource_node["id"],
                    "target": label_node["id"],
                    "type": "label"
                })

            # Metadata nodes
            if include_meta and record["metadata"]:
                for meta in record["metadata"]:
                    if meta:  # make sure the metadata node exists
                        meta_node = dict(meta)
                        meta_node["id"] = meta.id
                        meta_node["labels"] = list(meta.labels)
                        nodes[meta_node["id"]] = meta_node
                        # Add the resource -> metadata relationship
                        relationships.append({
                            "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
                            "source": resource_node["id"],
                            "target": meta_node["id"],
                            "type": "contain"
                        })

            return {
                "nodes": list(nodes.values()),
                "relationships": relationships
            }
    except Exception as e:
        logger.error(f"Failed to get the data resource kinship graph: {str(e)}")
        return {"nodes": [], "relationships": []}


def resource_impact_all_graph(resource_id, include_meta=True):
    """Get the impact graph of a data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # The meta flag decides the query depth
            if include_meta:
                cypher = """
                MATCH path = (n:data_resource)-[*1..3]-(m)
                WHERE elementId(n) = $resource_id
                RETURN path
                """
            else:
                cypher = """
                MATCH path = (n:data_resource)-[*1..2]-(m)
                WHERE elementId(n) = $resource_id
                AND NOT (m:Metadata)
                RETURN path
                """
            result = session.run(cypher, resource_id=int(resource_id))

            # Collect nodes and relationships
            nodes = {}
            relationships = {}
            for record in result:
                path = record["path"]
                # All nodes on the path
                for node in path.nodes:
                    if node.id not in nodes:
                        node_dict = dict(node)
                        node_dict["id"] = node.id
                        node_dict["labels"] = list(node.labels)
                        nodes[node.id] = node_dict
                # All relationships on the path
                for rel in path.relationships:
                    if rel.id not in relationships:
                        rel_dict = {
                            "id": rel.id,
                            "source": rel.start_node.id,
                            "target": rel.end_node.id,
                            "type": rel.type
                        }
                        relationships[rel.id] = rel_dict

            return {
                "nodes": list(nodes.values()),
                "relationships": list(relationships.values())
            }
    except Exception as e:
        logger.error(f"Failed to get the data resource impact graph: {str(e)}")
        return {"nodes": [], "relationships": []}


def clean_type(type_str):
    """Normalise an SQL type string."""
    # Extract the base type without length or precision information
    basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()
    # Drop suffixes such as VARYING
    basic_type = re.sub(r'\s+VARYING$', '', basic_type)
    # Standardise common types
    type_mapping = {
        'INT': 'INTEGER',
        'INT4': 'INTEGER',
        'INT8': 'BIGINT',
        'SMALLINT': 'SMALLINT',
        'BIGINT': 'BIGINT',
        'FLOAT4': 'FLOAT',
        'FLOAT8': 'DOUBLE',
        'REAL': 'FLOAT',
        'DOUBLE PRECISION': 'DOUBLE',
        'NUMERIC': 'DECIMAL',
        'BOOL': 'BOOLEAN',
        'CHARACTER': 'CHAR',
        'CHAR VARYING': 'VARCHAR',
        'CHARACTER VARYING': 'VARCHAR',
        'TEXT': 'TEXT',
        'DATE': 'DATE',
        'TIME': 'TIME',
        'TIMESTAMP': 'TIMESTAMP',
        'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
        'BYTEA': 'BINARY',
        'JSON': 'JSON',
        'JSONB': 'JSONB',
        'UUID': 'UUID',
        'SERIAL': 'SERIAL',
        'SERIAL4': 'SERIAL',
        'SERIAL8': 'BIGSERIAL',
        'BIGSERIAL': 'BIGSERIAL'
    }
    # Look up the standardised type in the mapping table
    return type_mapping.get(basic_type, basic_type)
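

# Illustrative sketch (sample inputs are hypothetical): the kind of
# normalisation clean_type() performs on raw column types found in DDL.
def _demo_clean_type():
    samples = ["varchar(255)", "int4", "NUMERIC(10,2)", "timestamptz", "double precision"]
    # Expected results: VARCHAR, INTEGER, DECIMAL, TIMESTAMP WITH TIME ZONE, DOUBLE
    return {s: clean_type(s) for s in samples}
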

def clean_field_name(field_name):
    """Strip quoting characters from a field name."""
    return field_name.strip('`').strip('"').strip("'")


def select_create_ddl(sql_content):
    """Extract the CREATE TABLE DDL statements from SQL content."""
    try:
        # Parse a complex SQL file and identify every CREATE TABLE statement
        # together with the comments that belong to it. Each statement block
        # contains the main statement and its related comments.

        # First, split the SQL content on semicolons
        statements = []
        current_statement = ""
        in_string = False
        string_quote = None

        for char in sql_content:
            if char in ["'", '"']:
                if not in_string:
                    in_string = True
                    string_quote = char
                elif char == string_quote:
                    in_string = False
                    string_quote = None
                current_statement += char
            elif char == ';' and not in_string:
                current_statement += char
                if current_statement.strip():
                    statements.append(current_statement.strip())
                current_statement = ""
            else:
                current_statement += char

        if current_statement.strip():
            statements.append(current_statement.strip())

        # Find all CREATE TABLE statements and their associated comments
        create_table_statements = []
        create_index = -1
        in_table_block = False
        current_table = None
        current_block = ""

        for i, stmt in enumerate(statements):
            if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
                # If a table is already being processed, save the current block first
                if in_table_block and current_block:
                    create_table_statements.append(current_block)
                # Start a new table block
                in_table_block = True
                current_block = stmt
                # Extract the table name
                table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
                if table_match:
                    current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
                    current_table = current_table.strip('"\'') if current_table else ""
            elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
                                     re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
                # Check whether the comment belongs to the current table
                if current_table:
                    # Table comments
                    if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
                        table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
                        if table_comment_match:
                            comment_table = table_comment_match.group(1).strip('"\'')
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # The comment belongs to another table, so the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
                    # Column comments
                    elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
                        column_comment_match = re.search(
                            r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
                            stmt,
                            re.IGNORECASE
                        )
                        if column_comment_match:
                            comment_table = column_comment_match.group(1)
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # The comment belongs to another table, so the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
            elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
                # A new CREATE statement (not a comment): save the current block and stop
                create_table_statements.append(current_block)
                in_table_block = False
                current_block = ""
                current_table = None

        # Append the last block
        if in_table_block and current_block:
            create_table_statements.append(current_block)

        # Logging
        logger.debug(f"Extracted {len(create_table_statements)} DDL statements")
        for i, stmt in enumerate(create_table_statements):
            logger.debug(f"DDL statement {i+1}: {stmt}")

        return create_table_statements
    except Exception as e:
        logger.error(f"Failed to extract DDL statements: {str(e)}")
        # logger.error(traceback.format_exc())
        return []
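

# Illustrative sketch: a hypothetical SQL script of the shape select_create_ddl()
# is meant to split. The table and comments below are examples only.
_DEMO_DDL_SCRIPT = """
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    amount NUMERIC(10,2)
);
COMMENT ON TABLE orders IS 'Order header table';
COMMENT ON COLUMN orders.amount IS 'Order amount';
"""


def _demo_select_create_ddl():
    # Expected: one block containing the CREATE TABLE statement plus both
    # COMMENT statements that reference the same table.
    return select_create_ddl(_DEMO_DDL_SCRIPT)
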

def table_sql(sql):
    """Parse a table definition SQL statement, with or without a schema prefix."""
    try:
        # Supported formats:
        # 1. CREATE TABLE tablename
        # 2. CREATE TABLE "tablename"
        # 3. CREATE TABLE 'tablename'
        # 4. CREATE TABLE schema.tablename
        # 5. CREATE TABLE "schema"."tablename"
        # 6. CREATE TABLE 'schema'.'tablename'

        # Match the table name, quoted or unquoted
        table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
        table_match = re.search(table_pattern, sql, re.IGNORECASE)
        if not table_match:
            logger.error(f"Could not match the CREATE TABLE statement: {sql[:100]}...")
            return None

        # Get the table name
        schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
        table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
        if not table_name:
            logger.error("Could not parse the table name")
            return None
        logger.debug(f"Parsed table name: {table_name}")

        # Extract the body of the CREATE TABLE statement (the content inside the parentheses)
        body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
        body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
        if not body_match:
            logger.error("Could not extract the table body")
            return None
        body_text = body_match.group(1).strip()
        logger.debug(f"Table definition body: {body_text}")

        # Parse the field definitions
        fields = []
        # Split the field definitions, handling nested parentheses and quotes
        field_defs = []
        pos = 0
        in_parentheses = 0
        in_quotes = False
        quote_char = None

        for i, char in enumerate(body_text):
            if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
                in_quotes = not in_quotes
                if in_quotes:
                    quote_char = char
                else:
                    quote_char = None
            elif char == '(' and not in_quotes:
                in_parentheses += 1
            elif char == ')' and not in_quotes:
                in_parentheses -= 1
            elif char == ',' and in_parentheses == 0 and not in_quotes:
                field_defs.append(body_text[pos:i].strip())
                pos = i + 1

        # Append the last field definition
        if pos < len(body_text):
            field_defs.append(body_text[pos:].strip())
        logger.debug(f"Parsed {len(field_defs)} field definitions")

        # Process each field definition
        for field_def in field_defs:
            # Skip constraint definitions
            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
                continue
            # Extract the field name and type
            field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
            field_match = re.search(field_pattern, field_def, re.IGNORECASE)
            if field_match:
                # Field name
                field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
                # Field type
                field_type = field_match.group(5).strip()
                # Strip any parenthesised part of the type
                type_base = re.split(r'\s+', field_type)[0]
                clean_type_value = clean_type(type_base)
                fields.append((field_name, clean_type_value))
                logger.debug(f"Parsed field: {field_name}, type: {clean_type_value}")
            else:
                logger.warning(f"Could not parse the field definition: {field_def}")

        # Extract the table comment
        table_comment = ""
        table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
        table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
        if table_comment_match:
            comment_table = table_comment_match.group(1)
            if comment_table.strip("'\"") == table_name.strip("'\""):
                table_comment = table_comment_match.group(2)
                logger.debug(f"Found table comment: {table_comment}")

        # Extract the column comments
        comments = {}
        column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
        for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
            comment_table = match.group(1)
            column_name = match.group(2)
            comment = match.group(3)
            # Check whether the table name matches
            if comment_table.strip("'\"") == table_name.strip("'\""):
                comments[column_name] = comment
                logger.debug(f"Found column comment: {column_name} - {comment}")
            else:
                logger.debug(f"Ignoring column comment, table name does not match: {comment_table} vs {table_name}")

        # Check how fields and comments line up
        logger.debug("======== field / comment matching ========")
        field_names = [f[0] for f in fields]
        logger.debug(f"Field list ({len(field_names)}): {field_names}")
        logger.debug(f"Commented fields ({len(comments)}): {list(comments.keys())}")

        # Build the metadata list
        meta_list = []
        for field_name, field_type in fields:
            chinese_name = comments.get(field_name, "")
            meta_list.append({
                "en_name": field_name,
                "data_type": field_type,
                "name": chinese_name if chinese_name else field_name
            })

        # Check whether the table already exists
        try:
            status = status_query([table_name])
        except Exception as e:
            logger.error(f"Failed to check whether the table exists: {str(e)}")
            status = [False]

        # Build the result
        result = {
            table_name: {
                "exist": status[0] if status else False,
                "meta": meta_list
            }
        }
        logger.debug(f"Parse result: {json.dumps(result, ensure_ascii=False)}")
        return result
    except Exception as e:
        logger.error(f"Failed to parse the table definition SQL: {str(e)}")
        logger.error(f"Exception details: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return None
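

# Illustrative sketch (hypothetical DDL): the structure table_sql() returns.
# With the input below the result is expected to look like
# {"orders": {"exist": <bool from status_query>, "meta": [
#     {"en_name": "order_id", "data_type": "INTEGER", "name": "Order ID"}, ...]}}.
def _demo_table_sql():
    ddl = (
        "CREATE TABLE orders (order_id INT PRIMARY KEY, amount NUMERIC(10,2)); "
        "COMMENT ON COLUMN orders.order_id IS 'Order ID';"
    )
    return table_sql(ddl)
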

# Check whether the given English table names already exist in the graph
def status_query(key_list):
    query = """
    UNWIND $Key_list AS name
    OPTIONAL MATCH (m1:data_model {en_name: name})
    OPTIONAL MATCH (m2:data_resource {en_name: name})
    OPTIONAL MATCH (m3:data_metric {en_name: name})
    WITH name, CASE
        WHEN m1 IS NOT NULL OR m2 IS NOT NULL OR m3 IS NOT NULL THEN True
        ELSE False
    END AS exist
    RETURN collect(exist) AS exist
    """
    with neo4j_driver.get_session() as session:
        result = session.run(query, Key_list=key_list)
        record = result.single()  # single record whose "exist" column is the list of booleans
        return record["exist"] if record else []


def select_sql(sql_query):
    """Parse a SELECT query."""
    try:
        # Extract the SELECT clause
        select_pattern = r'SELECT\s+(.*?)\s+FROM'
        select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
        if not select_match:
            return None
        select_clause = select_match.group(1)

        # Split the fields
        fields = []
        # Walk the field list so that commas inside function calls do not cause bad splits
        in_parenthesis = 0
        current_field = ""
        for char in select_clause:
            if char == '(':
                in_parenthesis += 1
                current_field += char
            elif char == ')':
                in_parenthesis -= 1
                current_field += char
            elif char == ',' and in_parenthesis == 0:
                fields.append(current_field.strip())
                current_field = ""
            else:
                current_field += char
        if current_field.strip():
            fields.append(current_field.strip())

        # Parse each field
        parsed_fields = []
        for field in fields:
            # Check for a field alias
            alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
            alias_match = re.search(alias_pattern, field)
            if alias_match:
                field_expr = alias_match.group(1).strip()
                field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
                parsed_fields.append({
                    "expression": field_expr,
                    "alias": field_alias
                })
            else:
                # No alias
                parsed_fields.append({
                    "expression": field.strip(),
                    "alias": None
                })

        # Extract the FROM clause and the table names
        from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
        from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
        tables = []
        if from_match:
            from_clause = from_match.group(1).strip()
            # Analyse the tables in the FROM clause
            table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+)))?'
            for match in re.finditer(table_pattern, from_clause):
                table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
                if table_name:
                    tables.append(table_name)
        return tables
    except Exception as e:
        logger.error(f"Failed to parse the SELECT query: {str(e)}")
        # logger.error(traceback.format_exc())
        return None
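

# Illustrative sketch (hypothetical query): select_sql() returns the table
# names referenced in the FROM clause, e.g. ["orders", "customers"] here.
def _demo_select_sql():
    query = "SELECT o.id, SUM(o.amount) AS total FROM orders o, customers c WHERE o.cust_id = c.id"
    return select_sql(query)
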

def model_resource_list(page, page_size, name_filter=None):
    """Get the list of model resources."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the query conditions
            match_clause = "MATCH (n:model_resource)"
            where_clause = ""
            if name_filter:
                where_clause = f" WHERE n.name CONTAINS '{name_filter}'"

            # Count the total
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            count_result = session.run(count_cypher)
            total_count = count_result.single()["count"]

            # Paginated query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.createTime DESC
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher)

            # Format the results
            resources = []
            for record in result:
                node = dict(record["n"])
                node["id"] = record["n"].id
                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"Failed to get the model resource list: {str(e)}")
        return [], 0


def data_resource_edit(data):
    """Edit a data resource."""
    try:
        resource_id = data.get("id")
        if not resource_id:
            raise ValueError("Missing resource ID")

        with neo4j_driver.get_session() as session:
            # Update the node properties
            update_fields = {}
            for key, value in data.items():
                if key != "id" and key != "tag":
                    update_fields[key] = value
            # Add the update time
            update_fields["updateTime"] = get_formatted_time()

            # Build the update statement
            set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
            cypher = f"""
            MATCH (n:data_resource)
            WHERE elementId(n) = $resource_id
            SET {set_clause}
            RETURN n
            """
            result = session.run(cypher, resource_id=int(resource_id), **update_fields)
            updated_node = result.single()
            if not updated_node:
                raise ValueError("Resource not found")

            # Handle the tag relationship
            tag_id = data.get("tag")
            if tag_id:
                # Delete the old tag relationship
                delete_rel_cypher = """
                MATCH (n:data_resource)-[r:label]->()
                WHERE elementId(n) = $resource_id
                DELETE r
                """
                session.run(delete_rel_cypher, resource_id=int(resource_id))

                # Create the new tag relationship
                create_rel_cypher = """
                MATCH (n:data_resource), (t:data_label)
                WHERE elementId(n) = $resource_id AND elementId(t) = $tag_id
                CREATE (n)-[r:label]->(t)
                RETURN r
                """
                session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))

            # Return the updated node
            return dict(updated_node["n"])
    except Exception as e:
        logger.error(f"Failed to edit the data resource: {str(e)}")
        raise
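

# Illustrative sketch (hypothetical payload): the dict shape data_resource_edit()
# expects. "id" selects the node, "tag" (optional) rebinds the label relationship,
# and every other key is written onto the node together with updateTime.
def _demo_data_resource_edit_payload():
    return {
        "id": 123,                  # identifier of the data_resource node, as compared by elementId() above
        "name": "Demo resource",
        "category": "structure",
        "tag": 456,                 # identifier of the data_label node (optional)
    }
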

def handle_data_source(data_source):
    """Check for an existing data source and create one if needed.

    Args:
        data_source: a dict describing the data source. Two shapes are supported:
            1. Name only, e.g.:
               {
                   "en_name": "10-52-31-104_5432_inventory"
               }
            2. Full data source information, e.g.:
               {
                   "en_name": "10-52-31-104_5432_inventory",
                   "name": "",
                   "type": "postgresql",
                   "host": "10.52.31.104",
                   "port": 5432,
                   "database": "inventory_management",
                   "username": "app_user",
                   "password": "pG$ecur3P@ss789"
               }

    Returns:
        The data source name on success; raises an exception on failure.
    """
    try:
        # Get the data source name
        ds_en_name = data_source.get("en_name")
        if not ds_en_name:
            raise ValueError("Incomplete data source information: the name is missing")

        # Check whether only the name was supplied
        only_en_name = len(data_source) == 1 and "en_name" in data_source

        with neo4j_driver.get_session() as session:
            if only_en_name:
                # Only the English name was given: check whether a data source with that name exists
                check_name_cypher = """
                MATCH (ds:data_source {en_name: $en_name})
                RETURN ds
                """
                check_result = session.run(check_name_cypher, en_name=ds_en_name)
                existing_record = check_result.single()

                if existing_record:
                    # The data source exists, return its name
                    existing_data_source = dict(existing_record["ds"])
                    logger.info(f"Found existing data source by name: {existing_data_source.get('en_name')}")
                    return existing_data_source.get("en_name")
                else:
                    # The data source does not exist
                    raise ValueError(f"No data source named {ds_en_name} was found")
            else:
                # Full data source information: extract the remaining attributes
                ds_type = data_source.get("type")
                ds_host = data_source.get("host")
                ds_port = data_source.get("port")
                ds_database = data_source.get("database")
                ds_username = data_source.get("username")

                # Make sure the required attributes are present
                if not all([ds_type, ds_host, ds_port, ds_database, ds_username]):
                    raise ValueError("Incomplete data source information: required attributes are missing")

                # Check whether an equivalent data source already exists
                # (same attributes apart from name and password)
                check_cypher = """
                MATCH (ds:data_source)
                WHERE ds.type = $type AND
                      ds.host = $host AND
                      ds.port = $port AND
                      ds.database = $database AND
                      ds.username = $username
                RETURN ds
                """
                check_result = session.run(
                    check_cypher,
                    type=ds_type,
                    host=ds_host,
                    port=ds_port,
                    database=ds_database,
                    username=ds_username
                )
                existing_record = check_result.single()

                if existing_record:
                    # The data source exists, return its name
                    existing_data_source = dict(existing_record["ds"])
                    logger.info(f"Found existing data source: {existing_data_source.get('en_name')}")
                    return existing_data_source.get("en_name")

                # The data source does not exist, create a new node
                create_cypher = """
                CREATE (ds:data_source $properties)
                RETURN ds
                """
                # Add the creation time
                data_source["createTime"] = get_formatted_time()
                create_result = session.run(create_cypher, properties=data_source)
                created_record = create_result.single()
                if not created_record:
                    raise RuntimeError("Failed to create the data source node")

                new_data_source = dict(created_record["ds"])
                logger.info(f"Created new data source: {new_data_source.get('en_name')}")
                return new_data_source.get("en_name")
    except Exception as e:
        logger.error(f"Failed to handle the data source: {str(e)}")
        raise RuntimeError(f"Failed to handle the data source: {str(e)}")
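

# Illustrative sketch: the two payload shapes handle_data_source() accepts,
# mirroring the docstring above (the password is replaced by a placeholder).
# A name-only dict must refer to an existing data_source node; a full dict is
# matched on type/host/port/database/username and created if no equivalent
# node exists.
def _demo_handle_data_source_payloads():
    name_only = {"en_name": "10-52-31-104_5432_inventory"}
    full_info = {
        "en_name": "10-52-31-104_5432_inventory",
        "name": "",
        "type": "postgresql",
        "host": "10.52.31.104",
        "port": 5432,
        "database": "inventory_management",
        "username": "app_user",
        "password": "<password>",
    }
    return name_only, full_info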