import json
import logging
import re
import time

import pandas as pd
from py2neo import Relationship

from app.services.neo4j_driver import neo4j_driver
from app.services.package_function import create_or_get_node, relationship_exists, get_node

# NOTE(review): json, pd, Relationship, create_or_get_node, relationship_exists
# and get_node are not used by the code in this module section; they are kept
# in case other modules import them from here.

logger = logging.getLogger("app")


# ---------------------------------------------------------------------------
# Small private helpers
# ---------------------------------------------------------------------------

def _quote_identifier(name):
    """Return *name* as a backtick-quoted Cypher identifier.

    Labels, relationship types and property keys cannot be passed as query
    parameters, so they must be interpolated into the query text; doubling any
    embedded backtick keeps the value from escaping the quoted identifier.
    """
    return "`" + str(name).replace("`", "``") + "`"


def _is_blank(value):
    """Return True for ``None``, blank strings and empty containers.

    Unlike a plain truthiness test this keeps ``0`` as a real value, which
    matters because 0 is a valid internal Neo4j node id.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return not value.strip()
    if isinstance(value, (list, tuple, set, dict)):
        return not value
    return False


def _node_to_dict(node, include_labels=False):
    """Copy a driver node's properties into a dict and add its internal ``id``.

    When *include_labels* is true the node's labels are added as ``labels``.
    """
    data = dict(node)
    data["id"] = node.id
    if include_labels:
        data["labels"] = list(node.labels)
    return data


def _pagination(page, page_size):
    """Return ``(skip, limit)`` integers for 1-based *page* numbering.

    Pages below 1 are treated as page 1 and a negative size as 0, so the
    resulting SKIP/LIMIT values are always valid for Cypher.
    """
    limit = max(int(page_size), 0)
    return (max(int(page), 1) - 1) * limit, limit


def _attach_tag_info(session, resources):
    """Add ``tag_info`` to each resource dict that has a ``label`` relationship.

    Each dict in *resources* must carry the data_resource node's internal id
    under ``"id"``. All tags are fetched with one query instead of one query
    per resource; if a resource has several tags only the first one returned
    is kept.
    """
    if not resources:
        return
    cypher = """
    MATCH (n:data_resource)-[:label]->(t:data_label)
    WHERE id(n) IN $ids
    RETURN id(n) AS resource_id, t
    """
    tags = {}
    for record in session.run(cypher, ids=[resource["id"] for resource in resources]):
        tags.setdefault(record["resource_id"], _node_to_dict(record["t"]))
    for resource in resources:
        tag = tags.get(resource["id"])
        if tag is not None:
            resource["tag_info"] = tag


# ---------------------------------------------------------------------------
# Generic node / relationship helpers
# ---------------------------------------------------------------------------

def get_formatted_time():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def get_node_by_id(label, id):
    """Fetch the node with internal id *id* that carries *label*.

    Args:
        label: Node label to match on (escaped before use).
        id: Internal node id; anything accepted by ``int()``.

    Returns:
        The node, or ``None`` when nothing matches or the query fails (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            cypher = f"MATCH (n:{_quote_identifier(label)}) WHERE id(n) = $id RETURN n"
            record = session.run(cypher, id=int(id)).single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None


def get_node_by_id_no_label(id):
    """Fetch the node with internal id *id*, whatever its label.

    Returns:
        The node, or ``None`` when nothing matches or the query fails (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
            record = session.run(cypher, id=int(id)).single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None


def delete_relationships(start_node, rel_type=None, end_node=None):
    """Delete outgoing relationships of *start_node*.

    - *rel_type* and *end_node*: only relationships of that type to end_node.
    - *rel_type* only: every outgoing relationship of that type.
    - *end_node* only: every outgoing relationship (any type) to end_node.
    - neither: every outgoing relationship.

    Returns:
        True on success, False when the query failed (logged).
    """
    try:
        rel_pattern = f"[r:{_quote_identifier(rel_type)}]" if rel_type else "[r]"
        params = {"start_id": start_node.id}
        # Compare with None explicitly: driver nodes are falsy when they have
        # no properties, which previously made end_node be ignored.
        if end_node is not None:
            cypher = (
                f"MATCH (a)-{rel_pattern}->(b) "
                "WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
            )
            params["end_id"] = end_node.id
        else:
            cypher = f"MATCH (a)-{rel_pattern}->() WHERE id(a) = $start_id DELETE r"
        with neo4j_driver.get_session() as session:
            session.run(cypher, **params)
        return True
    except Exception as e:
        logger.error(f"删除关系失败: {str(e)}")
        return False


def update_or_create_node(label, **properties):
    """Update the node with internal id ``properties['id']`` or create a new one.

    Args:
        label: Node label (escaped before use).
        **properties: Node properties. An ``id`` entry selects the node to
            update and is not stored as a property.

    Returns:
        The updated/created node, or ``None`` if no node matched the id or the
        query failed (logged).
    """
    try:
        node_id = properties.pop('id', None)
        quoted_label = _quote_identifier(label)
        with neo4j_driver.get_session() as session:
            # Properties travel as one map parameter so arbitrary keys never
            # become part of the query text; this also makes an empty update valid.
            if not _is_blank(node_id):
                cypher = f"MATCH (n:{quoted_label}) WHERE id(n) = $id SET n += $props RETURN n"
                result = session.run(cypher, id=int(node_id), props=properties)
            else:
                cypher = f"CREATE (n:{quoted_label} $props) RETURN n"
                result = session.run(cypher, props=properties)
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"更新或创建节点失败: {str(e)}")
        return None


# ---------------------------------------------------------------------------
# Data resource <-> metadata: node creation and queries
# ---------------------------------------------------------------------------

def handle_node(receiver, head_data, data_resource):
    """Create or update a data_resource node and link its tag and metadata.

    *receiver* is mutated: ``additional_info`` is removed and ``en_name``,
    ``time`` and ``type`` are set before it is written as the node's
    properties. The node is merged on ``receiver['name']``.

    Args:
        receiver: Node properties; must contain ``name``. An optional ``tag``
            value is used as the internal id of a ``data_label`` node to link
            to (it is also kept as a node property).
        head_data: Iterable of dicts with ``name`` and ``en_name``; each is
            merged as a ``Metadata`` node and linked with ``contain``.
        data_resource: Dict providing ``en_name``.

    Returns:
        The internal id of the data_resource node.

    Raises:
        Any failure is logged and re-raised.
    """
    try:
        update_attributes = {
            'en_name': data_resource['en_name'],
            'time': get_formatted_time(),
            'type': 'structure',  # structured files carry no type of their own
        }
        receiver.pop('additional_info', None)
        tag_list = receiver.get('tag')
        receiver.update(update_attributes)

        with neo4j_driver.get_session() as session:
            merge_cypher = """
            MERGE (n:data_resource {name: $name})
            ON CREATE SET n = $props
            ON MATCH SET n += $props
            RETURN n
            """
            record = session.run(merge_cypher, name=receiver['name'], props=receiver).single()
            resource_id = record["n"].id

            if not _is_blank(tag_list):
                tag_node = get_node_by_id('data_label', tag_list)
                if tag_node is not None:
                    # MERGE only creates the label relationship when it is missing.
                    tag_cypher = """
                    MATCH (a:data_resource), (b:data_label)
                    WHERE id(a) = $resource_id AND id(b) = $tag_id
                    MERGE (a)-[r:label]->(b)
                    RETURN r
                    """
                    session.run(tag_cypher, resource_id=resource_id, tag_id=tag_node.id)

            if head_data:
                meta_cypher = """
                MERGE (m:Metadata {name: $name})
                ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
                RETURN m
                """
                contain_cypher = """
                MATCH (a:data_resource), (m:Metadata)
                WHERE id(a) = $resource_id AND id(m) = $meta_id
                MERGE (a)-[r:contain]->(m)
                RETURN r
                """
                for item in head_data:
                    meta_node = session.run(
                        meta_cypher,
                        name=item['name'],
                        en_name=item['en_name'],
                        create_time=get_formatted_time(),
                    ).single()["m"]
                    session.run(contain_cypher, resource_id=resource_id, meta_id=meta_node.id)

        return resource_id
    except Exception as e:
        logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
        raise


def handle_id_resource(resource_id):
    """Return one data resource with its tag and metadata.

    Returns:
        The node's properties plus ``id``, ``tag_info`` (when tagged) and
        ``meta_list``; ``None`` when not found or the query fails (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            RETURN n
            """
            record = session.run(cypher, resource_id=int(resource_id)).single()
            if not record:
                return None

            data_resource = _node_to_dict(record["n"])
            _attach_tag_info(session, [data_resource])

            meta_cypher = """
            MATCH (n:data_resource)-[:contain]->(m:Metadata)
            WHERE id(n) = $resource_id
            RETURN m
            """
            meta_result = session.run(meta_cypher, resource_id=int(resource_id))
            data_resource["meta_list"] = [_node_to_dict(meta_record["m"]) for meta_record in meta_result]
            return data_resource
    except Exception as e:
        logger.error(f"处理单个数据资源查询失败: {str(e)}")
        return None


def id_resource_graph(resource_id):
    """Return the data resource and its direct neighbours as a graph.

    Returns:
        ``{"nodes": [...], "relationships": [...]}``. Relationship ``source``
        and ``target`` follow the relationship's real direction; both lists
        are empty on failure (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:data_resource)-[r]-(m)
            WHERE id(n) = $resource_id
            RETURN n, r, m
            """
            nodes = {}
            relationships = {}
            for record in session.run(cypher, resource_id=int(resource_id)):
                nodes[record["n"].id] = _node_to_dict(record["n"])
                nodes[record["m"].id] = _node_to_dict(record["m"])
                rel = record["r"]
                # The match is undirected, so take the direction from the
                # relationship itself; keying by id drops duplicate self-loops.
                relationships[rel.id] = {
                    "id": rel.id,
                    "source": rel.start_node.id,
                    "target": rel.end_node.id,
                    "type": rel.type,
                }
            return {
                "nodes": list(nodes.values()),
                "relationships": list(relationships.values()),
            }
    except Exception as e:
        logger.error(f"获取数据资源图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}


def resource_list(page, page_size, en_name_filter=None, name_filter=None,
                  type_filter='all', category_filter=None, tag_filter=None):
    """Return one page of data resources and the total match count.

    Filters are passed as query parameters (never spliced into the query).
    ``type_filter='all'`` disables type filtering; ``tag_filter`` matches the
    name of a linked data_label.

    Returns:
        ``(resources, total_count)`` ordered by ``time`` descending;
        ``([], 0)`` on failure (logged).
    """
    try:
        match_clause = "MATCH (n:data_resource)"
        where_conditions = []
        params = {}
        if en_name_filter:
            where_conditions.append("n.en_name CONTAINS $en_name_filter")
            params["en_name_filter"] = str(en_name_filter)
        if name_filter:
            where_conditions.append("n.name CONTAINS $name_filter")
            params["name_filter"] = str(name_filter)
        if type_filter and type_filter != 'all':
            where_conditions.append("n.type = $type_filter")
            params["type_filter"] = str(type_filter)
        if category_filter:
            where_conditions.append("n.category = $category_filter")
            params["category_filter"] = str(category_filter)
        if tag_filter:
            # Tag filtering needs the label relationship in the MATCH pattern.
            match_clause += "-[:label]->(t:data_label)"
            where_conditions.append("t.name = $tag_filter")
            params["tag_filter"] = str(tag_filter)
        where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
        skip, limit = _pagination(page, page_size)

        with neo4j_driver.get_session() as session:
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            total_count = session.run(count_cypher, **params).single()["count"]

            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.time DESC
            SKIP $skip LIMIT $limit
            """
            result = session.run(cypher, skip=skip, limit=limit, **params)
            resources = [_node_to_dict(record["n"]) for record in result]
            _attach_tag_info(session, resources)
            return resources, total_count
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return [], 0


def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
                        name_filter=None, category_filter=None, tag_filter=None):
    """Return one page of the Metadata linked to a data resource, plus the total.

    Returns:
        ``(metadata_list, total_count)`` ordered by ``name``; ``([], 0)`` on
        failure (logged).
    """
    try:
        params = {"resource_id": int(resource_id)}
        conditions = ["id(n) = $resource_id"]
        if en_name_filter:
            conditions.append("m.en_name CONTAINS $en_name_filter")
            params["en_name_filter"] = str(en_name_filter)
        if name_filter:
            conditions.append("m.name CONTAINS $name_filter")
            params["name_filter"] = str(name_filter)
        if category_filter:
            conditions.append("m.category = $category_filter")
            params["category_filter"] = str(category_filter)

        tag_match = ""
        if tag_filter:
            # NOTE(review): metadata tags use HAS_TAG/Tag while data resources
            # use label/data_label — confirm this schema is intended.
            tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
            params["tag_filter"] = tag_filter

        base_query = (
            "MATCH (n:data_resource)-[:contain]->(m:Metadata)\n"
            f"WHERE {' AND '.join(conditions)}\n"
            f"{tag_match}\n"
        )
        skip, limit = _pagination(page, page_size)

        with neo4j_driver.get_session() as session:
            count_cypher = f"{base_query}RETURN count(m) as count"
            total_count = session.run(count_cypher, **params).single()["count"]

            cypher = f"{base_query}RETURN m\nORDER BY m.name\nSKIP $skip LIMIT $limit"
            result = session.run(cypher, skip=skip, limit=limit, **params)
            metadata_list = [_node_to_dict(record["m"]) for record in result]
            return metadata_list, total_count
    except Exception as e:
        logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
        return [], 0


def resource_kinship_graph(resource_id, include_meta=True):
    """Return the data resource with its tags and (optionally) its metadata.

    Args:
        resource_id: Internal id of the data_resource node.
        include_meta: When true, linked Metadata nodes and their ``contain``
            relationships are included.

    Returns:
        ``{"nodes": [...], "relationships": [...]}``; empty lists when the
        resource does not exist or the query fails (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            # Collect tags first so the query always yields a single row even
            # when the resource has several tags.
            cypher_parts = [
                "MATCH (n:data_resource) WHERE id(n) = $resource_id",
                "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
                "WITH n, collect(DISTINCT l) AS tags",
            ]
            if include_meta:
                cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m:Metadata)")
                cypher_parts.append("RETURN n, tags, collect(DISTINCT m) AS metadata")
            else:
                # Previously this branch produced a query without a usable
                # RETURN, so include_meta=False always failed.
                cypher_parts.append("RETURN n, tags, [] AS metadata")

            record = session.run("\n".join(cypher_parts), resource_id=int(resource_id)).single()
            if record is None:
                return {"nodes": [], "relationships": []}

            nodes = {}
            relationships = []

            resource_node = _node_to_dict(record["n"], include_labels=True)
            nodes[resource_node["id"]] = resource_node

            for tag in record["tags"]:
                label_node = _node_to_dict(tag, include_labels=True)
                nodes[label_node["id"]] = label_node
                relationships.append({
                    "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
                    "source": resource_node["id"],
                    "target": label_node["id"],
                    "type": "label",
                })

            for meta in record["metadata"]:
                meta_node = _node_to_dict(meta, include_labels=True)
                nodes[meta_node["id"]] = meta_node
                relationships.append({
                    "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
                    "source": resource_node["id"],
                    "target": meta_node["id"],
                    "type": "contain",
                })

            return {
                "nodes": list(nodes.values()),
                "relationships": relationships,
            }
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}


def resource_impact_all_graph(resource_id, include_meta=True):
    """Return every node/relationship within a few hops of the data resource.

    Args:
        resource_id: Internal id of the data_resource node.
        include_meta: True explores up to 3 hops; False explores up to 2 hops
            and excludes every path that passes through a Metadata node.

    Returns:
        ``{"nodes": [...], "relationships": [...]}``, deduplicated by id;
        empty lists on failure (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            if include_meta:
                cypher = """
                MATCH path = (n:data_resource)-[*1..3]-(m)
                WHERE id(n) = $resource_id
                RETURN path
                """
            else:
                # Check every node on the path, not only the end node, so
                # Metadata cannot slip in as an intermediate hop.
                cypher = """
                MATCH path = (n:data_resource)-[*1..2]-(m)
                WHERE id(n) = $resource_id AND NONE(x IN nodes(path) WHERE x:Metadata)
                RETURN path
                """
            nodes = {}
            relationships = {}
            for record in session.run(cypher, resource_id=int(resource_id)):
                path = record["path"]
                for node in path.nodes:
                    if node.id not in nodes:
                        nodes[node.id] = _node_to_dict(node, include_labels=True)
                for rel in path.relationships:
                    if rel.id not in relationships:
                        relationships[rel.id] = {
                            "id": rel.id,
                            "source": rel.start_node.id,
                            "target": rel.end_node.id,
                            "type": rel.type,
                        }
            return {
                "nodes": list(nodes.values()),
                "relationships": list(relationships.values()),
            }
    except Exception as e:
        logger.error(f"获取数据资源影响关系图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}


# ---------------------------------------------------------------------------
# SQL parsing helpers
# ---------------------------------------------------------------------------

# Characters that open/close a quoted SQL string or identifier; commas and
# parentheses inside such quotes are not treated as structure.
_QUOTE_CHARS = "'\"`"

# One SQL identifier: `backticked`, "double-quoted", 'single-quoted' or bare.
# Exactly four capture groups; on a match exactly one of them is not None.
_SQL_IDENTIFIER = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))'

# Optional non-capturing "schema." qualifier in front of a table name.
_SQL_QUALIFIER = r'(?:(?:`[^`]+`|"[^"]+"|[a-zA-Z0-9_]+)\.)?'

_TABLE_NAME_RE = re.compile(
    r'CREATE\s+(?:TEMPORARY\s+)?TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?' + _SQL_QUALIFIER + _SQL_IDENTIFIER,
    re.IGNORECASE,
)
_CONSTRAINT_RE = re.compile(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', re.IGNORECASE)
_PK_COLUMNS_RE = re.compile(r'PRIMARY\s+KEY\s*\(([^)]*)\)', re.IGNORECASE)
_FIELD_DEF_RE = re.compile(r'^\s*' + _SQL_IDENTIFIER + r'\s+([A-Za-z0-9_]+(?:\s*\([^)]*\))?)')
_DEFAULT_RE = re.compile(r"""DEFAULT\s+('(?:[^']|'')*'|"[^"]*"|[^,\s]+)""", re.IGNORECASE)

_SELECT_CLAUSE_RE = re.compile(r'SELECT\s+(.*?)\s+FROM', re.IGNORECASE | re.DOTALL)
_FIELD_ALIAS_RE = re.compile(r'(.*?)\s+[aA][sS]\s+' + _SQL_IDENTIFIER + r'$', re.DOTALL)
_FROM_CLAUSE_RE = re.compile(
    r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)',
    re.IGNORECASE | re.DOTALL,
)
_JOIN_RE = re.compile(
    r'\b(?:(?:NATURAL|INNER|CROSS|LEFT|RIGHT|FULL)\s+)?(?:OUTER\s+)?JOIN\b',
    re.IGNORECASE,
)
_JOIN_CONDITION_RE = re.compile(r'\s+(?:ON|USING)\b.*', re.IGNORECASE | re.DOTALL)
_TABLE_REF_RE = re.compile(
    r'\s*' + _SQL_QUALIFIER + _SQL_IDENTIFIER + r'(?:\s+(?:AS\s+)?' + _SQL_IDENTIFIER + r')?',
    re.IGNORECASE,
)


def _first_group(groups):
    """Return the first non-None regex group, or ``""`` if all are None."""
    return next((g for g in groups if g is not None), "")


def _split_top_level(text):
    """Split *text* on commas outside parentheses and quotes; strip each piece.

    A piece is emitted for every separating comma (even if empty); a trailing
    empty piece is dropped.
    """
    pieces = []
    current = []
    depth = 0
    quote = None
    for char in text:
        if quote:
            if char == quote:
                quote = None
        elif char in _QUOTE_CHARS:
            quote = char
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        elif char == "," and depth == 0:
            pieces.append("".join(current).strip())
            current = []
            continue
        current.append(char)
    tail = "".join(current).strip()
    if tail:
        pieces.append(tail)
    return pieces


def _find_closing_paren(text, open_index):
    """Return the index of the ``)`` matching the ``(`` at *open_index*, or -1.

    Parentheses inside quoted strings/identifiers are ignored.
    """
    depth = 0
    quote = None
    for index in range(open_index, len(text)):
        char = text[index]
        if quote:
            if char == quote:
                quote = None
        elif char in _QUOTE_CHARS:
            quote = char
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0:
                return index
    return -1


def _parse_table_refs(from_clause):
    """Extract ``{"name", "alias"}`` entries from the text of a FROM clause.

    The clause is split on top-level commas and on JOIN keywords, and ON/USING
    conditions are dropped. Segments that do not start with a
    ``[schema.]table`` reference (e.g. derived tables) are skipped. Without an
    alias the table name doubles as the alias.
    """
    tables = []
    for item in _split_top_level(from_clause.strip().rstrip(";")):
        for segment in _JOIN_RE.split(item):
            segment = _JOIN_CONDITION_RE.sub("", segment).strip()
            match = _TABLE_REF_RE.match(segment)
            if not match:
                continue
            groups = match.groups()
            table_name = _first_group(groups[:4])
            table_alias = next((g for g in groups[4:] if g is not None), table_name)
            tables.append({"name": table_name, "alias": table_alias})
    return tables


def clean_type(type_str):
    """Drop any parenthesised size/precision and upper-case an SQL type."""
    return re.sub(r'\(.*?\)', '', type_str).strip().upper()


def clean_field_name(field_name):
    """Strip surrounding backticks, double quotes and single quotes from a name."""
    return field_name.strip('`').strip('"').strip("'")


def select_create_ddl(sql_content):
    """Return every ``CREATE TABLE ... ;`` statement found in *sql_content*."""
    return re.findall(r'CREATE\s+TABLE.*?;', sql_content, re.DOTALL | re.IGNORECASE)


def table_sql(sql):
    """Parse a CREATE TABLE statement into its table name and column list.

    Handles ``IF NOT EXISTS``, schema-qualified names, types with parentheses
    (e.g. ``VARCHAR(255)``), inline and table-level (including composite)
    primary keys, and quoted DEFAULT values.

    Returns:
        ``{"table_name": str, "fields": [{"name", "type", "is_primary",
        "not_null"[, "default"]}, ...]}``, or ``None`` when *sql* is not a
        parsable CREATE TABLE statement or parsing fails (logged).
    """
    try:
        table_name_match = _TABLE_NAME_RE.search(sql)
        if not table_name_match:
            return None
        table_name = _first_group(table_name_match.groups())

        # Take everything up to the parenthesis that closes the column list,
        # not up to the first ')' (which may belong to e.g. VARCHAR(255)).
        open_index = sql.find("(", table_name_match.end())
        if open_index == -1:
            return None
        close_index = _find_closing_paren(sql, open_index)
        if close_index == -1:
            return None
        field_definitions = _split_top_level(sql[open_index + 1:close_index])

        fields = []
        primary_keys = set()
        for field_def in field_definitions:
            if _CONSTRAINT_RE.match(field_def):
                # Table-level constraint; only PRIMARY KEY columns are kept.
                pk_match = _PK_COLUMNS_RE.search(field_def)
                if pk_match:
                    primary_keys.update(
                        clean_field_name(part.strip())
                        for part in pk_match.group(1).split(",")
                        if part.strip()
                    )
                continue

            field_match = _FIELD_DEF_RE.match(field_def)
            if not field_match:
                continue

            field_name = _first_group(field_match.groups()[:4])
            upper_def = field_def.upper()
            is_primary = "PRIMARY KEY" in upper_def
            if is_primary:
                primary_keys.add(field_name)

            field_info = {
                "name": field_name,
                "type": clean_type(field_match.group(5)),
                "is_primary": is_primary,
                "not_null": "NOT NULL" in upper_def,
            }
            default_match = _DEFAULT_RE.search(field_def)
            if default_match and default_match.group(1):
                field_info["default"] = default_match.group(1)
            fields.append(field_info)

        for field in fields:
            if field["name"] in primary_keys:
                field["is_primary"] = True

        return {
            "table_name": table_name,
            "fields": fields,
        }
    except Exception as e:
        logger.error(f"解析表定义SQL失败: {str(e)}")
        return None


def select_sql(sql_query):
    """Parse the select list and FROM tables of a SELECT statement.

    Returns:
        ``{"fields": [{"expression", "alias"}], "tables": [{"name", "alias"}]}``
        (``alias`` is None for fields without ``AS``), or ``None`` when no
        ``SELECT ... FROM`` is found or parsing fails (logged).
    """
    try:
        select_match = _SELECT_CLAUSE_RE.search(sql_query)
        if not select_match:
            return None

        parsed_fields = []
        for field in _split_top_level(select_match.group(1)):
            alias_match = _FIELD_ALIAS_RE.search(field)
            if alias_match:
                parsed_fields.append({
                    "expression": alias_match.group(1).strip(),
                    "alias": _first_group(alias_match.groups()[1:]),
                })
            else:
                parsed_fields.append({
                    "expression": field.strip(),
                    "alias": None,
                })

        from_match = _FROM_CLAUSE_RE.search(sql_query)
        tables = _parse_table_refs(from_match.group(1)) if from_match else []

        return {
            "fields": parsed_fields,
            "tables": tables,
        }
    except Exception as e:
        logger.error(f"解析SELECT查询语句失败: {str(e)}")
        return None


# ---------------------------------------------------------------------------
# Model resources / editing
# ---------------------------------------------------------------------------

def model_resource_list(page, page_size, name_filter=None):
    """Return one page of model resources and the total match count.

    Returns:
        ``(resources, total_count)`` ordered by ``createTime`` descending;
        ``([], 0)`` on failure (logged).
    """
    try:
        match_clause = "MATCH (n:model_resource)"
        where_clause = ""
        params = {}
        if name_filter:
            where_clause = " WHERE n.name CONTAINS $name_filter"
            params["name_filter"] = str(name_filter)
        skip, limit = _pagination(page, page_size)

        with neo4j_driver.get_session() as session:
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            total_count = session.run(count_cypher, **params).single()["count"]

            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.createTime DESC
            SKIP $skip LIMIT $limit
            """
            result = session.run(cypher, skip=skip, limit=limit, **params)
            resources = [_node_to_dict(record["n"]) for record in result]
            return resources, total_count
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return [], 0


def data_resource_edit(data):
    """Update a data_resource node's properties and optionally replace its tag.

    Args:
        data: Must contain ``id`` (internal node id). Every other key except
            ``tag`` is written as a property; ``updateTime`` is set to now. A
            non-blank ``tag`` replaces all existing ``label`` relationships
            with one to that data_label id.

    Returns:
        The updated node's properties (read before the tag change).

    Raises:
        ValueError: when ``id`` is missing or no such resource exists. All
        errors are logged and re-raised.
    """
    try:
        resource_id = data.get("id")
        if _is_blank(resource_id):
            raise ValueError("缺少资源ID")

        update_fields = {key: value for key, value in data.items() if key not in ("id", "tag")}
        update_fields["updateTime"] = get_formatted_time()

        with neo4j_driver.get_session() as session:
            # Properties go in as one map so caller keys never reach the query text.
            cypher = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            SET n += $props
            RETURN n
            """
            updated_node = session.run(cypher, resource_id=int(resource_id), props=update_fields).single()
            if updated_node is None:
                raise ValueError("资源不存在")

            tag_id = data.get("tag")
            if not _is_blank(tag_id):
                delete_rel_cypher = """
                MATCH (n:data_resource)-[r:label]->()
                WHERE id(n) = $resource_id
                DELETE r
                """
                session.run(delete_rel_cypher, resource_id=int(resource_id))

                create_rel_cypher = """
                MATCH (n:data_resource), (t:data_label)
                WHERE id(n) = $resource_id AND id(t) = $tag_id
                CREATE (n)-[r:label]->(t)
                RETURN r
                """
                session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))

            return dict(updated_node["n"])
    except Exception as e:
        logger.error(f"编辑数据资源失败: {str(e)}")
        raise