123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838 |
- import json
- import re
- import logging
- from py2neo import Relationship
- import pandas as pd
- from app.services.neo4j_driver import neo4j_driver
- from app.services.package_function import create_or_get_node, relationship_exists, get_node
- logger = logging.getLogger("app")
- def get_formatted_time():
- """获取格式化的当前时间"""
- import time
- return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- def get_node_by_id(label, id):
- """根据ID获取指定标签的节点"""
- try:
- with neo4j_driver.get_session() as session:
- cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
- result = session.run(cypher, id=int(id))
- record = result.single()
- return record["n"] if record else None
- except Exception as e:
- logger.error(f"根据ID获取节点失败: {str(e)}")
- return None
- def get_node_by_id_no_label(id):
- """根据ID获取节点,不限制标签"""
- try:
- with neo4j_driver.get_session() as session:
- cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
- result = session.run(cypher, id=int(id))
- record = result.single()
- return record["n"] if record else None
- except Exception as e:
- logger.error(f"根据ID获取节点失败: {str(e)}")
- return None
- def delete_relationships(start_node, rel_type=None, end_node=None):
- """删除关系"""
- try:
- with neo4j_driver.get_session() as session:
- if rel_type and end_node:
- cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
- cypher = cypher.replace("{rel_type}", rel_type)
- session.run(cypher, start_id=start_node.id, end_id=end_node.id)
- elif rel_type:
- cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
- cypher = cypher.replace("{rel_type}", rel_type)
- session.run(cypher, start_id=start_node.id)
- else:
- cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
- session.run(cypher, start_id=start_node.id)
- return True
- except Exception as e:
- logger.error(f"删除关系失败: {str(e)}")
- return False
- def update_or_create_node(label, **properties):
- """更新或创建节点"""
- try:
- with neo4j_driver.get_session() as session:
- node_id = properties.pop('id', None)
- if node_id:
- # 更新现有节点
- set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
- cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
- result = session.run(cypher, id=int(node_id), **properties)
- else:
- # 创建新节点
- props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
- cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
- result = session.run(cypher, **properties)
-
- record = result.single()
- return record["n"] if record else None
- except Exception as e:
- logger.error(f"更新或创建节点失败: {str(e)}")
- return None
- # 数据资源-元数据 关系节点创建、查询
- def handle_node(receiver, head_data, data_resource):
- """处理数据资源节点创建和关系建立"""
- try:
- # 更新属性
- update_attributes = {
- 'en_name': data_resource['en_name'],
- 'time': get_formatted_time(),
- 'type': 'structure' # 结构化文件没有type
- }
- if 'additional_info' in receiver:
- del receiver['additional_info']
- tag_list = receiver.get('tag')
- receiver.update(update_attributes)
- # 创建或获取 data_resource 节点
- with neo4j_driver.get_session() as session:
- props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
- cypher = f"""
- MERGE (n:data_resource {{name: $name}})
- ON CREATE SET n = {{{props_str}}}
- ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
- RETURN n
- """
- result = session.run(cypher, **receiver)
- data_resource_node = result.single()["n"]
- resource_id = data_resource_node.id
- # 处理标签关系
- if tag_list:
- tag_node = get_node_by_id('data_label', tag_list)
- if tag_node:
- # 检查关系是否存在
- rel_check = """
- MATCH (a:data_resource)-[r:label]->(b:data_label)
- WHERE id(a) = $resource_id AND id(b) = $tag_id
- RETURN r
- """
- rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id)
-
- # 如果关系不存在则创建
- if not rel_result.single():
- rel_create = """
- MATCH (a:data_resource), (b:data_label)
- WHERE id(a) = $resource_id AND id(b) = $tag_id
- CREATE (a)-[r:label]->(b)
- RETURN r
- """
- session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)
- # 处理头部数据(元数据)
- if head_data:
- for item in head_data:
- # 创建元数据节点
- meta_cypher = """
- MERGE (m:Metadata {name: $name})
- ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
- RETURN m
- """
-
- create_time = get_formatted_time()
- meta_result = session.run(
- meta_cypher,
- name=item['name'],
- en_name=item['en_name'],
- create_time=create_time
- )
- meta_node = meta_result.single()["m"]
-
- # 创建关系
- rel_cypher = """
- MATCH (a:data_resource), (m:Metadata)
- WHERE id(a) = $resource_id AND id(m) = $meta_id
- MERGE (a)-[r:contain]->(m)
- RETURN r
- """
-
- session.run(
- rel_cypher,
- resource_id=resource_id,
- meta_id=meta_node.id
- )
-
- return resource_id
- except Exception as e:
- logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
- raise
- def handle_id_resource(resource_id):
- """处理单个数据资源查询"""
- try:
- with neo4j_driver.get_session() as session:
- # 查询数据资源节点
- cypher = """
- MATCH (n:data_resource)
- WHERE id(n) = $resource_id
- RETURN n
- """
- result = session.run(cypher, resource_id=int(resource_id))
- record = result.single()
-
- if not record:
- return None
-
- data_resource = dict(record["n"])
- data_resource["id"] = record["n"].id
-
- # 查询关联的标签
- tag_cypher = """
- MATCH (n:data_resource)-[:label]->(t:data_label)
- WHERE id(n) = $resource_id
- RETURN t
- """
- tag_result = session.run(tag_cypher, resource_id=int(resource_id))
- tag_record = tag_result.single()
-
- if tag_record:
- tag = dict(tag_record["t"])
- tag["id"] = tag_record["t"].id
- data_resource["tag_info"] = tag
-
- # 查询关联的元数据
- meta_cypher = """
- MATCH (n:data_resource)-[:contain]->(m:Metadata)
- WHERE id(n) = $resource_id
- RETURN m
- """
- meta_result = session.run(meta_cypher, resource_id=int(resource_id))
-
- meta_list = []
- for meta_record in meta_result:
- meta = dict(meta_record["m"])
- meta["id"] = meta_record["m"].id
- meta_list.append(meta)
-
- data_resource["meta_list"] = meta_list
-
- return data_resource
- except Exception as e:
- logger.error(f"处理单个数据资源查询失败: {str(e)}")
- return None
- def id_resource_graph(resource_id):
- """获取数据资源图谱"""
- try:
- with neo4j_driver.get_session() as session:
- # 查询数据资源节点及其关系
- cypher = """
- MATCH (n:data_resource)-[r]-(m)
- WHERE id(n) = $resource_id
- RETURN n, r, m
- """
- result = session.run(cypher, resource_id=int(resource_id))
-
- # 收集节点和关系
- nodes = {}
- relationships = []
-
- for record in result:
- # 处理源节点
- source_node = dict(record["n"])
- source_node["id"] = record["n"].id
- nodes[source_node["id"]] = source_node
-
- # 处理目标节点
- target_node = dict(record["m"])
- target_node["id"] = record["m"].id
- nodes[target_node["id"]] = target_node
-
- # 处理关系
- rel = record["r"]
- relationship = {
- "id": rel.id,
- "source": record["n"].id,
- "target": record["m"].id,
- "type": rel.type
- }
- relationships.append(relationship)
-
- return {
- "nodes": list(nodes.values()),
- "relationships": relationships
- }
- except Exception as e:
- logger.error(f"获取数据资源图谱失败: {str(e)}")
- return {"nodes": [], "relationships": []}
- def resource_list(page, page_size, en_name_filter=None, name_filter=None,
- type_filter='all', category_filter=None, tag_filter=None):
- """获取数据资源列表"""
- try:
- with neo4j_driver.get_session() as session:
- # 构建查询条件
- match_clause = "MATCH (n:data_resource)"
- where_conditions = []
-
- if en_name_filter:
- where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
-
- if name_filter:
- where_conditions.append(f"n.name CONTAINS '{name_filter}'")
-
- if type_filter and type_filter != 'all':
- where_conditions.append(f"n.type = '{type_filter}'")
-
- if category_filter:
- where_conditions.append(f"n.category = '{category_filter}'")
-
- # 标签过滤需要额外的匹配
- if tag_filter:
- match_clause += "-[:label]->(t:data_label)"
- where_conditions.append(f"t.name = '{tag_filter}'")
-
- where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
-
- # 计算总数
- count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
- count_result = session.run(count_cypher)
- total_count = count_result.single()["count"]
-
- # 分页查询
- skip = (page - 1) * page_size
- cypher = f"""
- {match_clause}{where_clause}
- RETURN n
- ORDER BY n.time DESC
- SKIP {skip} LIMIT {page_size}
- """
- result = session.run(cypher)
-
- # 格式化结果
- resources = []
- for record in result:
- node = dict(record["n"])
- node["id"] = record["n"].id
-
- # 查询关联的标签
- tag_cypher = """
- MATCH (n:data_resource)-[:label]->(t:data_label)
- WHERE id(n) = $resource_id
- RETURN t
- """
- tag_result = session.run(tag_cypher, resource_id=node["id"])
- tag_record = tag_result.single()
-
- if tag_record:
- tag = dict(tag_record["t"])
- tag["id"] = tag_record["t"].id
- node["tag_info"] = tag
-
- resources.append(node)
-
- return resources, total_count
- except Exception as e:
- logger.error(f"获取数据资源列表失败: {str(e)}")
- return [], 0
- def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
- name_filter=None, category_filter=None, tag_filter=None):
- """获取特定数据资源关联的元数据列表"""
- try:
- with neo4j_driver.get_session() as session:
- # 基本匹配语句
- match_clause = """
- MATCH (n:data_resource)-[:contain]->(m:Metadata)
- WHERE id(n) = $resource_id
- """
- where_conditions = []
-
- if en_name_filter:
- where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
-
- if name_filter:
- where_conditions.append(f"m.name CONTAINS '{name_filter}'")
-
- if category_filter:
- where_conditions.append(f"m.category = '{category_filter}'")
-
- # 标签过滤需要额外的匹配
- tag_match = ""
- if tag_filter:
- tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
-
- where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
-
- # 计算总数
- count_cypher = f"""
- {match_clause}{where_clause}
- {tag_match}
- RETURN count(m) as count
- """
- count_params = {"resource_id": int(resource_id)}
- if tag_filter:
- count_params["tag_filter"] = tag_filter
-
- count_result = session.run(count_cypher, **count_params)
- total_count = count_result.single()["count"]
-
- # 分页查询
- skip = (page - 1) * page_size
- cypher = f"""
- {match_clause}{where_clause}
- {tag_match}
- RETURN m
- ORDER BY m.name
- SKIP {skip} LIMIT {page_size}
- """
-
- result = session.run(cypher, **count_params)
-
- # 格式化结果
- metadata_list = []
- for record in result:
- node = dict(record["m"])
- node["id"] = record["m"].id
- metadata_list.append(node)
-
- return metadata_list, total_count
- except Exception as e:
- logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
- return [], 0
- def resource_kinship_graph(resource_id, include_meta=True):
- """获取数据资源亲缘关系图谱"""
- try:
- with neo4j_driver.get_session() as session:
- # 基本查询
- cypher_parts = [
- "MATCH (n:data_resource) WHERE id(n) = $resource_id",
- "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
- ]
-
- # 是否包含元数据
- if include_meta:
- cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m:Metadata)")
-
- cypher_parts.append("RETURN n, l, collect(m) as metadata")
-
- cypher = "\n".join(cypher_parts)
-
- result = session.run(cypher, resource_id=int(resource_id))
- record = result.single()
-
- if not record:
- return {"nodes": [], "relationships": []}
-
- # 收集节点和关系
- nodes = {}
- relationships = []
-
- # 处理数据资源节点
- resource_node = dict(record["n"])
- resource_node["id"] = record["n"].id
- resource_node["labels"] = list(record["n"].labels)
- nodes[resource_node["id"]] = resource_node
-
- # 处理标签节点
- if record["l"]:
- label_node = dict(record["l"])
- label_node["id"] = record["l"].id
- label_node["labels"] = list(record["l"].labels)
- nodes[label_node["id"]] = label_node
-
- # 添加资源-标签关系
- relationships.append({
- "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
- "source": resource_node["id"],
- "target": label_node["id"],
- "type": "label"
- })
-
- # 处理元数据节点
- if include_meta and record["metadata"]:
- for meta in record["metadata"]:
- if meta: # 检查元数据节点是否存在
- meta_node = dict(meta)
- meta_node["id"] = meta.id
- meta_node["labels"] = list(meta.labels)
- nodes[meta_node["id"]] = meta_node
-
- # 添加资源-元数据关系
- relationships.append({
- "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
- "source": resource_node["id"],
- "target": meta_node["id"],
- "type": "contain"
- })
-
- return {
- "nodes": list(nodes.values()),
- "relationships": relationships
- }
- except Exception as e:
- logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
- return {"nodes": [], "relationships": []}
- def resource_impact_all_graph(resource_id, include_meta=True):
- """获取数据资源影响关系图谱"""
- try:
- with neo4j_driver.get_session() as session:
- # 根据meta参数决定查询深度
- if include_meta:
- cypher = """
- MATCH path = (n:data_resource)-[*1..3]-(m)
- WHERE id(n) = $resource_id
- RETURN path
- """
- else:
- cypher = """
- MATCH path = (n:data_resource)-[*1..2]-(m)
- WHERE id(n) = $resource_id
- AND NOT (m:Metadata)
- RETURN path
- """
-
- result = session.run(cypher, resource_id=int(resource_id))
-
- # 收集节点和关系
- nodes = {}
- relationships = {}
-
- for record in result:
- path = record["path"]
-
- # 处理路径中的所有节点
- for node in path.nodes:
- if node.id not in nodes:
- node_dict = dict(node)
- node_dict["id"] = node.id
- node_dict["labels"] = list(node.labels)
- nodes[node.id] = node_dict
-
- # 处理路径中的所有关系
- for rel in path.relationships:
- if rel.id not in relationships:
- rel_dict = {
- "id": rel.id,
- "source": rel.start_node.id,
- "target": rel.end_node.id,
- "type": rel.type
- }
- relationships[rel.id] = rel_dict
-
- return {
- "nodes": list(nodes.values()),
- "relationships": list(relationships.values())
- }
- except Exception as e:
- logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
- return {"nodes": [], "relationships": []}
- def clean_type(type_str):
- """清洗SQL类型字符串"""
- return re.sub(r'\(.*?\)', '', type_str).strip().upper()
- def clean_field_name(field_name):
- """清洗字段名"""
- return field_name.strip('`').strip('"').strip("'")
- def select_create_ddl(sql_content):
- """从SQL内容中提取创建表的DDL语句"""
- create_pattern = r'CREATE\s+TABLE.*?;'
- matches = re.findall(create_pattern, sql_content, re.DOTALL | re.IGNORECASE)
- return matches
- def table_sql(sql):
- """解析表定义SQL"""
- try:
- # 提取表名
- table_name_pattern = r'CREATE\s+TABLE\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))'
- table_name_match = re.search(table_name_pattern, sql, re.IGNORECASE)
-
- if not table_name_match:
- return None
-
- # 获取匹配的表名(从四个捕获组中选择非None的一个)
- table_name = next((g for g in table_name_match.groups() if g is not None), "")
-
- # 提取字段定义
- fields_pattern = r'CREATE\s+TABLE[^(]*\(\s*(.*?)\s*\)'
- fields_match = re.search(fields_pattern, sql, re.DOTALL | re.IGNORECASE)
-
- if not fields_match:
- return None
-
- fields_text = fields_match.group(1)
-
- # 分割字段定义
- field_definitions = []
-
- # 处理字段定义,避免在逗号内的括号中分割
- in_parenthesis = 0
- current_field = ""
-
- for char in fields_text:
- if char == '(':
- in_parenthesis += 1
- current_field += char
- elif char == ')':
- in_parenthesis -= 1
- current_field += char
- elif char == ',' and in_parenthesis == 0:
- field_definitions.append(current_field.strip())
- current_field = ""
- else:
- current_field += char
-
- if current_field.strip():
- field_definitions.append(current_field.strip())
-
- # 解析每个字段
- fields = []
- primary_keys = []
-
- for field_def in field_definitions:
- # 忽略PRIMARY KEY等约束定义
- if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
- # 提取主键字段
- pk_pattern = r'PRIMARY\s+KEY\s*\(\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s*\)'
- pk_match = re.search(pk_pattern, field_def, re.IGNORECASE)
-
- if pk_match:
- pk = next((g for g in pk_match.groups() if g is not None), "")
- primary_keys.append(pk)
- continue
-
- # 解析常规字段定义
- field_pattern = r'^\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s+([A-Za-z0-9_]+(?:\s*\([^)]*\))?)'
- field_match = re.search(field_pattern, field_def)
-
- if field_match:
- # 提取字段名和类型
- field_name = next((g for g in field_match.groups()[:4] if g is not None), "")
- field_type = field_match.group(5)
-
- # 检查是否为主键
- is_primary = "PRIMARY KEY" in field_def.upper()
- if is_primary:
- primary_keys.append(field_name)
-
- # 检查是否为非空
- not_null = "NOT NULL" in field_def.upper()
-
- # 检查默认值
- default_match = re.search(r'DEFAULT\s+([^,\s]+)', field_def, re.IGNORECASE)
- default_value = default_match.group(1) if default_match else None
-
- # 添加字段信息
- field_info = {
- "name": field_name,
- "type": clean_type(field_type),
- "is_primary": is_primary,
- "not_null": not_null
- }
-
- if default_value:
- field_info["default"] = default_value
-
- fields.append(field_info)
-
- # 更新主键标记
- for field in fields:
- if field["name"] in primary_keys and not field["is_primary"]:
- field["is_primary"] = True
-
- # 返回结果
- return {
- "table_name": table_name,
- "fields": fields
- }
- except Exception as e:
- logger.error(f"解析表定义SQL失败: {str(e)}")
- return None
- def select_sql(sql_query):
- """解析SELECT查询语句"""
- try:
- # 提取SELECT子句
- select_pattern = r'SELECT\s+(.*?)\s+FROM'
- select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)
-
- if not select_match:
- return None
-
- select_clause = select_match.group(1)
-
- # 分割字段
- fields = []
-
- # 处理字段列表,避免在函数调用中的逗号导致错误分割
- in_parenthesis = 0
- current_field = ""
-
- for char in select_clause:
- if char == '(':
- in_parenthesis += 1
- current_field += char
- elif char == ')':
- in_parenthesis -= 1
- current_field += char
- elif char == ',' and in_parenthesis == 0:
- fields.append(current_field.strip())
- current_field = ""
- else:
- current_field += char
-
- if current_field.strip():
- fields.append(current_field.strip())
-
- # 解析每个字段
- parsed_fields = []
-
- for field in fields:
- # 检查是否有字段别名
- alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
- alias_match = re.search(alias_pattern, field)
-
- if alias_match:
- field_expr = alias_match.group(1).strip()
- field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
-
- parsed_fields.append({
- "expression": field_expr,
- "alias": field_alias
- })
- else:
- # 没有别名的情况
- parsed_fields.append({
- "expression": field.strip(),
- "alias": None
- })
-
- # 提取FROM子句和表名
- from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
- from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)
-
- tables = []
- if from_match:
- from_clause = from_match.group(1).strip()
-
- # 分析FROM子句中的表
- table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+)))?'
- table_matches = re.finditer(table_pattern, from_clause)
-
- for match in table_matches:
- table_name = next((g for g in match.groups()[:4] if g is not None), "")
- table_alias = next((g for g in match.groups()[4:] if g is not None), table_name)
-
- tables.append({
- "name": table_name,
- "alias": table_alias
- })
-
- return {
- "fields": parsed_fields,
- "tables": tables
- }
- except Exception as e:
- logger.error(f"解析SELECT查询语句失败: {str(e)}")
- return None
- def model_resource_list(page, page_size, name_filter=None):
- """获取模型资源列表"""
- try:
- with neo4j_driver.get_session() as session:
- # 构建查询条件
- match_clause = "MATCH (n:model_resource)"
- where_clause = ""
-
- if name_filter:
- where_clause = f" WHERE n.name CONTAINS '{name_filter}'"
-
- # 计算总数
- count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
- count_result = session.run(count_cypher)
- total_count = count_result.single()["count"]
-
- # 分页查询
- skip = (page - 1) * page_size
- cypher = f"""
- {match_clause}{where_clause}
- RETURN n
- ORDER BY n.createTime DESC
- SKIP {skip} LIMIT {page_size}
- """
- result = session.run(cypher)
-
- # 格式化结果
- resources = []
- for record in result:
- node = dict(record["n"])
- node["id"] = record["n"].id
- resources.append(node)
-
- return resources, total_count
- except Exception as e:
- logger.error(f"获取模型资源列表失败: {str(e)}")
- return [], 0
- def data_resource_edit(data):
- """编辑数据资源"""
- try:
- resource_id = data.get("id")
- if not resource_id:
- raise ValueError("缺少资源ID")
-
- with neo4j_driver.get_session() as session:
- # 更新节点属性
- update_fields = {}
- for key, value in data.items():
- if key != "id" and key != "tag":
- update_fields[key] = value
-
- # 添加更新时间
- update_fields["updateTime"] = get_formatted_time()
-
- # 构建更新语句
- set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
- cypher = f"""
- MATCH (n:data_resource)
- WHERE id(n) = $resource_id
- SET {set_clause}
- RETURN n
- """
-
- result = session.run(cypher, resource_id=int(resource_id), **update_fields)
- updated_node = result.single()
-
- if not updated_node:
- raise ValueError("资源不存在")
-
- # 处理标签关系
- tag_id = data.get("tag")
- if tag_id:
- # 删除旧的标签关系
- delete_rel_cypher = """
- MATCH (n:data_resource)-[r:label]->()
- WHERE id(n) = $resource_id
- DELETE r
- """
- session.run(delete_rel_cypher, resource_id=int(resource_id))
-
- # 创建新的标签关系
- create_rel_cypher = """
- MATCH (n:data_resource), (t:data_label)
- WHERE id(n) = $resource_id AND id(t) = $tag_id
- CREATE (n)-[r:label]->(t)
- RETURN r
- """
-
- session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
-
- # 返回更新后的节点
- return dict(updated_node["n"])
- except Exception as e:
- logger.error(f"编辑数据资源失败: {str(e)}")
- raise
|