resource.py 51 KB


  1. import json
  2. import re
  3. import logging
  4. from py2neo import Relationship
  5. import pandas as pd
  6. from app.services.neo4j_driver import neo4j_driver
  7. from app.services.package_function import create_or_get_node, relationship_exists, get_node
  8. logger = logging.getLogger("app")
  9. def get_formatted_time():
  10. """获取格式化的当前时间"""
  11. import time
  12. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  13. def get_node_by_id(label, id):
  14. """根据ID获取指定标签的节点"""
  15. try:
  16. with neo4j_driver.get_session() as session:
  17. # 确保id为整数
  18. try:
  19. id_int = int(id)
  20. except (ValueError, TypeError):
  21. logger.error(f"节点ID不是有效的整数: {id}")
  22. return None
  23. cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
  24. result = session.run(cypher, id=id_int)
  25. record = result.single()
  26. return record["n"] if record else None
  27. except Exception as e:
  28. logger.error(f"根据ID获取节点失败: {str(e)}")
  29. return None
  30. def get_node_by_id_no_label(id):
  31. """根据ID获取节点,不限制标签"""
  32. try:
  33. with neo4j_driver.get_session() as session:
  34. # 确保id为整数
  35. try:
  36. id_int = int(id)
  37. except (ValueError, TypeError):
  38. logger.error(f"节点ID不是有效的整数: {id}")
  39. return None
  40. cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
  41. result = session.run(cypher, id=id_int)
  42. record = result.single()
  43. return record["n"] if record else None
  44. except Exception as e:
  45. logger.error(f"根据ID获取节点失败: {str(e)}")
  46. return None
  47. def delete_relationships(start_node, rel_type=None, end_node=None):
  48. """删除关系"""
  49. try:
  50. with neo4j_driver.get_session() as session:
  51. if rel_type and end_node:
  52. cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
  53. cypher = cypher.replace("{rel_type}", rel_type)
  54. session.run(cypher, start_id=start_node.id, end_id=end_node.id)
  55. elif rel_type:
  56. cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
  57. cypher = cypher.replace("{rel_type}", rel_type)
  58. session.run(cypher, start_id=start_node.id)
  59. else:
  60. cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
  61. session.run(cypher, start_id=start_node.id)
  62. return True
  63. except Exception as e:
  64. logger.error(f"删除关系失败: {str(e)}")
  65. return False
  66. def update_or_create_node(label, **properties):
  67. """更新或创建节点"""
  68. try:
  69. with neo4j_driver.get_session() as session:
  70. node_id = properties.pop('id', None)
  71. if node_id:
  72. # 更新现有节点
  73. set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
  74. cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
  75. result = session.run(cypher, id=int(node_id), **properties)
  76. else:
  77. # 创建新节点
  78. props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
  79. cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
  80. result = session.run(cypher, **properties)
  81. record = result.single()
  82. return record["n"] if record else None
  83. except Exception as e:
  84. logger.error(f"更新或创建节点失败: {str(e)}")
  85. return None
  86. # 数据资源-元数据 关系节点创建、查询
  87. def handle_node(receiver, head_data, data_resource, data_source=None):
  88. """处理数据资源节点创建和关系建立"""
  89. try:
  90. # 更新属性
  91. update_attributes = {
  92. 'en_name': data_resource['en_name'],
  93. 'time': get_formatted_time(),
  94. 'type': 'structure' # 结构化文件没有type
  95. }
  96. if 'additional_info' in receiver:
  97. del receiver['additional_info']
  98. # 从receiver中移除data_source属性,避免将复杂对象作为节点属性
  99. if 'data_source' in receiver:
  100. del receiver['data_source']
  101. tag_list = receiver.get('tag')
  102. receiver.update(update_attributes)
  103. # 创建或获取 data_resource 节点
  104. with neo4j_driver.get_session() as session:
  105. props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
  106. cypher = f"""
  107. MERGE (n:data_resource {{name: $name}})
  108. ON CREATE SET n = {{{props_str}}}
  109. ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
  110. RETURN n
  111. """
  112. result = session.run(cypher, **receiver)
  113. data_resource_node = result.single()["n"]
  114. resource_id = data_resource_node.id # 使用id属性获取数值ID
  115. # 处理标签关系
  116. if tag_list:
  117. tag_node = get_node_by_id('data_label', tag_list)
  118. if tag_node:
  119. # 检查关系是否存在
  120. rel_check = """
  121. MATCH (a:data_resource)-[r:label]->(b:data_label)
  122. WHERE id(a) = $resource_id AND id(b) = $tag_id
  123. RETURN r
  124. """
  125. rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id) # 使用数值id
  126. # 如果关系不存在则创建
  127. if not rel_result.single():
  128. rel_create = """
  129. MATCH (a:data_resource), (b:data_label)
  130. WHERE id(a) = $resource_id AND id(b) = $tag_id
  131. CREATE (a)-[r:label]->(b)
  132. RETURN r
  133. """
  134. session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
  135. # 处理头部数据(元数据,字段)
  136. if head_data:
  137. for item in head_data:
  138. # 创建元数据节点
  139. meta_cypher = """
  140. MERGE (m:meta_data {name: $name})
  141. ON CREATE SET m.en_name = $en_name,
  142. m.createTime = $create_time,
  143. m.type = $type
  144. ON MATCH SET m.type = $type
  145. RETURN m
  146. """
  147. create_time = get_formatted_time()
  148. meta_result = session.run(
  149. meta_cypher,
  150. name=item['name'],
  151. en_name=item['en_name'],
  152. create_time=create_time,
  153. type=item['data_type'] # 使用data_type作为type属性
  154. )
  155. meta_record = meta_result.single()
  156. if meta_record and meta_record["m"]:
  157. meta_node = meta_record["m"]
  158. meta_id = meta_node.id # 使用数值ID
  159. # 打印日志确认节点创建成功和ID
  160. logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")
  161. # 确认数据资源节点是否可以正确查询到
  162. check_resource_cypher = """
  163. MATCH (n:data_resource)
  164. WHERE id(n) = $resource_id
  165. RETURN n
  166. """
  167. check_resource = session.run(check_resource_cypher, resource_id=resource_id)
  168. if check_resource.single():
  169. logger.info(f"找到数据资源节点: ID={resource_id}")
  170. else:
  171. logger.error(f"无法找到数据资源节点: ID={resource_id}")
  172. continue
  173. # 创建关系
  174. rel_cypher = """
  175. MATCH (a:data_resource), (m:meta_data)
  176. WHERE id(a) = $resource_id AND id(m) = $meta_id
  177. MERGE (a)-[r:contain]->(m)
  178. RETURN r
  179. """
  180. rel_result = session.run(
  181. rel_cypher,
  182. resource_id=resource_id,
  183. meta_id=meta_id
  184. )
  185. rel_record = rel_result.single()
  186. if rel_record:
  187. logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
  188. else:
  189. logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
  190. else:
  191. logger.error(f"未能创建或获取元数据节点: {item['name']}")
  192. # 处理数据源关系
  193. if data_source:
  194. try:
  195. # 创建或获取数据源节点
  196. data_source_en_name = handle_data_source(data_source)
  197. # 创建数据资源与数据源的关系
  198. if data_source_en_name:
  199. # 创建 isbelongto 关系
  200. rel_data_source_cypher = """
  201. MATCH (a:data_resource), (b:data_source)
  202. WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
  203. MERGE (a)-[r:isbelongto]->(b)
  204. RETURN r
  205. """
  206. session.run(
  207. rel_data_source_cypher,
  208. resource_id=resource_id,
  209. ds_en_name=data_source_en_name
  210. )
  211. logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
  212. except Exception as e:
  213. logger.error(f"处理数据源关系失败: {str(e)}")
  214. raise RuntimeError(f"处理数据源关系失败: {str(e)}")
  215. return resource_id
  216. except Exception as e:
  217. logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
  218. raise
  219. def handle_id_resource(resource_id):
  220. """处理单个数据资源查询"""
  221. try:
  222. with neo4j_driver.get_session() as session:
  223. # 确保resource_id为整数
  224. try:
  225. resource_id_int = int(resource_id)
  226. except (ValueError, TypeError):
  227. logger.error(f"资源ID不是有效的整数: {resource_id}")
  228. return None
  229. # 使用数值ID查询
  230. cypher = """
  231. MATCH (n:data_resource)
  232. WHERE id(n) = $resource_id
  233. RETURN n
  234. """
  235. result = session.run(cypher, resource_id=resource_id_int)
  236. record = result.single()
  237. if not record:
  238. logger.error(f"未找到资源,ID: {resource_id_int}")
  239. return None
  240. # 构建返回数据
  241. data_resource = dict(record["n"])
  242. data_resource["id"] = record["n"].id
  243. # 查询关联的标签
  244. tag_cypher = """
  245. MATCH (n:data_resource)-[:label]->(t:data_label)
  246. WHERE id(n) = $resource_id
  247. RETURN t
  248. """
  249. tag_result = session.run(tag_cypher, resource_id=resource_id_int)
  250. tag_record = tag_result.single()
  251. if tag_record:
  252. tag = dict(tag_record["t"])
  253. tag["id"] = tag_record["t"].id
  254. data_resource["tag_info"] = tag
  255. # 查询关联的元数据 - 支持meta_data和Metadata两种标签
  256. meta_cypher = """
  257. MATCH (n:data_resource)-[:contain]->(m)
  258. WHERE id(n) = $resource_id
  259. AND (m:meta_data OR m:Metadata)
  260. RETURN m
  261. """
  262. meta_result = session.run(meta_cypher, resource_id=resource_id_int)
  263. meta_list = []
  264. for meta_record in meta_result:
  265. meta = dict(meta_record["m"])
  266. meta["id"] = meta_record["m"].id
  267. meta_list.append(meta)
  268. data_resource["meta_list"] = meta_list
  269. logger.info(f"成功获取资源详情,ID: {resource_id_int}")
  270. return data_resource
  271. except Exception as e:
  272. logger.error(f"处理单个数据资源查询失败: {str(e)}")
  273. return None
  274. def id_resource_graph(resource_id):
  275. """获取数据资源图谱"""
  276. try:
  277. with neo4j_driver.get_session() as session:
  278. # 查询数据资源节点及其关系
  279. cypher = """
  280. MATCH (n:data_resource)-[r]-(m)
  281. WHERE id(n) = $resource_id
  282. RETURN n, r, m
  283. """
  284. result = session.run(cypher, resource_id=int(resource_id))
  285. # 收集节点和关系
  286. nodes = {}
  287. relationships = []
  288. for record in result:
  289. # 处理源节点
  290. source_node = dict(record["n"])
  291. source_node["id"] = record["n"].id
  292. nodes[source_node["id"]] = source_node
  293. # 处理目标节点
  294. target_node = dict(record["m"])
  295. target_node["id"] = record["m"].id
  296. nodes[target_node["id"]] = target_node
  297. # 处理关系
  298. rel = record["r"]
  299. relationship = {
  300. "id": rel.id,
  301. "source": record["n"].id,
  302. "target": record["m"].id,
  303. "type": rel.type
  304. }
  305. relationships.append(relationship)
  306. return {
  307. "nodes": list(nodes.values()),
  308. "relationships": relationships
  309. }
  310. except Exception as e:
  311. logger.error(f"获取数据资源图谱失败: {str(e)}")
  312. return {"nodes": [], "relationships": []}
  313. def resource_list(page, page_size, en_name_filter=None, name_filter=None,
  314. type_filter='all', category_filter=None, tag_filter=None):
  315. """获取数据资源列表"""
  316. try:
  317. with neo4j_driver.get_session() as session:
  318. # 构建查询条件
  319. match_clause = "MATCH (n:data_resource)"
  320. where_conditions = []
  321. if en_name_filter:
  322. where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
  323. if name_filter:
  324. where_conditions.append(f"n.name CONTAINS '{name_filter}'")
  325. if type_filter and type_filter != 'all':
  326. where_conditions.append(f"n.type = '{type_filter}'")
  327. if category_filter:
  328. where_conditions.append(f"n.category = '{category_filter}'")
  329. # 标签过滤需要额外的匹配
  330. if tag_filter:
  331. match_clause += "-[:label]->(t:data_label)"
  332. where_conditions.append(f"t.name = '{tag_filter}'")
  333. where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
  334. # 计算总数
  335. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  336. count_result = session.run(count_cypher)
  337. total_count = count_result.single()["count"]
  338. # 分页查询
  339. skip = (page - 1) * page_size
  340. cypher = f"""
  341. {match_clause}{where_clause}
  342. RETURN n
  343. ORDER BY n.time DESC
  344. SKIP {skip} LIMIT {page_size}
  345. """
  346. result = session.run(cypher)
  347. # 格式化结果
  348. resources = []
  349. for record in result:
  350. node = dict(record["n"])
  351. node["id"] = record["n"].id
  352. # 查询关联的标签
  353. tag_cypher = """
  354. MATCH (n:data_resource)-[:label]->(t:data_label)
  355. WHERE id(n) = $resource_id
  356. RETURN t
  357. """
  358. tag_result = session.run(tag_cypher, resource_id=node["id"])
  359. tag_record = tag_result.single()
  360. if tag_record:
  361. tag = dict(tag_record["t"])
  362. tag["id"] = tag_record["t"].id
  363. node["tag_info"] = tag
  364. resources.append(node)
  365. return resources, total_count
  366. except Exception as e:
  367. logger.error(f"获取数据资源列表失败: {str(e)}")
  368. return [], 0
  369. def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
  370. name_filter=None, category_filter=None, tag_filter=None):
  371. """获取特定数据资源关联的元数据列表"""
  372. try:
  373. with neo4j_driver.get_session() as session:
  374. # 确保resource_id为整数
  375. try:
  376. resource_id_int = int(resource_id)
  377. except (ValueError, TypeError):
  378. logger.error(f"资源ID不是有效的整数: {resource_id}")
  379. return [], 0
  380. # 基本匹配语句 - 支持meta_data和Metadata标签
  381. match_clause = """
  382. MATCH (n:data_resource)-[:contain]->(m)
  383. WHERE id(n) = $resource_id
  384. AND (m:meta_data OR m:Metadata)
  385. """
  386. where_conditions = []
  387. if en_name_filter:
  388. where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
  389. if name_filter:
  390. where_conditions.append(f"m.name CONTAINS '{name_filter}'")
  391. if category_filter:
  392. where_conditions.append(f"m.category = '{category_filter}'")
  393. # 标签过滤需要额外的匹配
  394. tag_match = ""
  395. if tag_filter:
  396. tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
  397. where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
  398. # 计算总数
  399. count_cypher = f"""
  400. {match_clause}{where_clause}
  401. {tag_match}
  402. RETURN count(m) as count
  403. """
  404. count_params = {"resource_id": resource_id_int}
  405. if tag_filter:
  406. count_params["tag_filter"] = tag_filter
  407. count_result = session.run(count_cypher, **count_params)
  408. total_count = count_result.single()["count"]
  409. # 分页查询
  410. skip = (page - 1) * page_size
  411. cypher = f"""
  412. {match_clause}{where_clause}
  413. {tag_match}
  414. RETURN m
  415. ORDER BY m.name
  416. SKIP {skip} LIMIT {page_size}
  417. """
  418. result = session.run(cypher, **count_params)
  419. # 格式化结果
  420. metadata_list = []
  421. for record in result:
  422. meta = dict(record["m"])
  423. meta["id"] = record["m"].id
  424. metadata_list.append(meta)
  425. logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
  426. return metadata_list, total_count
  427. except Exception as e:
  428. logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
  429. return [], 0
  430. def resource_kinship_graph(resource_id, include_meta=True):
  431. """获取数据资源亲缘关系图谱"""
  432. try:
  433. with neo4j_driver.get_session() as session:
  434. # 确保resource_id为整数
  435. try:
  436. resource_id_int = int(resource_id)
  437. except (ValueError, TypeError):
  438. logger.error(f"资源ID不是有效的整数: {resource_id}")
  439. return {"nodes": [], "relationships": []}
  440. # 基本查询
  441. cypher_parts = [
  442. f"MATCH (n:data_resource) WHERE id(n) = $resource_id",
  443. "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
  444. ]
  445. # 是否包含元数据 - 支持meta_data和Metadata两种标签
  446. if include_meta:
  447. cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m) WHERE (m:meta_data OR m:Metadata)")
  448. cypher_parts.append("RETURN n, l, collect(m) as metadata")
  449. cypher = "\n".join(cypher_parts)
  450. result = session.run(cypher, resource_id=resource_id_int)
  451. record = result.single()
  452. if not record:
  453. logger.error(f"未找到资源图谱数据,ID: {resource_id_int}")
  454. return {"nodes": [], "relationships": []}
  455. # 收集节点和关系
  456. nodes = {}
  457. relationships = []
  458. # 处理数据资源节点
  459. resource_node = dict(record["n"])
  460. resource_node["id"] = record["n"].id
  461. resource_node["labels"] = list(record["n"].labels)
  462. nodes[resource_node["id"]] = resource_node
  463. # 处理标签节点
  464. if record["l"]:
  465. label_node = dict(record["l"])
  466. label_node["id"] = record["l"].id
  467. label_node["labels"] = list(record["l"].labels)
  468. nodes[label_node["id"]] = label_node
  469. # 添加资源-标签关系
  470. relationships.append({
  471. "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
  472. "source": resource_node["id"],
  473. "target": label_node["id"],
  474. "type": "label"
  475. })
  476. # 处理元数据节点
  477. if include_meta and record["metadata"]:
  478. for meta in record["metadata"]:
  479. if meta: # 检查元数据节点是否存在
  480. meta_node = dict(meta)
  481. meta_node["id"] = meta.id
  482. meta_node["labels"] = list(meta.labels)
  483. nodes[meta_node["id"]] = meta_node
  484. # 添加资源-元数据关系
  485. relationships.append({
  486. "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
  487. "source": resource_node["id"],
  488. "target": meta_node["id"],
  489. "type": "contain"
  490. })
  491. logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  492. return {
  493. "nodes": list(nodes.values()),
  494. "relationships": relationships
  495. }
  496. except Exception as e:
  497. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  498. return {"nodes": [], "relationships": []}
  499. def resource_impact_all_graph(resource_id, include_meta=True):
  500. """获取数据资源影响关系图谱"""
  501. try:
  502. with neo4j_driver.get_session() as session:
  503. # 确保resource_id为整数
  504. try:
  505. resource_id_int = int(resource_id)
  506. except (ValueError, TypeError):
  507. logger.error(f"资源ID不是有效的整数: {resource_id}")
  508. return {"nodes": [], "relationships": []}
  509. # 根据meta参数决定查询深度
  510. if include_meta:
  511. cypher = """
  512. MATCH path = (n:data_resource)-[*1..3]-(m)
  513. WHERE id(n) = $resource_id
  514. RETURN path
  515. """
  516. else:
  517. cypher = """
  518. MATCH path = (n:data_resource)-[*1..2]-(m)
  519. WHERE id(n) = $resource_id
  520. AND NOT (m:meta_data) AND NOT (m:Metadata)
  521. RETURN path
  522. """
  523. result = session.run(cypher, resource_id=resource_id_int)
  524. # 收集节点和关系
  525. nodes = {}
  526. relationships = {}
  527. for record in result:
  528. path = record["path"]
  529. # 处理路径中的所有节点
  530. for node in path.nodes:
  531. if node.id not in nodes:
  532. node_dict = dict(node)
  533. node_dict["id"] = node.id
  534. node_dict["labels"] = list(node.labels)
  535. nodes[node.id] = node_dict
  536. # 处理路径中的所有关系
  537. for rel in path.relationships:
  538. if rel.id not in relationships:
  539. rel_dict = {
  540. "id": rel.id,
  541. "source": rel.start_node.id,
  542. "target": rel.end_node.id,
  543. "type": rel.type
  544. }
  545. relationships[rel.id] = rel_dict
  546. logger.info(f"成功获取完整图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
  547. return {
  548. "nodes": list(nodes.values()),
  549. "relationships": list(relationships.values())
  550. }
  551. except Exception as e:
  552. logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
  553. return {"nodes": [], "relationships": []}
  554. def clean_type(type_str):
  555. """清洗SQL类型字符串"""
  556. # 提取基本类型,不包括长度或精度信息
  557. basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()
  558. # 移除 VARYING 这样的后缀
  559. basic_type = re.sub(r'\s+VARYING$', '', basic_type)
  560. # 标准化常见类型
  561. type_mapping = {
  562. 'INT': 'INTEGER',
  563. 'INT4': 'INTEGER',
  564. 'INT8': 'BIGINT',
  565. 'SMALLINT': 'SMALLINT',
  566. 'BIGINT': 'BIGINT',
  567. 'FLOAT4': 'FLOAT',
  568. 'FLOAT8': 'DOUBLE',
  569. 'REAL': 'FLOAT',
  570. 'DOUBLE PRECISION': 'DOUBLE',
  571. 'NUMERIC': 'DECIMAL',
  572. 'BOOL': 'BOOLEAN',
  573. 'CHARACTER': 'CHAR',
  574. 'CHAR VARYING': 'VARCHAR',
  575. 'CHARACTER VARYING': 'VARCHAR',
  576. 'TEXT': 'TEXT',
  577. 'DATE': 'DATE',
  578. 'TIME': 'TIME',
  579. 'TIMESTAMP': 'TIMESTAMP',
  580. 'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
  581. 'BYTEA': 'BINARY',
  582. 'JSON': 'JSON',
  583. 'JSONB': 'JSONB',
  584. 'UUID': 'UUID',
  585. 'SERIAL': 'SERIAL',
  586. 'SERIAL4': 'SERIAL',
  587. 'SERIAL8': 'BIGSERIAL',
  588. 'BIGSERIAL': 'BIGSERIAL'
  589. }
  590. # 尝试从映射表中获取标准化的类型
  591. return type_mapping.get(basic_type, basic_type)
  592. def clean_field_name(field_name):
  593. """清洗字段名"""
  594. return field_name.strip('`').strip('"').strip("'")
  595. def select_create_ddl(sql_content):
  596. """从SQL内容中提取创建表的DDL语句"""
  597. try:
  598. # 解析复杂的SQL文件,识别所有的CREATE TABLE语句及其关联的注释
  599. # 找到所有以CREATE TABLE开头的语句块,每个语句块包含主语句和相关的注释
  600. # 首先,分割 SQL 内容按分号
  601. statements = []
  602. current_statement = ""
  603. in_string = False
  604. string_quote = None
  605. for char in sql_content:
  606. if char in ["'", '"']:
  607. if not in_string:
  608. in_string = True
  609. string_quote = char
  610. elif char == string_quote:
  611. in_string = False
  612. string_quote = None
  613. current_statement += char
  614. elif char == ';' and not in_string:
  615. current_statement += char
  616. if current_statement.strip():
  617. statements.append(current_statement.strip())
  618. current_statement = ""
  619. else:
  620. current_statement += char
  621. if current_statement.strip():
  622. statements.append(current_statement.strip())
  623. # 找出所有的CREATE TABLE语句和关联的注释
  624. create_table_statements = []
  625. create_index = -1
  626. in_table_block = False
  627. current_table = None
  628. current_block = ""
  629. for i, stmt in enumerate(statements):
  630. if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
  631. # 如果已经在处理表,先保存当前块
  632. if in_table_block and current_block:
  633. create_table_statements.append(current_block)
  634. # 开始新的表块
  635. in_table_block = True
  636. current_block = stmt
  637. # 提取表名
  638. table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
  639. if table_match:
  640. current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
  641. current_table = current_table.strip('"\'') if current_table else ""
  642. elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
  643. re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
  644. # 检查注释是否属于当前表
  645. if current_table:
  646. # 表注释处理
  647. if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
  648. table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
  649. if table_comment_match:
  650. comment_table = table_comment_match.group(1).strip('"\'')
  651. if comment_table == current_table:
  652. current_block += " " + stmt
  653. else:
  654. # 这是另一个表的注释,当前表的DDL到此结束
  655. create_table_statements.append(current_block)
  656. in_table_block = False
  657. current_block = ""
  658. current_table = None
  659. # 列注释处理
  660. elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
  661. column_comment_match = re.search(
  662. r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
  663. stmt,
  664. re.IGNORECASE
  665. )
  666. if column_comment_match:
  667. comment_table = column_comment_match.group(1)
  668. if comment_table == current_table:
  669. current_block += " " + stmt
  670. else:
  671. # 这是另一个表的注释,当前表的DDL到此结束
  672. create_table_statements.append(current_block)
  673. in_table_block = False
  674. current_block = ""
  675. current_table = None
  676. elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
  677. # 如果遇到新的CREATE语句(不是注释),保存当前块并结束
  678. create_table_statements.append(current_block)
  679. in_table_block = False
  680. current_block = ""
  681. current_table = None
  682. # 添加最后一个块
  683. if in_table_block and current_block:
  684. create_table_statements.append(current_block)
  685. # 日志记录
  686. logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
  687. for i, stmt in enumerate(create_table_statements):
  688. logger.debug(f"DDL语句 {i+1}: {stmt}")
  689. return create_table_statements
  690. except Exception as e:
  691. logger.error(f"提取DDL语句失败: {str(e)}")
  692. # logger.error(traceback.format_exc())
  693. return []
  694. def table_sql(sql):
  695. """解析表定义SQL,支持带schema和不带schema两种格式"""
  696. try:
  697. # 支持以下格式:
  698. # 1. CREATE TABLE tablename
  699. # 2. CREATE TABLE "tablename"
  700. # 3. CREATE TABLE 'tablename'
  701. # 4. CREATE TABLE schema.tablename
  702. # 5. CREATE TABLE "schema"."tablename"
  703. # 6. CREATE TABLE 'schema'.'tablename'
  704. # 匹配表名,支持带引号和不带引号的情况
  705. table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
  706. table_match = re.search(table_pattern, sql, re.IGNORECASE)
  707. if not table_match:
  708. logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
  709. return None
  710. # 获取表名
  711. schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
  712. table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
  713. if not table_name:
  714. logger.error("无法解析表名")
  715. return None
  716. logger.debug(f"解析到表名: {table_name}")
  717. # 提取CREATE TABLE语句的主体部分(括号内的内容)
  718. body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
  719. body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
  720. if not body_match:
  721. logger.error("无法提取表主体内容")
  722. return None
  723. body_text = body_match.group(1).strip()
  724. logger.debug(f"表定义主体部分: {body_text}")
  725. # 解析字段定义
  726. fields = []
  727. # 分割字段定义,处理括号嵌套和引号
  728. field_defs = []
  729. pos = 0
  730. in_parentheses = 0
  731. in_quotes = False
  732. quote_char = None
  733. for i, char in enumerate(body_text):
  734. if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
  735. in_quotes = not in_quotes
  736. if in_quotes:
  737. quote_char = char
  738. else:
  739. quote_char = None
  740. elif char == '(' and not in_quotes:
  741. in_parentheses += 1
  742. elif char == ')' and not in_quotes:
  743. in_parentheses -= 1
  744. elif char == ',' and in_parentheses == 0 and not in_quotes:
  745. field_defs.append(body_text[pos:i].strip())
  746. pos = i + 1
  747. # 添加最后一个字段定义
  748. if pos < len(body_text):
  749. field_defs.append(body_text[pos:].strip())
  750. logger.debug(f"解析出 {len(field_defs)} 个字段定义")
  751. # 处理每个字段定义
  752. for field_def in field_defs:
  753. # 跳过约束定义
  754. if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
  755. continue
  756. # 提取字段名和类型
  757. field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
  758. field_match = re.search(field_pattern, field_def, re.IGNORECASE)
  759. if field_match:
  760. # 提取字段名
  761. field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
  762. # 提取类型
  763. field_type = field_match.group(5).strip()
  764. # 处理类型中可能的括号
  765. type_base = re.split(r'\s+', field_type)[0]
  766. clean_type_value = clean_type(type_base)
  767. fields.append((field_name, clean_type_value))
  768. logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
  769. else:
  770. logger.warning(f"无法解析字段定义: {field_def}")
  771. # 提取表注释
  772. table_comment = ""
  773. table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
  774. table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
  775. if table_comment_match:
  776. comment_table = table_comment_match.group(1)
  777. if comment_table.strip("'\"") == table_name.strip("'\""):
  778. table_comment = table_comment_match.group(2)
  779. logger.debug(f"找到表注释: {table_comment}")
  780. # 提取列注释
  781. comments = {}
  782. column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
  783. for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
  784. comment_table = match.group(1)
  785. column_name = match.group(2)
  786. comment = match.group(3)
  787. # 检查表名是否匹配
  788. if comment_table.strip("'\"") == table_name.strip("'\""):
  789. comments[column_name] = comment
  790. logger.debug(f"找到列注释: {column_name} - {comment}")
  791. else:
  792. logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")
  793. # 检查字段和注释匹配情况
  794. logger.debug("========字段和注释匹配情况========")
  795. field_names = [f[0] for f in fields]
  796. logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
  797. logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")
  798. # 构建返回结果
  799. meta_list = []
  800. for field_name, field_type in fields:
  801. chinese_name = comments.get(field_name, "")
  802. meta_list.append({
  803. "en_name": field_name,
  804. "data_type": field_type,
  805. "name": chinese_name if chinese_name else field_name
  806. })
  807. # 检查表是否存在
  808. try:
  809. status = status_query([table_name])
  810. except Exception as e:
  811. logger.error(f"检查表存在状态失败: {str(e)}")
  812. status = [False]
  813. # 构建返回结果
  814. result = {
  815. table_name: {
  816. "exist": status[0] if status else False,
  817. "meta": meta_list
  818. }
  819. }
  820. logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
  821. return result
  822. except Exception as e:
  823. logger.error(f"解析表定义SQL失败: {str(e)}")
  824. logger.error(f"异常详情: {e}")
  825. import traceback
  826. logger.error(traceback.format_exc())
  827. return None
  828. # 判断英文表名是否在图谱中存在
  829. def status_query(key_list):
  830. query = """
  831. unwind $Key_list as name
  832. OPTIONAL MATCH (n:data_model {en_name: name})
  833. OPTIONAL MATCH (n:data_resource {en_name: name})
  834. OPTIONAL MATCH (n:data_metric {en_name: name})
  835. WITH name, CASE
  836. WHEN n IS NOT NULL THEN True
  837. ELSE False
  838. END AS exist
  839. return collect(exist)AS exist
  840. """
  841. with neo4j_driver.get_session() as session:
  842. result = session.run(query, Key_list=key_list)
  843. data = result.value() # 获取单个值
  844. return data
  845. def select_sql(sql_query):
  846. """解析SELECT查询语句"""
  847. try:
  848. # 提取SELECT子句
  849. select_pattern = r'SELECT\s+(.*?)\s+FROM'
  850. select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  851. if not select_match:
  852. return None
  853. select_clause = select_match.group(1)
  854. # 分割字段
  855. fields = []
  856. # 处理字段列表,避免在函数调用中的逗号导致错误分割
  857. in_parenthesis = 0
  858. current_field = ""
  859. for char in select_clause:
  860. if char == '(':
  861. in_parenthesis += 1
  862. current_field += char
  863. elif char == ')':
  864. in_parenthesis -= 1
  865. current_field += char
  866. elif char == ',' and in_parenthesis == 0:
  867. fields.append(current_field.strip())
  868. current_field = ""
  869. else:
  870. current_field += char
  871. if current_field.strip():
  872. fields.append(current_field.strip())
  873. # 解析每个字段
  874. parsed_fields = []
  875. for field in fields:
  876. # 检查是否有字段别名
  877. alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
  878. alias_match = re.search(alias_pattern, field)
  879. if alias_match:
  880. field_expr = alias_match.group(1).strip()
  881. field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
  882. parsed_fields.append({
  883. "expression": field_expr,
  884. "alias": field_alias
  885. })
  886. else:
  887. # 没有别名的情况
  888. parsed_fields.append({
  889. "expression": field.strip(),
  890. "alias": None
  891. })
  892. # 提取FROM子句和表名
  893. from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
  894. from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  895. tables = []
  896. if from_match:
  897. from_clause = from_match.group(1).strip()
  898. # 分析FROM子句中的表
  899. table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))?'
  900. for match in re.finditer(table_pattern, from_clause):
  901. table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
  902. if table_name:
  903. tables.append(table_name)
  904. return tables
  905. except Exception as e:
  906. logger.error(f"解析SELECT查询语句失败: {str(e)}")
  907. # logger.error(traceback.format_exc())
  908. return None
  909. def model_resource_list(page, page_size, name_filter=None):
  910. """获取模型资源列表"""
  911. try:
  912. with neo4j_driver.get_session() as session:
  913. # 构建查询条件
  914. match_clause = "MATCH (n:model_resource)"
  915. where_clause = ""
  916. if name_filter:
  917. where_clause = f" WHERE n.name CONTAINS '{name_filter}'"
  918. # 计算总数
  919. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  920. count_result = session.run(count_cypher)
  921. total_count = count_result.single()["count"]
  922. # 分页查询
  923. skip = (page - 1) * page_size
  924. cypher = f"""
  925. {match_clause}{where_clause}
  926. RETURN n
  927. ORDER BY n.createTime DESC
  928. SKIP {skip} LIMIT {page_size}
  929. """
  930. result = session.run(cypher)
  931. # 格式化结果
  932. resources = []
  933. for record in result:
  934. node = dict(record["n"])
  935. node["id"] = record["n"].id
  936. resources.append(node)
  937. return resources, total_count
  938. except Exception as e:
  939. logger.error(f"获取模型资源列表失败: {str(e)}")
  940. return [], 0
  941. def data_resource_edit(data):
  942. """编辑数据资源"""
  943. try:
  944. resource_id = data.get("id")
  945. if not resource_id:
  946. raise ValueError("缺少资源ID")
  947. with neo4j_driver.get_session() as session:
  948. # 更新节点属性
  949. update_fields = {}
  950. for key, value in data.items():
  951. if key != "id" and key != "tag":
  952. update_fields[key] = value
  953. # 添加更新时间
  954. update_fields["updateTime"] = get_formatted_time()
  955. # 构建更新语句
  956. set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
  957. cypher = f"""
  958. MATCH (n:data_resource)
  959. WHERE id(n) = $resource_id
  960. SET {set_clause}
  961. RETURN n
  962. """
  963. result = session.run(cypher, resource_id=int(resource_id), **update_fields)
  964. updated_node = result.single()
  965. if not updated_node:
  966. raise ValueError("资源不存在")
  967. # 处理标签关系
  968. tag_id = data.get("tag")
  969. if tag_id:
  970. # 删除旧的标签关系
  971. delete_rel_cypher = """
  972. MATCH (n:data_resource)-[r:label]->()
  973. WHERE id(n) = $resource_id
  974. DELETE r
  975. """
  976. session.run(delete_rel_cypher, resource_id=int(resource_id))
  977. # 创建新的标签关系
  978. create_rel_cypher = """
  979. MATCH (n:data_resource), (t:data_label)
  980. WHERE id(n) = $resource_id AND id(t) = $tag_id
  981. CREATE (n)-[r:label]->(t)
  982. RETURN r
  983. """
  984. session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
  985. # 返回更新后的节点
  986. return dict(updated_node["n"])
  987. except Exception as e:
  988. logger.error(f"编辑数据资源失败: {str(e)}")
  989. raise
  990. def handle_data_source(data_source):
  991. """处理数据源的检查和创建
  992. 参数:
  993. data_source: 包含数据源信息的字典,支持两种情况:
  994. 1. 简单情况(只需查询已有数据源),只要包含en_name即可:
  995. {
  996. "en_name": "10-52-31-104_5432_inventory"
  997. }
  998. {
  999. "en_name": "10-52-31-104_5432_inventory",
  1000. "name": "教师数据库"
  1001. }
  1002. 2. 完整的数据源信息(用于创建新数据源):
  1003. {
  1004. "en_name": "10-52-31-104_5432_inventory",
  1005. "name": "教师数据库",
  1006. "type": "postgresql",
  1007. "host": "10.52.31.104",
  1008. "port": 5432,
  1009. "database": "inventory_management",
  1010. "username": "app_user",
  1011. "password": "pG$ecur3P@ss789"
  1012. }
  1013. 返回:
  1014. 成功时返回数据源的名称,失败时抛出异常
  1015. """
  1016. try:
  1017. # 获取数据源名称
  1018. ds_en_name = data_source.get("en_name")
  1019. if not ds_en_name:
  1020. raise ValueError("数据源信息不完整,缺少名称")
  1021. # 检查是否为简单查询模式
  1022. # 判断数据源是否包含创建新数据源所需的必要字段
  1023. required_fields = ["type", "host", "port", "database", "username"]
  1024. has_required_fields = all(data_source.get(field) for field in required_fields)
  1025. with neo4j_driver.get_session() as session:
  1026. if not has_required_fields:
  1027. # 简单查询模式:只通过en_name查找已有数据源
  1028. logger.info(f"简单数据源查询模式,查找en_name为: {ds_en_name}")
  1029. check_name_cypher = """
  1030. MATCH (ds:data_source {en_name: $en_name})
  1031. RETURN ds
  1032. """
  1033. check_result = session.run(check_name_cypher, en_name=ds_en_name)
  1034. existing_record = check_result.single()
  1035. if existing_record:
  1036. # 数据源已存在,返回其名称
  1037. existing_data_source = dict(existing_record["ds"])
  1038. logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
  1039. return existing_data_source.get("en_name")
  1040. else:
  1041. # 数据源不存在,抛出异常
  1042. raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
  1043. else:
  1044. # 完整的数据源信息模式:创建或获取数据源
  1045. # 确保name不为空,如果为空则使用en_name
  1046. if "name" not in data_source or not data_source["name"]:
  1047. data_source["name"] = ds_en_name
  1048. logger.info(f"数据源name为空,使用en_name作为替代: {ds_en_name}")
  1049. # 检查是否已存在相同数据源(除名称和密码外属性相同)
  1050. check_cypher = """
  1051. MATCH (ds:data_source)
  1052. WHERE ds.type = $type AND
  1053. ds.host = $host AND
  1054. ds.port = $port AND
  1055. ds.database = $database AND
  1056. ds.username = $username
  1057. RETURN ds
  1058. """
  1059. check_result = session.run(
  1060. check_cypher,
  1061. type=data_source["type"],
  1062. host=data_source["host"],
  1063. port=data_source["port"],
  1064. database=data_source["database"],
  1065. username=data_source["username"]
  1066. )
  1067. existing_record = check_result.single()
  1068. if existing_record:
  1069. # 数据源已存在,返回其名称
  1070. existing_data_source = dict(existing_record["ds"])
  1071. logger.info(f"找到现有数据源: {existing_data_source.get('en_name')}")
  1072. return existing_data_source.get("en_name")
  1073. # 数据源不存在,创建新节点
  1074. create_cypher = """
  1075. CREATE (ds:data_source $properties)
  1076. RETURN ds
  1077. """
  1078. # 添加创建时间
  1079. data_source["createTime"] = get_formatted_time()
  1080. create_result = session.run(create_cypher, properties=data_source)
  1081. created_record = create_result.single()
  1082. if not created_record:
  1083. raise RuntimeError("创建数据源节点失败")
  1084. new_data_source = dict(created_record["ds"])
  1085. logger.info(f"创建新数据源: {new_data_source.get('en_name')}")
  1086. return new_data_source.get("en_name")
  1087. except Exception as e:
  1088. logger.error(f"处理数据源失败: {str(e)}")
  1089. raise RuntimeError(f"处理数据源失败: {str(e)}")