resource.py 40 KB


  1. import json
  2. import re
  3. import logging
  4. from py2neo import Relationship
  5. import pandas as pd
  6. from app.services.neo4j_driver import neo4j_driver
  7. from app.services.package_function import create_or_get_node, relationship_exists, get_node
# Module logger: the functions in this module report errors and debug output
# through the "app" logger.
logger = logging.getLogger("app")
  9. def get_formatted_time():
  10. """获取格式化的当前时间"""
  11. import time
  12. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  13. def get_node_by_id(label, id):
  14. """根据ID获取指定标签的节点"""
  15. try:
  16. with neo4j_driver.get_session() as session:
  17. cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
  18. result = session.run(cypher, id=int(id))
  19. record = result.single()
  20. return record["n"] if record else None
  21. except Exception as e:
  22. logger.error(f"根据ID获取节点失败: {str(e)}")
  23. return None
  24. def get_node_by_id_no_label(id):
  25. """根据ID获取节点,不限制标签"""
  26. try:
  27. with neo4j_driver.get_session() as session:
  28. cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
  29. result = session.run(cypher, id=int(id))
  30. record = result.single()
  31. return record["n"] if record else None
  32. except Exception as e:
  33. logger.error(f"根据ID获取节点失败: {str(e)}")
  34. return None
  35. def delete_relationships(start_node, rel_type=None, end_node=None):
  36. """删除关系"""
  37. try:
  38. with neo4j_driver.get_session() as session:
  39. if rel_type and end_node:
  40. cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
  41. cypher = cypher.replace("{rel_type}", rel_type)
  42. session.run(cypher, start_id=start_node.id, end_id=end_node.id)
  43. elif rel_type:
  44. cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
  45. cypher = cypher.replace("{rel_type}", rel_type)
  46. session.run(cypher, start_id=start_node.id)
  47. else:
  48. cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
  49. session.run(cypher, start_id=start_node.id)
  50. return True
  51. except Exception as e:
  52. logger.error(f"删除关系失败: {str(e)}")
  53. return False
  54. def update_or_create_node(label, **properties):
  55. """更新或创建节点"""
  56. try:
  57. with neo4j_driver.get_session() as session:
  58. node_id = properties.pop('id', None)
  59. if node_id:
  60. # 更新现有节点
  61. set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
  62. cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
  63. result = session.run(cypher, id=int(node_id), **properties)
  64. else:
  65. # 创建新节点
  66. props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
  67. cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
  68. result = session.run(cypher, **properties)
  69. record = result.single()
  70. return record["n"] if record else None
  71. except Exception as e:
  72. logger.error(f"更新或创建节点失败: {str(e)}")
  73. return None
  74. # 数据资源-元数据 关系节点创建、查询
  75. def handle_node(receiver, head_data, data_resource):
  76. """处理数据资源节点创建和关系建立"""
  77. try:
  78. # 更新属性
  79. update_attributes = {
  80. 'en_name': data_resource['en_name'],
  81. 'time': get_formatted_time(),
  82. 'type': 'structure' # 结构化文件没有type
  83. }
  84. if 'additional_info' in receiver:
  85. del receiver['additional_info']
  86. tag_list = receiver.get('tag')
  87. receiver.update(update_attributes)
  88. # 创建或获取 data_resource 节点
  89. with neo4j_driver.get_session() as session:
  90. props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
  91. cypher = f"""
  92. MERGE (n:data_resource {{name: $name}})
  93. ON CREATE SET n = {{{props_str}}}
  94. ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
  95. RETURN n
  96. """
  97. result = session.run(cypher, **receiver)
  98. data_resource_node = result.single()["n"]
  99. resource_id = data_resource_node.id
  100. # 处理标签关系
  101. if tag_list:
  102. tag_node = get_node_by_id('data_label', tag_list)
  103. if tag_node:
  104. # 检查关系是否存在
  105. rel_check = """
  106. MATCH (a:data_resource)-[r:label]->(b:data_label)
  107. WHERE id(a) = $resource_id AND id(b) = $tag_id
  108. RETURN r
  109. """
  110. rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id)
  111. # 如果关系不存在则创建
  112. if not rel_result.single():
  113. rel_create = """
  114. MATCH (a:data_resource), (b:data_label)
  115. WHERE id(a) = $resource_id AND id(b) = $tag_id
  116. CREATE (a)-[r:label]->(b)
  117. RETURN r
  118. """
  119. session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
  120. # 处理头部数据(元数据)
  121. if head_data:
  122. for item in head_data:
  123. # 创建元数据节点
  124. meta_cypher = """
  125. MERGE (m:Metadata {name: $name})
  126. ON CREATE SET m.en_name = $en_name,
  127. m.createTime = $create_time,
  128. m.type = $type
  129. ON MATCH SET m.type = $type
  130. RETURN m
  131. """
  132. create_time = get_formatted_time()
  133. meta_result = session.run(
  134. meta_cypher,
  135. name=item['name'],
  136. en_name=item['en_name'],
  137. create_time=create_time,
  138. type=item['data_type'] # 使用data_type作为type属性
  139. )
  140. meta_node = meta_result.single()["m"]
  141. # 创建关系
  142. rel_cypher = """
  143. MATCH (a:data_resource), (m:Metadata)
  144. WHERE id(a) = $resource_id AND id(m) = $meta_id
  145. MERGE (a)-[r:contain]->(m)
  146. RETURN r
  147. """
  148. session.run(
  149. rel_cypher,
  150. resource_id=resource_id,
  151. meta_id=meta_node.id
  152. )
  153. return resource_id
  154. except Exception as e:
  155. logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
  156. raise
  157. def handle_id_resource(resource_id):
  158. """处理单个数据资源查询"""
  159. try:
  160. with neo4j_driver.get_session() as session:
  161. # 查询数据资源节点
  162. cypher = """
  163. MATCH (n:data_resource)
  164. WHERE id(n) = $resource_id
  165. RETURN n
  166. """
  167. result = session.run(cypher, resource_id=int(resource_id))
  168. record = result.single()
  169. if not record:
  170. return None
  171. data_resource = dict(record["n"])
  172. data_resource["id"] = record["n"].id
  173. # 查询关联的标签
  174. tag_cypher = """
  175. MATCH (n:data_resource)-[:label]->(t:data_label)
  176. WHERE id(n) = $resource_id
  177. RETURN t
  178. """
  179. tag_result = session.run(tag_cypher, resource_id=int(resource_id))
  180. tag_record = tag_result.single()
  181. if tag_record:
  182. tag = dict(tag_record["t"])
  183. tag["id"] = tag_record["t"].id
  184. data_resource["tag_info"] = tag
  185. # 查询关联的元数据
  186. meta_cypher = """
  187. MATCH (n:data_resource)-[:contain]->(m:Metadata)
  188. WHERE id(n) = $resource_id
  189. RETURN m
  190. """
  191. meta_result = session.run(meta_cypher, resource_id=int(resource_id))
  192. meta_list = []
  193. for meta_record in meta_result:
  194. meta = dict(meta_record["m"])
  195. meta["id"] = meta_record["m"].id
  196. meta_list.append(meta)
  197. data_resource["meta_list"] = meta_list
  198. return data_resource
  199. except Exception as e:
  200. logger.error(f"处理单个数据资源查询失败: {str(e)}")
  201. return None
  202. def id_resource_graph(resource_id):
  203. """获取数据资源图谱"""
  204. try:
  205. with neo4j_driver.get_session() as session:
  206. # 查询数据资源节点及其关系
  207. cypher = """
  208. MATCH (n:data_resource)-[r]-(m)
  209. WHERE id(n) = $resource_id
  210. RETURN n, r, m
  211. """
  212. result = session.run(cypher, resource_id=int(resource_id))
  213. # 收集节点和关系
  214. nodes = {}
  215. relationships = []
  216. for record in result:
  217. # 处理源节点
  218. source_node = dict(record["n"])
  219. source_node["id"] = record["n"].id
  220. nodes[source_node["id"]] = source_node
  221. # 处理目标节点
  222. target_node = dict(record["m"])
  223. target_node["id"] = record["m"].id
  224. nodes[target_node["id"]] = target_node
  225. # 处理关系
  226. rel = record["r"]
  227. relationship = {
  228. "id": rel.id,
  229. "source": record["n"].id,
  230. "target": record["m"].id,
  231. "type": rel.type
  232. }
  233. relationships.append(relationship)
  234. return {
  235. "nodes": list(nodes.values()),
  236. "relationships": relationships
  237. }
  238. except Exception as e:
  239. logger.error(f"获取数据资源图谱失败: {str(e)}")
  240. return {"nodes": [], "relationships": []}
  241. def resource_list(page, page_size, en_name_filter=None, name_filter=None,
  242. type_filter='all', category_filter=None, tag_filter=None):
  243. """获取数据资源列表"""
  244. try:
  245. with neo4j_driver.get_session() as session:
  246. # 构建查询条件
  247. match_clause = "MATCH (n:data_resource)"
  248. where_conditions = []
  249. if en_name_filter:
  250. where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
  251. if name_filter:
  252. where_conditions.append(f"n.name CONTAINS '{name_filter}'")
  253. if type_filter and type_filter != 'all':
  254. where_conditions.append(f"n.type = '{type_filter}'")
  255. if category_filter:
  256. where_conditions.append(f"n.category = '{category_filter}'")
  257. # 标签过滤需要额外的匹配
  258. if tag_filter:
  259. match_clause += "-[:label]->(t:data_label)"
  260. where_conditions.append(f"t.name = '{tag_filter}'")
  261. where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
  262. # 计算总数
  263. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  264. count_result = session.run(count_cypher)
  265. total_count = count_result.single()["count"]
  266. # 分页查询
  267. skip = (page - 1) * page_size
  268. cypher = f"""
  269. {match_clause}{where_clause}
  270. RETURN n
  271. ORDER BY n.time DESC
  272. SKIP {skip} LIMIT {page_size}
  273. """
  274. result = session.run(cypher)
  275. # 格式化结果
  276. resources = []
  277. for record in result:
  278. node = dict(record["n"])
  279. node["id"] = record["n"].id
  280. # 查询关联的标签
  281. tag_cypher = """
  282. MATCH (n:data_resource)-[:label]->(t:data_label)
  283. WHERE id(n) = $resource_id
  284. RETURN t
  285. """
  286. tag_result = session.run(tag_cypher, resource_id=node["id"])
  287. tag_record = tag_result.single()
  288. if tag_record:
  289. tag = dict(tag_record["t"])
  290. tag["id"] = tag_record["t"].id
  291. node["tag_info"] = tag
  292. resources.append(node)
  293. return resources, total_count
  294. except Exception as e:
  295. logger.error(f"获取数据资源列表失败: {str(e)}")
  296. return [], 0
  297. def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
  298. name_filter=None, category_filter=None, tag_filter=None):
  299. """获取特定数据资源关联的元数据列表"""
  300. try:
  301. with neo4j_driver.get_session() as session:
  302. # 基本匹配语句
  303. match_clause = """
  304. MATCH (n:data_resource)-[:contain]->(m:Metadata)
  305. WHERE id(n) = $resource_id
  306. """
  307. where_conditions = []
  308. if en_name_filter:
  309. where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
  310. if name_filter:
  311. where_conditions.append(f"m.name CONTAINS '{name_filter}'")
  312. if category_filter:
  313. where_conditions.append(f"m.category = '{category_filter}'")
  314. # 标签过滤需要额外的匹配
  315. tag_match = ""
  316. if tag_filter:
  317. tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
  318. where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
  319. # 计算总数
  320. count_cypher = f"""
  321. {match_clause}{where_clause}
  322. {tag_match}
  323. RETURN count(m) as count
  324. """
  325. count_params = {"resource_id": int(resource_id)}
  326. if tag_filter:
  327. count_params["tag_filter"] = tag_filter
  328. count_result = session.run(count_cypher, **count_params)
  329. total_count = count_result.single()["count"]
  330. # 分页查询
  331. skip = (page - 1) * page_size
  332. cypher = f"""
  333. {match_clause}{where_clause}
  334. {tag_match}
  335. RETURN m
  336. ORDER BY m.name
  337. SKIP {skip} LIMIT {page_size}
  338. """
  339. result = session.run(cypher, **count_params)
  340. # 格式化结果
  341. metadata_list = []
  342. for record in result:
  343. node = dict(record["m"])
  344. node["id"] = record["m"].id
  345. metadata_list.append(node)
  346. return metadata_list, total_count
  347. except Exception as e:
  348. logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
  349. return [], 0
  350. def resource_kinship_graph(resource_id, include_meta=True):
  351. """获取数据资源亲缘关系图谱"""
  352. try:
  353. with neo4j_driver.get_session() as session:
  354. # 基本查询
  355. cypher_parts = [
  356. "MATCH (n:data_resource) WHERE id(n) = $resource_id",
  357. "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
  358. ]
  359. # 是否包含元数据
  360. if include_meta:
  361. cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m:Metadata)")
  362. cypher_parts.append("RETURN n, l, collect(m) as metadata")
  363. cypher = "\n".join(cypher_parts)
  364. result = session.run(cypher, resource_id=int(resource_id))
  365. record = result.single()
  366. if not record:
  367. return {"nodes": [], "relationships": []}
  368. # 收集节点和关系
  369. nodes = {}
  370. relationships = []
  371. # 处理数据资源节点
  372. resource_node = dict(record["n"])
  373. resource_node["id"] = record["n"].id
  374. resource_node["labels"] = list(record["n"].labels)
  375. nodes[resource_node["id"]] = resource_node
  376. # 处理标签节点
  377. if record["l"]:
  378. label_node = dict(record["l"])
  379. label_node["id"] = record["l"].id
  380. label_node["labels"] = list(record["l"].labels)
  381. nodes[label_node["id"]] = label_node
  382. # 添加资源-标签关系
  383. relationships.append({
  384. "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
  385. "source": resource_node["id"],
  386. "target": label_node["id"],
  387. "type": "label"
  388. })
  389. # 处理元数据节点
  390. if include_meta and record["metadata"]:
  391. for meta in record["metadata"]:
  392. if meta: # 检查元数据节点是否存在
  393. meta_node = dict(meta)
  394. meta_node["id"] = meta.id
  395. meta_node["labels"] = list(meta.labels)
  396. nodes[meta_node["id"]] = meta_node
  397. # 添加资源-元数据关系
  398. relationships.append({
  399. "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
  400. "source": resource_node["id"],
  401. "target": meta_node["id"],
  402. "type": "contain"
  403. })
  404. return {
  405. "nodes": list(nodes.values()),
  406. "relationships": relationships
  407. }
  408. except Exception as e:
  409. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  410. return {"nodes": [], "relationships": []}
  411. def resource_impact_all_graph(resource_id, include_meta=True):
  412. """获取数据资源影响关系图谱"""
  413. try:
  414. with neo4j_driver.get_session() as session:
  415. # 根据meta参数决定查询深度
  416. if include_meta:
  417. cypher = """
  418. MATCH path = (n:data_resource)-[*1..3]-(m)
  419. WHERE id(n) = $resource_id
  420. RETURN path
  421. """
  422. else:
  423. cypher = """
  424. MATCH path = (n:data_resource)-[*1..2]-(m)
  425. WHERE id(n) = $resource_id
  426. AND NOT (m:Metadata)
  427. RETURN path
  428. """
  429. result = session.run(cypher, resource_id=int(resource_id))
  430. # 收集节点和关系
  431. nodes = {}
  432. relationships = {}
  433. for record in result:
  434. path = record["path"]
  435. # 处理路径中的所有节点
  436. for node in path.nodes:
  437. if node.id not in nodes:
  438. node_dict = dict(node)
  439. node_dict["id"] = node.id
  440. node_dict["labels"] = list(node.labels)
  441. nodes[node.id] = node_dict
  442. # 处理路径中的所有关系
  443. for rel in path.relationships:
  444. if rel.id not in relationships:
  445. rel_dict = {
  446. "id": rel.id,
  447. "source": rel.start_node.id,
  448. "target": rel.end_node.id,
  449. "type": rel.type
  450. }
  451. relationships[rel.id] = rel_dict
  452. return {
  453. "nodes": list(nodes.values()),
  454. "relationships": list(relationships.values())
  455. }
  456. except Exception as e:
  457. logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
  458. return {"nodes": [], "relationships": []}
  459. def clean_type(type_str):
  460. """清洗SQL类型字符串"""
  461. # 提取基本类型,不包括长度或精度信息
  462. basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper()
  463. # 移除 VARYING 这样的后缀
  464. basic_type = re.sub(r'\s+VARYING$', '', basic_type)
  465. # 标准化常见类型
  466. type_mapping = {
  467. 'INT': 'INTEGER',
  468. 'INT4': 'INTEGER',
  469. 'INT8': 'BIGINT',
  470. 'SMALLINT': 'SMALLINT',
  471. 'BIGINT': 'BIGINT',
  472. 'FLOAT4': 'FLOAT',
  473. 'FLOAT8': 'DOUBLE',
  474. 'REAL': 'FLOAT',
  475. 'DOUBLE PRECISION': 'DOUBLE',
  476. 'NUMERIC': 'DECIMAL',
  477. 'BOOL': 'BOOLEAN',
  478. 'CHARACTER': 'CHAR',
  479. 'CHAR VARYING': 'VARCHAR',
  480. 'CHARACTER VARYING': 'VARCHAR',
  481. 'TEXT': 'TEXT',
  482. 'DATE': 'DATE',
  483. 'TIME': 'TIME',
  484. 'TIMESTAMP': 'TIMESTAMP',
  485. 'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
  486. 'BYTEA': 'BINARY',
  487. 'JSON': 'JSON',
  488. 'JSONB': 'JSONB',
  489. 'UUID': 'UUID',
  490. 'SERIAL': 'SERIAL',
  491. 'SERIAL4': 'SERIAL',
  492. 'SERIAL8': 'BIGSERIAL',
  493. 'BIGSERIAL': 'BIGSERIAL'
  494. }
  495. # 尝试从映射表中获取标准化的类型
  496. return type_mapping.get(basic_type, basic_type)
  497. def clean_field_name(field_name):
  498. """清洗字段名"""
  499. return field_name.strip('`').strip('"').strip("'")
  500. def select_create_ddl(sql_content):
  501. """从SQL内容中提取创建表的DDL语句"""
  502. try:
  503. # 解析复杂的SQL文件,识别所有的CREATE TABLE语句及其关联的注释
  504. # 找到所有以CREATE TABLE开头的语句块,每个语句块包含主语句和相关的注释
  505. # 首先,分割 SQL 内容按分号
  506. statements = []
  507. current_statement = ""
  508. in_string = False
  509. string_quote = None
  510. for char in sql_content:
  511. if char in ["'", '"']:
  512. if not in_string:
  513. in_string = True
  514. string_quote = char
  515. elif char == string_quote:
  516. in_string = False
  517. string_quote = None
  518. current_statement += char
  519. elif char == ';' and not in_string:
  520. current_statement += char
  521. if current_statement.strip():
  522. statements.append(current_statement.strip())
  523. current_statement = ""
  524. else:
  525. current_statement += char
  526. if current_statement.strip():
  527. statements.append(current_statement.strip())
  528. # 找出所有的CREATE TABLE语句和关联的注释
  529. create_table_statements = []
  530. create_index = -1
  531. in_table_block = False
  532. current_table = None
  533. current_block = ""
  534. for i, stmt in enumerate(statements):
  535. if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
  536. # 如果已经在处理表,先保存当前块
  537. if in_table_block and current_block:
  538. create_table_statements.append(current_block)
  539. # 开始新的表块
  540. in_table_block = True
  541. current_block = stmt
  542. # 提取表名
  543. table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
  544. if table_match:
  545. current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
  546. current_table = current_table.strip('"\'') if current_table else ""
  547. elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or
  548. re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
  549. # 检查注释是否属于当前表
  550. if current_table:
  551. # 表注释处理
  552. if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
  553. table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
  554. if table_comment_match:
  555. comment_table = table_comment_match.group(1).strip('"\'')
  556. if comment_table == current_table:
  557. current_block += " " + stmt
  558. else:
  559. # 这是另一个表的注释,当前表的DDL到此结束
  560. create_table_statements.append(current_block)
  561. in_table_block = False
  562. current_block = ""
  563. current_table = None
  564. # 列注释处理
  565. elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
  566. column_comment_match = re.search(
  567. r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
  568. stmt,
  569. re.IGNORECASE
  570. )
  571. if column_comment_match:
  572. comment_table = column_comment_match.group(1)
  573. if comment_table == current_table:
  574. current_block += " " + stmt
  575. else:
  576. # 这是另一个表的注释,当前表的DDL到此结束
  577. create_table_statements.append(current_block)
  578. in_table_block = False
  579. current_block = ""
  580. current_table = None
  581. elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
  582. # 如果遇到新的CREATE语句(不是注释),保存当前块并结束
  583. create_table_statements.append(current_block)
  584. in_table_block = False
  585. current_block = ""
  586. current_table = None
  587. # 添加最后一个块
  588. if in_table_block and current_block:
  589. create_table_statements.append(current_block)
  590. # 日志记录
  591. logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
  592. for i, stmt in enumerate(create_table_statements):
  593. logger.debug(f"DDL语句 {i+1}: {stmt}")
  594. return create_table_statements
  595. except Exception as e:
  596. logger.error(f"提取DDL语句失败: {str(e)}")
  597. # logger.error(traceback.format_exc())
  598. return []
  599. def table_sql(sql):
  600. """解析表定义SQL,支持带schema和不带schema两种格式"""
  601. try:
  602. # 支持以下格式:
  603. # 1. CREATE TABLE tablename
  604. # 2. CREATE TABLE "tablename"
  605. # 3. CREATE TABLE 'tablename'
  606. # 4. CREATE TABLE schema.tablename
  607. # 5. CREATE TABLE "schema"."tablename"
  608. # 6. CREATE TABLE 'schema'.'tablename'
  609. # 匹配表名,支持带引号和不带引号的情况
  610. table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
  611. table_match = re.search(table_pattern, sql, re.IGNORECASE)
  612. if not table_match:
  613. logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
  614. return None
  615. # 获取表名
  616. schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
  617. table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)
  618. if not table_name:
  619. logger.error("无法解析表名")
  620. return None
  621. logger.debug(f"解析到表名: {table_name}")
  622. # 提取CREATE TABLE语句的主体部分(括号内的内容)
  623. body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
  624. body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)
  625. if not body_match:
  626. logger.error("无法提取表主体内容")
  627. return None
  628. body_text = body_match.group(1).strip()
  629. logger.debug(f"表定义主体部分: {body_text}")
  630. # 解析字段定义
  631. fields = []
  632. # 分割字段定义,处理括号嵌套和引号
  633. field_defs = []
  634. pos = 0
  635. in_parentheses = 0
  636. in_quotes = False
  637. quote_char = None
  638. for i, char in enumerate(body_text):
  639. if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
  640. in_quotes = not in_quotes
  641. if in_quotes:
  642. quote_char = char
  643. else:
  644. quote_char = None
  645. elif char == '(' and not in_quotes:
  646. in_parentheses += 1
  647. elif char == ')' and not in_quotes:
  648. in_parentheses -= 1
  649. elif char == ',' and in_parentheses == 0 and not in_quotes:
  650. field_defs.append(body_text[pos:i].strip())
  651. pos = i + 1
  652. # 添加最后一个字段定义
  653. if pos < len(body_text):
  654. field_defs.append(body_text[pos:].strip())
  655. logger.debug(f"解析出 {len(field_defs)} 个字段定义")
  656. # 处理每个字段定义
  657. for field_def in field_defs:
  658. # 跳过约束定义
  659. if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
  660. continue
  661. # 提取字段名和类型
  662. field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
  663. field_match = re.search(field_pattern, field_def, re.IGNORECASE)
  664. if field_match:
  665. # 提取字段名
  666. field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
  667. # 提取类型
  668. field_type = field_match.group(5).strip()
  669. # 处理类型中可能的括号
  670. type_base = re.split(r'\s+', field_type)[0]
  671. clean_type_value = clean_type(type_base)
  672. fields.append((field_name, clean_type_value))
  673. logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
  674. else:
  675. logger.warning(f"无法解析字段定义: {field_def}")
  676. # 提取表注释
  677. table_comment = ""
  678. table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
  679. table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
  680. if table_comment_match:
  681. comment_table = table_comment_match.group(1)
  682. if comment_table.strip("'\"") == table_name.strip("'\""):
  683. table_comment = table_comment_match.group(2)
  684. logger.debug(f"找到表注释: {table_comment}")
  685. # 提取列注释
  686. comments = {}
  687. column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
  688. for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
  689. comment_table = match.group(1)
  690. column_name = match.group(2)
  691. comment = match.group(3)
  692. # 检查表名是否匹配
  693. if comment_table.strip("'\"") == table_name.strip("'\""):
  694. comments[column_name] = comment
  695. logger.debug(f"找到列注释: {column_name} - {comment}")
  696. else:
  697. logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")
  698. # 检查字段和注释匹配情况
  699. logger.debug("========字段和注释匹配情况========")
  700. field_names = [f[0] for f in fields]
  701. logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
  702. logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")
  703. # 构建返回结果
  704. meta_list = []
  705. for field_name, field_type in fields:
  706. chinese_name = comments.get(field_name, "")
  707. meta_list.append({
  708. "en_name": field_name,
  709. "data_type": field_type,
  710. "name": chinese_name if chinese_name else field_name
  711. })
  712. # 检查表是否存在
  713. try:
  714. status = status_query([table_name])
  715. except Exception as e:
  716. logger.error(f"检查表存在状态失败: {str(e)}")
  717. status = [False]
  718. # 构建返回结果
  719. result = {
  720. table_name: {
  721. "exist": status[0] if status else False,
  722. "meta": meta_list
  723. }
  724. }
  725. logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
  726. return result
  727. except Exception as e:
  728. logger.error(f"解析表定义SQL失败: {str(e)}")
  729. logger.error(f"异常详情: {e}")
  730. import traceback
  731. logger.error(traceback.format_exc())
  732. return None
  733. # 判断英文表名是否在图谱中存在
  734. def status_query(key_list):
  735. query = """
  736. unwind $Key_list as name
  737. OPTIONAL MATCH (n:data_model {en_name: name})
  738. OPTIONAL MATCH (n:data_resource {en_name: name})
  739. OPTIONAL MATCH (n:data_metric {en_name: name})
  740. WITH name, CASE
  741. WHEN n IS NOT NULL THEN True
  742. ELSE False
  743. END AS exist
  744. return collect(exist)AS exist
  745. """
  746. with neo4j_driver.get_session() as session:
  747. result = session.run(query, Key_list=key_list)
  748. data = result.value() # 获取单个值
  749. return data
  750. def select_sql(sql_query):
  751. """解析SELECT查询语句"""
  752. try:
  753. # 提取SELECT子句
  754. select_pattern = r'SELECT\s+(.*?)\s+FROM'
  755. select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  756. if not select_match:
  757. return None
  758. select_clause = select_match.group(1)
  759. # 分割字段
  760. fields = []
  761. # 处理字段列表,避免在函数调用中的逗号导致错误分割
  762. in_parenthesis = 0
  763. current_field = ""
  764. for char in select_clause:
  765. if char == '(':
  766. in_parenthesis += 1
  767. current_field += char
  768. elif char == ')':
  769. in_parenthesis -= 1
  770. current_field += char
  771. elif char == ',' and in_parenthesis == 0:
  772. fields.append(current_field.strip())
  773. current_field = ""
  774. else:
  775. current_field += char
  776. if current_field.strip():
  777. fields.append(current_field.strip())
  778. # 解析每个字段
  779. parsed_fields = []
  780. for field in fields:
  781. # 检查是否有字段别名
  782. alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
  783. alias_match = re.search(alias_pattern, field)
  784. if alias_match:
  785. field_expr = alias_match.group(1).strip()
  786. field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
  787. parsed_fields.append({
  788. "expression": field_expr,
  789. "alias": field_alias
  790. })
  791. else:
  792. # 没有别名的情况
  793. parsed_fields.append({
  794. "expression": field.strip(),
  795. "alias": None
  796. })
  797. # 提取FROM子句和表名
  798. from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
  799. from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  800. tables = []
  801. if from_match:
  802. from_clause = from_match.group(1).strip()
  803. # 分析FROM子句中的表
  804. table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))?'
  805. for match in re.finditer(table_pattern, from_clause):
  806. table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
  807. if table_name:
  808. tables.append(table_name)
  809. return tables
  810. except Exception as e:
  811. logger.error(f"解析SELECT查询语句失败: {str(e)}")
  812. # logger.error(traceback.format_exc())
  813. return None
  814. def model_resource_list(page, page_size, name_filter=None):
  815. """获取模型资源列表"""
  816. try:
  817. with neo4j_driver.get_session() as session:
  818. # 构建查询条件
  819. match_clause = "MATCH (n:model_resource)"
  820. where_clause = ""
  821. if name_filter:
  822. where_clause = f" WHERE n.name CONTAINS '{name_filter}'"
  823. # 计算总数
  824. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  825. count_result = session.run(count_cypher)
  826. total_count = count_result.single()["count"]
  827. # 分页查询
  828. skip = (page - 1) * page_size
  829. cypher = f"""
  830. {match_clause}{where_clause}
  831. RETURN n
  832. ORDER BY n.createTime DESC
  833. SKIP {skip} LIMIT {page_size}
  834. """
  835. result = session.run(cypher)
  836. # 格式化结果
  837. resources = []
  838. for record in result:
  839. node = dict(record["n"])
  840. node["id"] = record["n"].id
  841. resources.append(node)
  842. return resources, total_count
  843. except Exception as e:
  844. logger.error(f"获取模型资源列表失败: {str(e)}")
  845. return [], 0
  846. def data_resource_edit(data):
  847. """编辑数据资源"""
  848. try:
  849. resource_id = data.get("id")
  850. if not resource_id:
  851. raise ValueError("缺少资源ID")
  852. with neo4j_driver.get_session() as session:
  853. # 更新节点属性
  854. update_fields = {}
  855. for key, value in data.items():
  856. if key != "id" and key != "tag":
  857. update_fields[key] = value
  858. # 添加更新时间
  859. update_fields["updateTime"] = get_formatted_time()
  860. # 构建更新语句
  861. set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
  862. cypher = f"""
  863. MATCH (n:data_resource)
  864. WHERE id(n) = $resource_id
  865. SET {set_clause}
  866. RETURN n
  867. """
  868. result = session.run(cypher, resource_id=int(resource_id), **update_fields)
  869. updated_node = result.single()
  870. if not updated_node:
  871. raise ValueError("资源不存在")
  872. # 处理标签关系
  873. tag_id = data.get("tag")
  874. if tag_id:
  875. # 删除旧的标签关系
  876. delete_rel_cypher = """
  877. MATCH (n:data_resource)-[r:label]->()
  878. WHERE id(n) = $resource_id
  879. DELETE r
  880. """
  881. session.run(delete_rel_cypher, resource_id=int(resource_id))
  882. # 创建新的标签关系
  883. create_rel_cypher = """
  884. MATCH (n:data_resource), (t:data_label)
  885. WHERE id(n) = $resource_id AND id(t) = $tag_id
  886. CREATE (n)-[r:label]->(t)
  887. RETURN r
  888. """
  889. session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
  890. # 返回更新后的节点
  891. return dict(updated_node["n"])
  892. except Exception as e:
  893. logger.error(f"编辑数据资源失败: {str(e)}")
  894. raise