resource.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838
  1. import json
  2. import re
  3. import logging
  4. from py2neo import Relationship
  5. import pandas as pd
  6. from app.services.neo4j_driver import neo4j_driver
  7. from app.services.package_function import create_or_get_node, relationship_exists, get_node
  8. logger = logging.getLogger("app")
  9. def get_formatted_time():
  10. """获取格式化的当前时间"""
  11. import time
  12. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  13. def get_node_by_id(label, id):
  14. """根据ID获取指定标签的节点"""
  15. try:
  16. with neo4j_driver.get_session() as session:
  17. cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
  18. result = session.run(cypher, id=int(id))
  19. record = result.single()
  20. return record["n"] if record else None
  21. except Exception as e:
  22. logger.error(f"根据ID获取节点失败: {str(e)}")
  23. return None
  24. def get_node_by_id_no_label(id):
  25. """根据ID获取节点,不限制标签"""
  26. try:
  27. with neo4j_driver.get_session() as session:
  28. cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
  29. result = session.run(cypher, id=int(id))
  30. record = result.single()
  31. return record["n"] if record else None
  32. except Exception as e:
  33. logger.error(f"根据ID获取节点失败: {str(e)}")
  34. return None
  35. def delete_relationships(start_node, rel_type=None, end_node=None):
  36. """删除关系"""
  37. try:
  38. with neo4j_driver.get_session() as session:
  39. if rel_type and end_node:
  40. cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
  41. cypher = cypher.replace("{rel_type}", rel_type)
  42. session.run(cypher, start_id=start_node.id, end_id=end_node.id)
  43. elif rel_type:
  44. cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
  45. cypher = cypher.replace("{rel_type}", rel_type)
  46. session.run(cypher, start_id=start_node.id)
  47. else:
  48. cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
  49. session.run(cypher, start_id=start_node.id)
  50. return True
  51. except Exception as e:
  52. logger.error(f"删除关系失败: {str(e)}")
  53. return False
  54. def update_or_create_node(label, **properties):
  55. """更新或创建节点"""
  56. try:
  57. with neo4j_driver.get_session() as session:
  58. node_id = properties.pop('id', None)
  59. if node_id:
  60. # 更新现有节点
  61. set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
  62. cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
  63. result = session.run(cypher, id=int(node_id), **properties)
  64. else:
  65. # 创建新节点
  66. props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
  67. cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
  68. result = session.run(cypher, **properties)
  69. record = result.single()
  70. return record["n"] if record else None
  71. except Exception as e:
  72. logger.error(f"更新或创建节点失败: {str(e)}")
  73. return None
  74. # 数据资源-元数据 关系节点创建、查询
  75. def handle_node(receiver, head_data, data_resource):
  76. """处理数据资源节点创建和关系建立"""
  77. try:
  78. # 更新属性
  79. update_attributes = {
  80. 'en_name': data_resource['en_name'],
  81. 'time': get_formatted_time(),
  82. 'type': 'structure' # 结构化文件没有type
  83. }
  84. if 'additional_info' in receiver:
  85. del receiver['additional_info']
  86. tag_list = receiver.get('tag')
  87. receiver.update(update_attributes)
  88. # 创建或获取 data_resource 节点
  89. with neo4j_driver.get_session() as session:
  90. props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
  91. cypher = f"""
  92. MERGE (n:data_resource {{name: $name}})
  93. ON CREATE SET n = {{{props_str}}}
  94. ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
  95. RETURN n
  96. """
  97. result = session.run(cypher, **receiver)
  98. data_resource_node = result.single()["n"]
  99. resource_id = data_resource_node.id
  100. # 处理标签关系
  101. if tag_list:
  102. tag_node = get_node_by_id('data_label', tag_list)
  103. if tag_node:
  104. # 检查关系是否存在
  105. rel_check = """
  106. MATCH (a:data_resource)-[r:label]->(b:data_label)
  107. WHERE id(a) = $resource_id AND id(b) = $tag_id
  108. RETURN r
  109. """
  110. rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id)
  111. # 如果关系不存在则创建
  112. if not rel_result.single():
  113. rel_create = """
  114. MATCH (a:data_resource), (b:data_label)
  115. WHERE id(a) = $resource_id AND id(b) = $tag_id
  116. CREATE (a)-[r:label]->(b)
  117. RETURN r
  118. """
  119. session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
  120. # 处理头部数据(元数据)
  121. if head_data:
  122. for item in head_data:
  123. # 创建元数据节点
  124. meta_cypher = """
  125. MERGE (m:Metadata {name: $name})
  126. ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
  127. RETURN m
  128. """
  129. create_time = get_formatted_time()
  130. meta_result = session.run(
  131. meta_cypher,
  132. name=item['name'],
  133. en_name=item['en_name'],
  134. create_time=create_time
  135. )
  136. meta_node = meta_result.single()["m"]
  137. # 创建关系
  138. rel_cypher = """
  139. MATCH (a:data_resource), (m:Metadata)
  140. WHERE id(a) = $resource_id AND id(m) = $meta_id
  141. MERGE (a)-[r:contain]->(m)
  142. RETURN r
  143. """
  144. session.run(
  145. rel_cypher,
  146. resource_id=resource_id,
  147. meta_id=meta_node.id
  148. )
  149. return resource_id
  150. except Exception as e:
  151. logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
  152. raise
  153. def handle_id_resource(resource_id):
  154. """处理单个数据资源查询"""
  155. try:
  156. with neo4j_driver.get_session() as session:
  157. # 查询数据资源节点
  158. cypher = """
  159. MATCH (n:data_resource)
  160. WHERE id(n) = $resource_id
  161. RETURN n
  162. """
  163. result = session.run(cypher, resource_id=int(resource_id))
  164. record = result.single()
  165. if not record:
  166. return None
  167. data_resource = dict(record["n"])
  168. data_resource["id"] = record["n"].id
  169. # 查询关联的标签
  170. tag_cypher = """
  171. MATCH (n:data_resource)-[:label]->(t:data_label)
  172. WHERE id(n) = $resource_id
  173. RETURN t
  174. """
  175. tag_result = session.run(tag_cypher, resource_id=int(resource_id))
  176. tag_record = tag_result.single()
  177. if tag_record:
  178. tag = dict(tag_record["t"])
  179. tag["id"] = tag_record["t"].id
  180. data_resource["tag_info"] = tag
  181. # 查询关联的元数据
  182. meta_cypher = """
  183. MATCH (n:data_resource)-[:contain]->(m:Metadata)
  184. WHERE id(n) = $resource_id
  185. RETURN m
  186. """
  187. meta_result = session.run(meta_cypher, resource_id=int(resource_id))
  188. meta_list = []
  189. for meta_record in meta_result:
  190. meta = dict(meta_record["m"])
  191. meta["id"] = meta_record["m"].id
  192. meta_list.append(meta)
  193. data_resource["meta_list"] = meta_list
  194. return data_resource
  195. except Exception as e:
  196. logger.error(f"处理单个数据资源查询失败: {str(e)}")
  197. return None
  198. def id_resource_graph(resource_id):
  199. """获取数据资源图谱"""
  200. try:
  201. with neo4j_driver.get_session() as session:
  202. # 查询数据资源节点及其关系
  203. cypher = """
  204. MATCH (n:data_resource)-[r]-(m)
  205. WHERE id(n) = $resource_id
  206. RETURN n, r, m
  207. """
  208. result = session.run(cypher, resource_id=int(resource_id))
  209. # 收集节点和关系
  210. nodes = {}
  211. relationships = []
  212. for record in result:
  213. # 处理源节点
  214. source_node = dict(record["n"])
  215. source_node["id"] = record["n"].id
  216. nodes[source_node["id"]] = source_node
  217. # 处理目标节点
  218. target_node = dict(record["m"])
  219. target_node["id"] = record["m"].id
  220. nodes[target_node["id"]] = target_node
  221. # 处理关系
  222. rel = record["r"]
  223. relationship = {
  224. "id": rel.id,
  225. "source": record["n"].id,
  226. "target": record["m"].id,
  227. "type": rel.type
  228. }
  229. relationships.append(relationship)
  230. return {
  231. "nodes": list(nodes.values()),
  232. "relationships": relationships
  233. }
  234. except Exception as e:
  235. logger.error(f"获取数据资源图谱失败: {str(e)}")
  236. return {"nodes": [], "relationships": []}
  237. def resource_list(page, page_size, en_name_filter=None, name_filter=None,
  238. type_filter='all', category_filter=None, tag_filter=None):
  239. """获取数据资源列表"""
  240. try:
  241. with neo4j_driver.get_session() as session:
  242. # 构建查询条件
  243. match_clause = "MATCH (n:data_resource)"
  244. where_conditions = []
  245. if en_name_filter:
  246. where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
  247. if name_filter:
  248. where_conditions.append(f"n.name CONTAINS '{name_filter}'")
  249. if type_filter and type_filter != 'all':
  250. where_conditions.append(f"n.type = '{type_filter}'")
  251. if category_filter:
  252. where_conditions.append(f"n.category = '{category_filter}'")
  253. # 标签过滤需要额外的匹配
  254. if tag_filter:
  255. match_clause += "-[:label]->(t:data_label)"
  256. where_conditions.append(f"t.name = '{tag_filter}'")
  257. where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
  258. # 计算总数
  259. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  260. count_result = session.run(count_cypher)
  261. total_count = count_result.single()["count"]
  262. # 分页查询
  263. skip = (page - 1) * page_size
  264. cypher = f"""
  265. {match_clause}{where_clause}
  266. RETURN n
  267. ORDER BY n.time DESC
  268. SKIP {skip} LIMIT {page_size}
  269. """
  270. result = session.run(cypher)
  271. # 格式化结果
  272. resources = []
  273. for record in result:
  274. node = dict(record["n"])
  275. node["id"] = record["n"].id
  276. # 查询关联的标签
  277. tag_cypher = """
  278. MATCH (n:data_resource)-[:label]->(t:data_label)
  279. WHERE id(n) = $resource_id
  280. RETURN t
  281. """
  282. tag_result = session.run(tag_cypher, resource_id=node["id"])
  283. tag_record = tag_result.single()
  284. if tag_record:
  285. tag = dict(tag_record["t"])
  286. tag["id"] = tag_record["t"].id
  287. node["tag_info"] = tag
  288. resources.append(node)
  289. return resources, total_count
  290. except Exception as e:
  291. logger.error(f"获取数据资源列表失败: {str(e)}")
  292. return [], 0
  293. def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
  294. name_filter=None, category_filter=None, tag_filter=None):
  295. """获取特定数据资源关联的元数据列表"""
  296. try:
  297. with neo4j_driver.get_session() as session:
  298. # 基本匹配语句
  299. match_clause = """
  300. MATCH (n:data_resource)-[:contain]->(m:Metadata)
  301. WHERE id(n) = $resource_id
  302. """
  303. where_conditions = []
  304. if en_name_filter:
  305. where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
  306. if name_filter:
  307. where_conditions.append(f"m.name CONTAINS '{name_filter}'")
  308. if category_filter:
  309. where_conditions.append(f"m.category = '{category_filter}'")
  310. # 标签过滤需要额外的匹配
  311. tag_match = ""
  312. if tag_filter:
  313. tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
  314. where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
  315. # 计算总数
  316. count_cypher = f"""
  317. {match_clause}{where_clause}
  318. {tag_match}
  319. RETURN count(m) as count
  320. """
  321. count_params = {"resource_id": int(resource_id)}
  322. if tag_filter:
  323. count_params["tag_filter"] = tag_filter
  324. count_result = session.run(count_cypher, **count_params)
  325. total_count = count_result.single()["count"]
  326. # 分页查询
  327. skip = (page - 1) * page_size
  328. cypher = f"""
  329. {match_clause}{where_clause}
  330. {tag_match}
  331. RETURN m
  332. ORDER BY m.name
  333. SKIP {skip} LIMIT {page_size}
  334. """
  335. result = session.run(cypher, **count_params)
  336. # 格式化结果
  337. metadata_list = []
  338. for record in result:
  339. node = dict(record["m"])
  340. node["id"] = record["m"].id
  341. metadata_list.append(node)
  342. return metadata_list, total_count
  343. except Exception as e:
  344. logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
  345. return [], 0
  346. def resource_kinship_graph(resource_id, include_meta=True):
  347. """获取数据资源亲缘关系图谱"""
  348. try:
  349. with neo4j_driver.get_session() as session:
  350. # 基本查询
  351. cypher_parts = [
  352. "MATCH (n:data_resource) WHERE id(n) = $resource_id",
  353. "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
  354. ]
  355. # 是否包含元数据
  356. if include_meta:
  357. cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m:Metadata)")
  358. cypher_parts.append("RETURN n, l, collect(m) as metadata")
  359. cypher = "\n".join(cypher_parts)
  360. result = session.run(cypher, resource_id=int(resource_id))
  361. record = result.single()
  362. if not record:
  363. return {"nodes": [], "relationships": []}
  364. # 收集节点和关系
  365. nodes = {}
  366. relationships = []
  367. # 处理数据资源节点
  368. resource_node = dict(record["n"])
  369. resource_node["id"] = record["n"].id
  370. resource_node["labels"] = list(record["n"].labels)
  371. nodes[resource_node["id"]] = resource_node
  372. # 处理标签节点
  373. if record["l"]:
  374. label_node = dict(record["l"])
  375. label_node["id"] = record["l"].id
  376. label_node["labels"] = list(record["l"].labels)
  377. nodes[label_node["id"]] = label_node
  378. # 添加资源-标签关系
  379. relationships.append({
  380. "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
  381. "source": resource_node["id"],
  382. "target": label_node["id"],
  383. "type": "label"
  384. })
  385. # 处理元数据节点
  386. if include_meta and record["metadata"]:
  387. for meta in record["metadata"]:
  388. if meta: # 检查元数据节点是否存在
  389. meta_node = dict(meta)
  390. meta_node["id"] = meta.id
  391. meta_node["labels"] = list(meta.labels)
  392. nodes[meta_node["id"]] = meta_node
  393. # 添加资源-元数据关系
  394. relationships.append({
  395. "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
  396. "source": resource_node["id"],
  397. "target": meta_node["id"],
  398. "type": "contain"
  399. })
  400. return {
  401. "nodes": list(nodes.values()),
  402. "relationships": relationships
  403. }
  404. except Exception as e:
  405. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  406. return {"nodes": [], "relationships": []}
  407. def resource_impact_all_graph(resource_id, include_meta=True):
  408. """获取数据资源影响关系图谱"""
  409. try:
  410. with neo4j_driver.get_session() as session:
  411. # 根据meta参数决定查询深度
  412. if include_meta:
  413. cypher = """
  414. MATCH path = (n:data_resource)-[*1..3]-(m)
  415. WHERE id(n) = $resource_id
  416. RETURN path
  417. """
  418. else:
  419. cypher = """
  420. MATCH path = (n:data_resource)-[*1..2]-(m)
  421. WHERE id(n) = $resource_id
  422. AND NOT (m:Metadata)
  423. RETURN path
  424. """
  425. result = session.run(cypher, resource_id=int(resource_id))
  426. # 收集节点和关系
  427. nodes = {}
  428. relationships = {}
  429. for record in result:
  430. path = record["path"]
  431. # 处理路径中的所有节点
  432. for node in path.nodes:
  433. if node.id not in nodes:
  434. node_dict = dict(node)
  435. node_dict["id"] = node.id
  436. node_dict["labels"] = list(node.labels)
  437. nodes[node.id] = node_dict
  438. # 处理路径中的所有关系
  439. for rel in path.relationships:
  440. if rel.id not in relationships:
  441. rel_dict = {
  442. "id": rel.id,
  443. "source": rel.start_node.id,
  444. "target": rel.end_node.id,
  445. "type": rel.type
  446. }
  447. relationships[rel.id] = rel_dict
  448. return {
  449. "nodes": list(nodes.values()),
  450. "relationships": list(relationships.values())
  451. }
  452. except Exception as e:
  453. logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
  454. return {"nodes": [], "relationships": []}
  455. def clean_type(type_str):
  456. """清洗SQL类型字符串"""
  457. return re.sub(r'\(.*?\)', '', type_str).strip().upper()
  458. def clean_field_name(field_name):
  459. """清洗字段名"""
  460. return field_name.strip('`').strip('"').strip("'")
  461. def select_create_ddl(sql_content):
  462. """从SQL内容中提取创建表的DDL语句"""
  463. create_pattern = r'CREATE\s+TABLE.*?;'
  464. matches = re.findall(create_pattern, sql_content, re.DOTALL | re.IGNORECASE)
  465. return matches
  466. def table_sql(sql):
  467. """解析表定义SQL"""
  468. try:
  469. # 提取表名
  470. table_name_pattern = r'CREATE\s+TABLE\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))'
  471. table_name_match = re.search(table_name_pattern, sql, re.IGNORECASE)
  472. if not table_name_match:
  473. return None
  474. # 获取匹配的表名(从四个捕获组中选择非None的一个)
  475. table_name = next((g for g in table_name_match.groups() if g is not None), "")
  476. # 提取字段定义
  477. fields_pattern = r'CREATE\s+TABLE[^(]*\(\s*(.*?)\s*\)'
  478. fields_match = re.search(fields_pattern, sql, re.DOTALL | re.IGNORECASE)
  479. if not fields_match:
  480. return None
  481. fields_text = fields_match.group(1)
  482. # 分割字段定义
  483. field_definitions = []
  484. # 处理字段定义,避免在逗号内的括号中分割
  485. in_parenthesis = 0
  486. current_field = ""
  487. for char in fields_text:
  488. if char == '(':
  489. in_parenthesis += 1
  490. current_field += char
  491. elif char == ')':
  492. in_parenthesis -= 1
  493. current_field += char
  494. elif char == ',' and in_parenthesis == 0:
  495. field_definitions.append(current_field.strip())
  496. current_field = ""
  497. else:
  498. current_field += char
  499. if current_field.strip():
  500. field_definitions.append(current_field.strip())
  501. # 解析每个字段
  502. fields = []
  503. primary_keys = []
  504. for field_def in field_definitions:
  505. # 忽略PRIMARY KEY等约束定义
  506. if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
  507. # 提取主键字段
  508. pk_pattern = r'PRIMARY\s+KEY\s*\(\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s*\)'
  509. pk_match = re.search(pk_pattern, field_def, re.IGNORECASE)
  510. if pk_match:
  511. pk = next((g for g in pk_match.groups() if g is not None), "")
  512. primary_keys.append(pk)
  513. continue
  514. # 解析常规字段定义
  515. field_pattern = r'^\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s+([A-Za-z0-9_]+(?:\s*\([^)]*\))?)'
  516. field_match = re.search(field_pattern, field_def)
  517. if field_match:
  518. # 提取字段名和类型
  519. field_name = next((g for g in field_match.groups()[:4] if g is not None), "")
  520. field_type = field_match.group(5)
  521. # 检查是否为主键
  522. is_primary = "PRIMARY KEY" in field_def.upper()
  523. if is_primary:
  524. primary_keys.append(field_name)
  525. # 检查是否为非空
  526. not_null = "NOT NULL" in field_def.upper()
  527. # 检查默认值
  528. default_match = re.search(r'DEFAULT\s+([^,\s]+)', field_def, re.IGNORECASE)
  529. default_value = default_match.group(1) if default_match else None
  530. # 添加字段信息
  531. field_info = {
  532. "name": field_name,
  533. "type": clean_type(field_type),
  534. "is_primary": is_primary,
  535. "not_null": not_null
  536. }
  537. if default_value:
  538. field_info["default"] = default_value
  539. fields.append(field_info)
  540. # 更新主键标记
  541. for field in fields:
  542. if field["name"] in primary_keys and not field["is_primary"]:
  543. field["is_primary"] = True
  544. # 返回结果
  545. return {
  546. "table_name": table_name,
  547. "fields": fields
  548. }
  549. except Exception as e:
  550. logger.error(f"解析表定义SQL失败: {str(e)}")
  551. return None
  552. def select_sql(sql_query):
  553. """解析SELECT查询语句"""
  554. try:
  555. # 提取SELECT子句
  556. select_pattern = r'SELECT\s+(.*?)\s+FROM'
  557. select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  558. if not select_match:
  559. return None
  560. select_clause = select_match.group(1)
  561. # 分割字段
  562. fields = []
  563. # 处理字段列表,避免在函数调用中的逗号导致错误分割
  564. in_parenthesis = 0
  565. current_field = ""
  566. for char in select_clause:
  567. if char == '(':
  568. in_parenthesis += 1
  569. current_field += char
  570. elif char == ')':
  571. in_parenthesis -= 1
  572. current_field += char
  573. elif char == ',' and in_parenthesis == 0:
  574. fields.append(current_field.strip())
  575. current_field = ""
  576. else:
  577. current_field += char
  578. if current_field.strip():
  579. fields.append(current_field.strip())
  580. # 解析每个字段
  581. parsed_fields = []
  582. for field in fields:
  583. # 检查是否有字段别名
  584. alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
  585. alias_match = re.search(alias_pattern, field)
  586. if alias_match:
  587. field_expr = alias_match.group(1).strip()
  588. field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
  589. parsed_fields.append({
  590. "expression": field_expr,
  591. "alias": field_alias
  592. })
  593. else:
  594. # 没有别名的情况
  595. parsed_fields.append({
  596. "expression": field.strip(),
  597. "alias": None
  598. })
  599. # 提取FROM子句和表名
  600. from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
  601. from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
  602. tables = []
  603. if from_match:
  604. from_clause = from_match.group(1).strip()
  605. # 分析FROM子句中的表
  606. table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+)))?'
  607. table_matches = re.finditer(table_pattern, from_clause)
  608. for match in table_matches:
  609. table_name = next((g for g in match.groups()[:4] if g is not None), "")
  610. table_alias = next((g for g in match.groups()[4:] if g is not None), table_name)
  611. tables.append({
  612. "name": table_name,
  613. "alias": table_alias
  614. })
  615. return {
  616. "fields": parsed_fields,
  617. "tables": tables
  618. }
  619. except Exception as e:
  620. logger.error(f"解析SELECT查询语句失败: {str(e)}")
  621. return None
  622. def model_resource_list(page, page_size, name_filter=None):
  623. """获取模型资源列表"""
  624. try:
  625. with neo4j_driver.get_session() as session:
  626. # 构建查询条件
  627. match_clause = "MATCH (n:model_resource)"
  628. where_clause = ""
  629. if name_filter:
  630. where_clause = f" WHERE n.name CONTAINS '{name_filter}'"
  631. # 计算总数
  632. count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
  633. count_result = session.run(count_cypher)
  634. total_count = count_result.single()["count"]
  635. # 分页查询
  636. skip = (page - 1) * page_size
  637. cypher = f"""
  638. {match_clause}{where_clause}
  639. RETURN n
  640. ORDER BY n.createTime DESC
  641. SKIP {skip} LIMIT {page_size}
  642. """
  643. result = session.run(cypher)
  644. # 格式化结果
  645. resources = []
  646. for record in result:
  647. node = dict(record["n"])
  648. node["id"] = record["n"].id
  649. resources.append(node)
  650. return resources, total_count
  651. except Exception as e:
  652. logger.error(f"获取模型资源列表失败: {str(e)}")
  653. return [], 0
  654. def data_resource_edit(data):
  655. """编辑数据资源"""
  656. try:
  657. resource_id = data.get("id")
  658. if not resource_id:
  659. raise ValueError("缺少资源ID")
  660. with neo4j_driver.get_session() as session:
  661. # 更新节点属性
  662. update_fields = {}
  663. for key, value in data.items():
  664. if key != "id" and key != "tag":
  665. update_fields[key] = value
  666. # 添加更新时间
  667. update_fields["updateTime"] = get_formatted_time()
  668. # 构建更新语句
  669. set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
  670. cypher = f"""
  671. MATCH (n:data_resource)
  672. WHERE id(n) = $resource_id
  673. SET {set_clause}
  674. RETURN n
  675. """
  676. result = session.run(cypher, resource_id=int(resource_id), **update_fields)
  677. updated_node = result.single()
  678. if not updated_node:
  679. raise ValueError("资源不存在")
  680. # 处理标签关系
  681. tag_id = data.get("tag")
  682. if tag_id:
  683. # 删除旧的标签关系
  684. delete_rel_cypher = """
  685. MATCH (n:data_resource)-[r:label]->()
  686. WHERE id(n) = $resource_id
  687. DELETE r
  688. """
  689. session.run(delete_rel_cypher, resource_id=int(resource_id))
  690. # 创建新的标签关系
  691. create_rel_cypher = """
  692. MATCH (n:data_resource), (t:data_label)
  693. WHERE id(n) = $resource_id AND id(t) = $tag_id
  694. CREATE (n)-[r:label]->(t)
  695. RETURN r
  696. """
  697. session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
  698. # 返回更新后的节点
  699. return dict(updated_node["n"])
  700. except Exception as e:
  701. logger.error(f"编辑数据资源失败: {str(e)}")
  702. raise