import json
import re
import logging
import time

from py2neo import Relationship
import pandas as pd

from app.services.neo4j_driver import neo4j_driver
from app.services.package_function import create_or_get_node, relationship_exists, get_node

logger = logging.getLogger("app")


def get_formatted_time():
    """Return the current time as a formatted string."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def get_node_by_id(label, id):
    """Get a node with the given label by its ID."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure the id is an integer
            try:
                id_int = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
            result = session.run(cypher, id=id_int)
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None


def get_node_by_id_no_label(id):
    """Get a node by its ID, regardless of label."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure the id is an integer
            try:
                id_int = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
            result = session.run(cypher, id=id_int)
            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"根据ID获取节点失败: {str(e)}")
        return None


def delete_relationships(start_node, rel_type=None, end_node=None):
    """Delete relationships starting from the given node."""
    try:
        with neo4j_driver.get_session() as session:
            if rel_type and end_node:
                cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
                cypher = cypher.replace("{rel_type}", rel_type)
                session.run(cypher, start_id=start_node.id, end_id=end_node.id)
            elif rel_type:
                cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
                cypher = cypher.replace("{rel_type}", rel_type)
                session.run(cypher, start_id=start_node.id)
            else:
                cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
                session.run(cypher, start_id=start_node.id)
            return True
    except Exception as e:
        logger.error(f"删除关系失败: {str(e)}")
        return False


def update_or_create_node(label, **properties):
    """Update an existing node or create a new one."""
    try:
        with neo4j_driver.get_session() as session:
            node_id = properties.pop('id', None)
            if node_id:
                # Update the existing node
                set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
                cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
                result = session.run(cypher, id=int(node_id), **properties)
            else:
                # Create a new node
                props_str = ", ".join([f"{k}: ${k}" for k in properties.keys()])
                cypher = f"CREATE (n:{label} {{{props_str}}}) RETURN n"
                result = session.run(cypher, **properties)

            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"更新或创建节点失败: {str(e)}")
        return None
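
# Illustrative usage sketch of the generic node helper above (property values
# are hypothetical; nothing in this module calls it this way):
#
#   node = update_or_create_node("data_label", name="财务", en_name="finance")
#   existing = update_or_create_node("data_label", id=123, name="财务数据")
#
# With an `id` the node is matched by its internal Neo4j id and updated;
# without one a new node is created with the given properties.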

# Data resource <-> metadata: node creation and relationship queries
def handle_node(receiver, head_data, data_source=None, resource_type=None):
    """Create the data_resource node and establish its relationships."""
    try:
        # Set the type property based on resource_type
        if resource_type == 'ddl':
            type_value = 'ddl'
        else:
            type_value = 'structure'

        # Attributes to add or refresh on the node
        update_attributes = {
            'en_name': receiver['en_name'],
            'time': get_formatted_time(),
            'type': type_value  # type differs by resource type
        }
        if 'additional_info' in receiver:
            del receiver['additional_info']
        # Remove data_source from receiver so a complex object is not stored as a node property
        if 'data_source' in receiver:
            del receiver['data_source']
        tag_list = receiver.get('tag')
        receiver.update(update_attributes)

        # Create or fetch the data_resource node
        with neo4j_driver.get_session() as session:
            props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
            cypher = f"""
            MERGE (n:data_resource {{name: $name}})
            ON CREATE SET n = {{{props_str}}}
            ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
            RETURN n
            """
            result = session.run(cypher, **receiver)
            data_resource_node = result.single()["n"]
            resource_id = data_resource_node.id  # numeric internal ID

            # Handle the label relationship
            if tag_list:
                tag_node = get_node_by_id('data_label', tag_list)
                if tag_node:
                    # Check whether the relationship already exists
                    rel_check = """
                    MATCH (a:data_resource)-[r:label]->(b:data_label)
                    WHERE id(a) = $resource_id AND id(b) = $tag_id
                    RETURN r
                    """
                    rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id)

                    # Create the relationship if it does not exist
                    if not rel_result.single():
                        rel_create = """
                        MATCH (a:data_resource), (b:data_label)
                        WHERE id(a) = $resource_id AND id(b) = $tag_id
                        CREATE (a)-[r:label]->(b)
                        RETURN r
                        """
                        session.run(rel_create, resource_id=resource_id, tag_id=tag_node.id)

            # Handle header data (metadata / fields)
            if head_data:
                for item in head_data:
                    # Create the metadata node
                    meta_cypher = """
                    MERGE (m:meta_data {name: $name})
                    ON CREATE SET m.en_name = $en_name, m.create_time = $create_time,
                                  m.data_type = $type, m.status = 'true'
                    ON MATCH SET m.data_type = $type, m.status = 'true'
                    RETURN m
                    """
                    create_time = get_formatted_time()
                    meta_result = session.run(
                        meta_cypher,
                        name=item['name'],
                        en_name=item['en_name'],
                        create_time=create_time,
                        type=item['data_type']  # stored as the data_type property
                    )
                    meta_record = meta_result.single()

                    if meta_record and meta_record["m"]:
                        meta_node = meta_record["m"]
                        meta_id = meta_node.id  # numeric internal ID
                        # Log the created/fetched node and its ID
                        logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")

                        # Confirm the data resource node can be found
                        check_resource_cypher = """
                        MATCH (n:data_resource) WHERE id(n) = $resource_id RETURN n
                        """
                        check_resource = session.run(check_resource_cypher, resource_id=resource_id)
                        if check_resource.single():
                            logger.info(f"找到数据资源节点: ID={resource_id}")
                        else:
                            logger.error(f"无法找到数据资源节点: ID={resource_id}")
                            continue

                        # Create the relationship
                        rel_cypher = """
                        MATCH (a:data_resource), (m:meta_data)
                        WHERE id(a) = $resource_id AND id(m) = $meta_id
                        MERGE (a)-[r:contain]->(m)
                        RETURN r
                        """
                        rel_result = session.run(
                            rel_cypher,
                            resource_id=resource_id,
                            meta_id=meta_id
                        )
                        rel_record = rel_result.single()
                        if rel_record:
                            logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
                        else:
                            logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
                    else:
                        logger.error(f"未能创建或获取元数据节点: {item['name']}")

            # Handle the data source relationship
            if data_source and resource_type == 'ddl':
                try:
                    # Create or fetch the data source node
                    # data_source_en_name = handle_data_source(data_source)
                    data_source_en_name = data_source['en_name']

                    # Create the relationship between the data resource and the data source
                    if data_source_en_name:
                        # Create the originates_from relationship
                        rel_data_source_cypher = """
                        MATCH (a:data_resource), (b:data_source)
                        WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
                        MERGE (a)-[r:originates_from]->(b)
                        RETURN r
                        """
                        rel_result = session.run(
                            rel_data_source_cypher,
                            resource_id=resource_id,
                            ds_en_name=data_source_en_name
                        )
                        rel_record = rel_result.single()
                        if rel_record:
                            logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
                        else:
                            # Log this as a severe error
                            error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {data_source_en_name}"
                            logger.error(error_msg)

                            # Check whether the data source node exists
                            check_ds_cypher = "MATCH (b:data_source) WHERE b.en_name = $ds_en_name RETURN b"
                            check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
                            if not check_ds_result.single():
                                logger.error(f"数据源节点不存在: en_name={data_source_en_name}")

                            # Check whether the data resource node exists
                            check_res_cypher = "MATCH (a:data_resource) WHERE id(a) = $resource_id RETURN a"
                            check_res_result = session.run(check_res_cypher, resource_id=resource_id)
                            if not check_res_result.single():
                                logger.error(f"数据资源节点不存在: id={resource_id}")

                            # A severe error should raise an exception
                            raise RuntimeError(error_msg)
                except Exception as e:
                    logger.error(f"处理数据源关系失败: {str(e)}")
                    raise RuntimeError(f"处理数据源关系失败: {str(e)}")

        return resource_id
    except Exception as e:
        logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
        raise
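
# Illustrative call sketch for handle_node (hypothetical payload; the real
# callers live in the API layer, not in this module):
#
#   resource_id = handle_node(
#       receiver={"name": "用户表", "en_name": "users", "tag": 42},
#       head_data=[{"name": "用户ID", "en_name": "user_id", "data_type": "INTEGER"}],
#       data_source={"en_name": "ods_mysql"},
#       resource_type="ddl",
#   )
#
# `tag` is expected to be the numeric id of an existing data_label node, and
# `data_source["en_name"]` must match an existing data_source node when
# resource_type is "ddl".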
logger.error(f"处理数据源关系失败: {str(e)}") raise RuntimeError(f"处理数据源关系失败: {str(e)}") return resource_id except Exception as e: logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}") raise def handle_id_resource(resource_id): """处理单个数据资源查询""" try: with neo4j_driver.get_session() as session: # 确保resource_id为整数 try: resource_id_int = int(resource_id) except (ValueError, TypeError): logger.error(f"资源ID不是有效的整数: {resource_id}") return None # 使用数值ID查询 cypher = """ MATCH (n:data_resource) WHERE id(n) = $resource_id RETURN n """ result = session.run(cypher, resource_id=resource_id_int) record = result.single() if not record: logger.error(f"未找到资源,ID: {resource_id_int}") return None # 构建返回数据 data_resource = dict(record["n"]) data_resource["id"] = record["n"].id # 查询关联的标签 tag_cypher = """ MATCH (n:data_resource)-[r:label]->(t:data_label) WHERE id(n) = $resource_id RETURN t """ tag_result = session.run(tag_cypher, resource_id=resource_id_int) tag_record = tag_result.single() if tag_record: tag = dict(tag_record["t"]) tag["id"] = tag_record["t"].id data_resource["tag_info"] = tag # 查询关联的元数据 - 支持meta_data和Metadata两种标签 meta_cypher = """ MATCH (n:data_resource)-[:contain]->(m) WHERE id(n) = $resource_id AND (m:meta_data OR m:Metadata) RETURN m """ meta_result = session.run(meta_cypher, resource_id=resource_id_int) meta_list = [] for meta_record in meta_result: meta = dict(meta_record["m"]) meta["id"] = meta_record["m"].id meta_list.append(meta) data_resource["meta_list"] = meta_list logger.info(f"成功获取资源详情,ID: {resource_id_int}") return data_resource except Exception as e: logger.error(f"处理单个数据资源查询失败: {str(e)}") return None def id_resource_graph(resource_id): """获取数据资源图谱""" try: with neo4j_driver.get_session() as session: # 查询数据资源节点及其关系 cypher = """ MATCH (n:data_resource)-[r]-(m) WHERE id(n) = $resource_id RETURN n, r, m """ result = session.run(cypher, resource_id=int(resource_id)) # 收集节点和关系 nodes = {} relationships = [] for record in result: # 处理源节点 source_node = dict(record["n"]) source_node["id"] = record["n"].id nodes[source_node["id"]] = source_node # 处理目标节点 target_node = dict(record["m"]) target_node["id"] = record["m"].id nodes[target_node["id"]] = target_node # 处理关系 rel = record["r"] relationship = { "id": rel.id, "source": record["n"].id, "target": record["m"].id, "type": rel.type } relationships.append(relationship) return { "nodes": list(nodes.values()), "relationships": relationships } except Exception as e: logger.error(f"获取数据资源图谱失败: {str(e)}") return {"nodes": [], "relationships": []} def resource_list(page, page_size, en_name_filter=None, name_filter=None, type_filter='all', category_filter=None, tag_filter=None): """获取数据资源列表""" try: with neo4j_driver.get_session() as session: # 构建查询条件 match_clause = "MATCH (n:data_resource)" where_conditions = [] if en_name_filter: where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'") if name_filter: where_conditions.append(f"n.name CONTAINS '{name_filter}'") if type_filter and type_filter != 'all': where_conditions.append(f"n.type = '{type_filter}'") if category_filter: where_conditions.append(f"n.category = '{category_filter}'") # 标签过滤需要额外的匹配 if tag_filter: match_clause += "-[:label]->(t:data_label)" where_conditions.append(f"t.name = '{tag_filter}'") where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else "" # 计算总数 count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count" count_result = session.run(count_cypher) total_count = count_result.single()["count"] # 分页查询 skip = (page - 1) * page_size cypher = f""" 

def resource_list(page, page_size, en_name_filter=None, name_filter=None,
                  type_filter='all', category_filter=None, tag_filter=None):
    """Get the data resource list."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the query conditions
            match_clause = "MATCH (n:data_resource)"
            where_conditions = []

            if en_name_filter:
                where_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
            if name_filter:
                where_conditions.append(f"n.name CONTAINS '{name_filter}'")
            if type_filter and type_filter != 'all':
                where_conditions.append(f"n.type = '{type_filter}'")
            if category_filter:
                where_conditions.append(f"n.category = '{category_filter}'")
            # Tag filtering needs an extra match
            if tag_filter:
                match_clause += "-[:label]->(t:data_label)"
                where_conditions.append(f"t.name = '{tag_filter}'")

            where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""

            # Count the total
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            count_result = session.run(count_cypher)
            total_count = count_result.single()["count"]

            # Paged query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.time DESC
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher)

            # Format the results
            resources = []
            for record in result:
                node = dict(record["n"])
                node["id"] = record["n"].id

                # Fetch the associated label
                tag_cypher = """
                MATCH (n:data_resource)-[r:label]->(t:data_label)
                WHERE id(n) = $resource_id
                RETURN t
                """
                tag_result = session.run(tag_cypher, resource_id=node["id"])
                tag_record = tag_result.single()
                if tag_record:
                    tag = dict(tag_record["t"])
                    tag["id"] = tag_record["t"].id
                    node["tag_info"] = tag

                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return [], 0


def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
                        name_filter=None, category_filter=None, tag_filter=None):
    """Get the metadata list associated with a specific data resource."""
    try:
        with neo4j_driver.get_session() as session:
            # Make sure resource_id is an integer
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return [], 0

            # Base match clause - both the meta_data and Metadata labels are supported
            match_clause = """
            MATCH (n:data_resource)-[:contain]->(m)
            WHERE id(n) = $resource_id AND (m:meta_data OR m:Metadata)
            """
            where_conditions = []

            if en_name_filter:
                where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
            if name_filter:
                where_conditions.append(f"m.name CONTAINS '{name_filter}'")
            if category_filter:
                where_conditions.append(f"m.category = '{category_filter}'")

            # Tag filtering needs an extra match
            tag_match = ""
            if tag_filter:
                tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"

            where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""

            # Count the total
            count_cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN count(m) as count
            """
            count_params = {"resource_id": resource_id_int}
            if tag_filter:
                count_params["tag_filter"] = tag_filter

            count_result = session.run(count_cypher, **count_params)
            total_count = count_result.single()["count"]

            # Paged query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN m
            ORDER BY m.name
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher, **count_params)

            # Format the results
            metadata_list = []
            for record in result:
                meta = dict(record["m"])
                meta["id"] = record["m"].id
                metadata_list.append(meta)

            logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
            return metadata_list, total_count
    except Exception as e:
        logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
        return [], 0
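
# Illustrative pagination calls (hypothetical filter values):
#
#   resources, total = resource_list(page=1, page_size=10, name_filter="用户")
#   metas, meta_total = id_data_search_list(resource_id=123, page=1, page_size=20)
#
# Both helpers return (items, total_count); the filter values are interpolated
# directly into the Cypher text, so they should come from trusted input.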
dict(record["l"]) label_node["id"] = record["l"].id label_node["labels"] = list(record["l"].labels) nodes[label_node["id"]] = label_node # 添加资源-标签关系 relationships.append({ "id": f"rel-{resource_node['id']}-label-{label_node['id']}", "source": resource_node["id"], "target": label_node["id"], "type": "label" }) # 处理元数据节点 if include_meta and record["metadata"]: for meta in record["metadata"]: if meta: # 检查元数据节点是否存在 meta_node = dict(meta) meta_node["id"] = meta.id meta_node["labels"] = list(meta.labels) nodes[meta_node["id"]] = meta_node # 添加资源-元数据关系 relationships.append({ "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}", "source": resource_node["id"], "target": meta_node["id"], "type": "contain" }) logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}") return { "nodes": list(nodes.values()), "relationships": relationships } except Exception as e: logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}") return {"nodes": [], "relationships": []} def resource_impact_all_graph(resource_id, include_meta=True): """获取数据资源影响关系图谱""" try: with neo4j_driver.get_session() as session: # 确保resource_id为整数 try: resource_id_int = int(resource_id) except (ValueError, TypeError): logger.error(f"资源ID不是有效的整数: {resource_id}") return {"nodes": [], "relationships": []} # 根据meta参数决定查询深度 if include_meta: cypher = """ MATCH path = (n:data_resource)-[*1..3]-(m) WHERE id(n) = $resource_id RETURN path """ else: cypher = """ MATCH path = (n:data_resource)-[*1..2]-(m) WHERE id(n) = $resource_id AND NOT (m:meta_data) AND NOT (m:Metadata) RETURN path """ result = session.run(cypher, resource_id=resource_id_int) # 收集节点和关系 nodes = {} relationships = {} for record in result: path = record["path"] # 处理路径中的所有节点 for node in path.nodes: if node.id not in nodes: node_dict = dict(node) node_dict["id"] = node.id node_dict["labels"] = list(node.labels) nodes[node.id] = node_dict # 处理路径中的所有关系 for rel in path.relationships: if rel.id not in relationships: rel_dict = { "id": rel.id, "source": rel.start_node.id, "target": rel.end_node.id, "type": rel.type } relationships[rel.id] = rel_dict logger.info(f"成功获取完整图谱,ID: {resource_id_int}, 节点数: {len(nodes)}") return { "nodes": list(nodes.values()), "relationships": list(relationships.values()) } except Exception as e: logger.error(f"获取数据资源影响关系图谱失败: {str(e)}") return {"nodes": [], "relationships": []} def clean_type(type_str): """清洗SQL类型字符串""" # 提取基本类型,不包括长度或精度信息 basic_type = re.sub(r'\(.*?\)', '', type_str).strip().upper() # 移除 VARYING 这样的后缀 basic_type = re.sub(r'\s+VARYING$', '', basic_type) # 标准化常见类型 type_mapping = { 'INT': 'INTEGER', 'INT4': 'INTEGER', 'INT8': 'BIGINT', 'SMALLINT': 'SMALLINT', 'BIGINT': 'BIGINT', 'FLOAT4': 'FLOAT', 'FLOAT8': 'DOUBLE', 'REAL': 'FLOAT', 'DOUBLE PRECISION': 'DOUBLE', 'NUMERIC': 'DECIMAL', 'BOOL': 'BOOLEAN', 'CHARACTER': 'CHAR', 'CHAR VARYING': 'VARCHAR', 'CHARACTER VARYING': 'VARCHAR', 'TEXT': 'TEXT', 'DATE': 'DATE', 'TIME': 'TIME', 'TIMESTAMP': 'TIMESTAMP', 'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE', 'BYTEA': 'BINARY', 'JSON': 'JSON', 'JSONB': 'JSONB', 'UUID': 'UUID', 'SERIAL': 'SERIAL', 'SERIAL4': 'SERIAL', 'SERIAL8': 'BIGSERIAL', 'BIGSERIAL': 'BIGSERIAL' } # 尝试从映射表中获取标准化的类型 return type_mapping.get(basic_type, basic_type) def clean_field_name(field_name): """清洗字段名""" return field_name.strip('`').strip('"').strip("'") def select_create_ddl(sql_content): """从SQL内容中提取创建表的DDL语句""" try: # 解析复杂的SQL文件,识别所有的CREATE TABLE语句及其关联的注释 # 找到所有以CREATE TABLE开头的语句块,每个语句块包含主语句和相关的注释 # 首先,分割 SQL 内容按分号 statements = [] current_statement = "" in_string = False string_quote = 

def select_create_ddl(sql_content):
    """Extract CREATE TABLE DDL statements from SQL content."""
    try:
        # Parse a complex SQL file and identify every CREATE TABLE statement together
        # with its associated comments. Each block starts with CREATE TABLE and
        # includes the main statement plus the related COMMENT statements.

        # First, split the SQL content on semicolons
        statements = []
        current_statement = ""
        in_string = False
        string_quote = None

        for char in sql_content:
            if char in ["'", '"']:
                if not in_string:
                    in_string = True
                    string_quote = char
                elif char == string_quote:
                    in_string = False
                    string_quote = None
                current_statement += char
            elif char == ';' and not in_string:
                current_statement += char
                if current_statement.strip():
                    statements.append(current_statement.strip())
                current_statement = ""
            else:
                current_statement += char

        if current_statement.strip():
            statements.append(current_statement.strip())

        # Collect the CREATE TABLE statements together with their comments
        create_table_statements = []
        create_index = -1
        in_table_block = False
        current_table = None
        current_block = ""

        for i, stmt in enumerate(statements):
            if re.search(r'^\s*CREATE\s+TABLE', stmt, re.IGNORECASE):
                # If a table is already being processed, save the current block first
                if in_table_block and current_block:
                    create_table_statements.append(current_block)

                # Start a new table block
                in_table_block = True
                current_block = stmt

                # Extract the table name
                table_match = re.search(r'CREATE\s+TABLE\s+(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.]+)\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))', stmt, re.IGNORECASE)
                if table_match:
                    current_table = table_match.group(1) or table_match.group(2) or table_match.group(3)
                    current_table = current_table.strip('"\'') if current_table else ""
            elif in_table_block and (re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE) or re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE)):
                # Check whether the comment belongs to the current table
                if current_table:
                    # Table comments
                    if re.search(r'COMMENT\s+ON\s+TABLE', stmt, re.IGNORECASE):
                        table_comment_match = re.search(r'COMMENT\s+ON\s+TABLE\s+[\'"]?(\w+)[\'"]?', stmt, re.IGNORECASE)
                        if table_comment_match:
                            comment_table = table_comment_match.group(1).strip('"\'')
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # The comment belongs to another table; the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
                    # Column comments
                    elif re.search(r'COMMENT\s+ON\s+COLUMN', stmt, re.IGNORECASE):
                        column_comment_match = re.search(
                            r'COMMENT\s+ON\s+COLUMN\s+[\'"]?(\w+)[\'"]?\.[\'"]?(\w+)[\'"]?\s+IS\s+\'([^\']+)\'',
                            stmt,
                            re.IGNORECASE
                        )
                        if column_comment_match:
                            comment_table = column_comment_match.group(1)
                            if comment_table == current_table:
                                current_block += " " + stmt
                            else:
                                # The comment belongs to another table; the current table's DDL ends here
                                create_table_statements.append(current_block)
                                in_table_block = False
                                current_block = ""
                                current_table = None
            elif in_table_block and re.search(r'^\s*CREATE\s+', stmt, re.IGNORECASE):
                # A new CREATE statement (not a comment) ends the current block
                create_table_statements.append(current_block)
                in_table_block = False
                current_block = ""
                current_table = None

        # Append the last block
        if in_table_block and current_block:
            create_table_statements.append(current_block)

        # Logging
        logger.debug(f"提取到 {len(create_table_statements)} 个DDL语句")
        for i, stmt in enumerate(create_table_statements):
            logger.debug(f"DDL语句 {i+1}: {stmt}")

        return create_table_statements
    except Exception as e:
        logger.error(f"提取DDL语句失败: {str(e)}")
        # logger.error(traceback.format_exc())
        return []
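
# Illustrative input/output for select_create_ddl (hypothetical SQL):
#
#   ddl = select_create_ddl(
#       "CREATE TABLE users (id int); "
#       "COMMENT ON TABLE users IS '用户表'; "
#       "CREATE TABLE orders (id int);"
#   )
#   # -> two blocks; the first contains the users DDL plus its table comment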

def table_sql(sql):
    """Parse a table definition SQL statement, with or without a schema prefix."""
    try:
        # Supported formats:
        # 1. CREATE TABLE tablename
        # 2. CREATE TABLE "tablename"
        # 3. CREATE TABLE 'tablename'
        # 4. CREATE TABLE schema.tablename
        # 5. CREATE TABLE "schema"."tablename"
        # 6. CREATE TABLE 'schema'.'tablename'

        # Match the table name, quoted or unquoted
        table_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\.]+))\.)?(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
        table_match = re.search(table_pattern, sql, re.IGNORECASE)

        if not table_match:
            logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
            return None

        # Get the table name
        schema = table_match.group(1) or table_match.group(2) or table_match.group(3)
        table_name = table_match.group(4) or table_match.group(5) or table_match.group(6)

        if not table_name:
            logger.error("无法解析表名")
            return None

        logger.debug(f"解析到表名: {table_name}")

        # Extract the body of the CREATE TABLE statement (the content inside the parentheses)
        body_pattern = r'CREATE\s+TABLE\s+[^(]*\((.*?)\)(?=\s*;|\s*$)'
        body_match = re.search(body_pattern, sql, re.DOTALL | re.IGNORECASE)

        if not body_match:
            logger.error("无法提取表主体内容")
            return None

        body_text = body_match.group(1).strip()
        logger.debug(f"表定义主体部分: {body_text}")

        # Parse the field definitions
        fields = []

        # Split the field definitions, handling nested parentheses and quotes
        field_defs = []
        pos = 0
        in_parentheses = 0
        in_quotes = False
        quote_char = None

        for i, char in enumerate(body_text):
            if char in ["'", '"', '`'] and (not in_quotes or char == quote_char):
                in_quotes = not in_quotes
                if in_quotes:
                    quote_char = char
                else:
                    quote_char = None
            elif char == '(' and not in_quotes:
                in_parentheses += 1
            elif char == ')' and not in_quotes:
                in_parentheses -= 1
            elif char == ',' and in_parentheses == 0 and not in_quotes:
                field_defs.append(body_text[pos:i].strip())
                pos = i + 1

        # Append the last field definition
        if pos < len(body_text):
            field_defs.append(body_text[pos:].strip())

        logger.debug(f"解析出 {len(field_defs)} 个字段定义")

        # Process each field definition
        for field_def in field_defs:
            # Skip constraint definitions
            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
                continue

            # Extract the field name and type
            field_pattern = r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
            field_match = re.search(field_pattern, field_def, re.IGNORECASE)

            if field_match:
                # Field name
                field_name = field_match.group(1) or field_match.group(2) or field_match.group(3) or field_match.group(4)
                # Type
                field_type = field_match.group(5).strip()

                # Strip any length/precision information from the type
                type_base = re.split(r'\s+', field_type)[0]
                clean_type_value = clean_type(type_base)

                fields.append((field_name, clean_type_value))
                logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")
            else:
                logger.warning(f"无法解析字段定义: {field_def}")

        # Extract the table comment
        table_comment = ""
        table_comment_pattern = r"COMMENT\s+ON\s+TABLE\s+(?:['\"]?(\w+)['\"]?)\s+IS\s+'([^']+)'"
        table_comment_match = re.search(table_comment_pattern, sql, re.IGNORECASE)
        if table_comment_match:
            comment_table = table_comment_match.group(1)
            if comment_table.strip("'\"") == table_name.strip("'\""):
                table_comment = table_comment_match.group(2)
                logger.debug(f"找到表注释: {table_comment}")

        # Extract column comments
        comments = {}
        column_comment_pattern = r"COMMENT\s+ON\s+COLUMN\s+['\"]?(\w+)['\"]?\.['\"]?(\w+)['\"]?\s+IS\s+'([^']+)'"
        for match in re.finditer(column_comment_pattern, sql, re.IGNORECASE):
            comment_table = match.group(1)
            column_name = match.group(2)
            comment = match.group(3)

            # Check that the table name matches
            if comment_table.strip("'\"") == table_name.strip("'\""):
                comments[column_name] = comment
                logger.debug(f"找到列注释: {column_name} - {comment}")
            else:
                logger.debug(f"忽略列注释,表名不匹配: {comment_table} vs {table_name}")

        # Check how fields and comments match up
        logger.debug("========字段和注释匹配情况========")
        field_names = [f[0] for f in fields]
        logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
        logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")

        # Build the field metadata
        meta_list = []
        for field_name, field_type in fields:
            chinese_name = comments.get(field_name, "")
            meta_list.append({
                "en_name": field_name,
                "data_type": field_type,
                "name": chinese_name if chinese_name else field_name
            })

        # Check whether the table already exists
        try:
            status = status_query([table_name])
        except Exception as e:
            logger.error(f"检查表存在状态失败: {str(e)}")
            status = [False]

        # Build the result
        result = {
            table_name: {
                "exist": status[0] if status else False,
                "meta": meta_list
            }
        }

        logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
        return result
    except Exception as e:
        logger.error(f"解析表定义SQL失败: {str(e)}")
        logger.error(f"异常详情: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return None
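
# Shape of the value returned by table_sql (illustrative, for a hypothetical
# "users" table with one commented column):
#
#   {
#       "users": {
#           "exist": False,
#           "meta": [
#               {"en_name": "user_id", "data_type": "INTEGER", "name": "用户ID"}
#           ]
#       }
#   }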

# Check whether the English table names already exist in the graph
def status_query(key_list):
    query = """
    unwind $Key_list as name
    OPTIONAL MATCH (n:data_model {en_name: name})
    OPTIONAL MATCH (n:data_resource {en_name: name})
    OPTIONAL MATCH (n:data_metric {en_name: name})
    WITH name, CASE WHEN n IS NOT NULL THEN True ELSE False END AS exist
    return collect(exist) AS exist
    """
    with neo4j_driver.get_session() as session:
        result = session.run(query, Key_list=key_list)
        data = result.value()  # one value per record
        return data


def select_sql(sql_query):
    """Parse a SELECT statement and return the table names referenced in its FROM clause."""
    try:
        # Extract the SELECT clause
        select_pattern = r'SELECT\s+(.*?)\s+FROM'
        select_match = re.search(select_pattern, sql_query, re.IGNORECASE | re.DOTALL)

        if not select_match:
            return None

        select_clause = select_match.group(1)

        # Split the fields
        fields = []
        # Track parentheses so commas inside function calls do not split fields
        in_parenthesis = 0
        current_field = ""

        for char in select_clause:
            if char == '(':
                in_parenthesis += 1
                current_field += char
            elif char == ')':
                in_parenthesis -= 1
                current_field += char
            elif char == ',' and in_parenthesis == 0:
                fields.append(current_field.strip())
                current_field = ""
            else:
                current_field += char

        if current_field.strip():
            fields.append(current_field.strip())

        # Parse each field
        parsed_fields = []
        for field in fields:
            # Check for a field alias
            alias_pattern = r'(.*?)\s+[aA][sS]\s+(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))$'
            alias_match = re.search(alias_pattern, field)

            if alias_match:
                field_expr = alias_match.group(1).strip()
                field_alias = next((g for g in alias_match.groups()[1:] if g is not None), "")
                parsed_fields.append({
                    "expression": field_expr,
                    "alias": field_alias
                })
            else:
                # No alias
                parsed_fields.append({
                    "expression": field.strip(),
                    "alias": None
                })

        # Extract the FROM clause and the table names
        from_pattern = r'FROM\s+(.*?)(?:\s+WHERE|\s+GROUP|\s+HAVING|\s+ORDER|\s+LIMIT|$)'
        from_match = re.search(from_pattern, sql_query, re.IGNORECASE | re.DOTALL)

        tables = []
        if from_match:
            from_clause = from_match.group(1).strip()
            # Tables in the FROM clause; the trailing group optionally matches an alias (with or without AS)
            table_pattern = r'(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))(?:\s+(?:AS\s+)?(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))?)?'
            for match in re.finditer(table_pattern, from_clause):
                table_name = match.group(1) or match.group(2) or match.group(3) or match.group(4)
                if table_name:
                    tables.append(table_name)

        return tables
    except Exception as e:
        logger.error(f"解析SELECT查询语句失败: {str(e)}")
        # logger.error(traceback.format_exc())
        return None
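
# Illustrative call (hypothetical query); the result contains the table names
# taken from the FROM clause:
#
#   select_sql("SELECT id, name AS user_name FROM users")   # -> ["users"]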

def model_resource_list(page, page_size, name_filter=None):
    """Get the model resource list."""
    try:
        with neo4j_driver.get_session() as session:
            # Build the query conditions
            match_clause = "MATCH (n:model_resource)"
            where_clause = ""
            if name_filter:
                where_clause = f" WHERE n.name CONTAINS '{name_filter}'"

            # Count the total
            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            count_result = session.run(count_cypher)
            total_count = count_result.single()["count"]

            # Paged query
            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.createTime DESC
            SKIP {skip} LIMIT {page_size}
            """
            result = session.run(cypher)

            # Format the results
            resources = []
            for record in result:
                node = dict(record["n"])
                node["id"] = record["n"].id
                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return [], 0


def data_resource_edit(data):
    """Edit a data resource."""
    try:
        resource_id = data.get("id")
        if not resource_id:
            raise ValueError("缺少资源ID")

        with neo4j_driver.get_session() as session:
            # Update node properties
            update_fields = {}
            for key, value in data.items():
                if key != "id" and key != "tag":
                    update_fields[key] = value

            # Add the update time
            update_fields["updateTime"] = get_formatted_time()

            # Build the update statement
            set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
            cypher = f"""
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            SET {set_clause}
            RETURN n
            """
            result = session.run(cypher, resource_id=int(resource_id), **update_fields)
            updated_node = result.single()

            if not updated_node:
                raise ValueError("资源不存在")

            # Handle the label relationship
            tag_id = data.get("tag")
            if tag_id:
                # Delete the old label relationship
                delete_rel_cypher = """
                MATCH (n:data_resource)-[r:label]->()
                WHERE id(n) = $resource_id
                DELETE r
                """
                session.run(delete_rel_cypher, resource_id=int(resource_id))

                # Create the new label relationship
                create_rel_cypher = """
                MATCH (n:data_resource), (t:data_label)
                WHERE id(n) = $resource_id AND id(t) = $tag_id
                CREATE (n)-[r:label]->(t)
                RETURN r
                """
                session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))

            # Return the updated node
            return dict(updated_node["n"])
    except Exception as e:
        logger.error(f"编辑数据资源失败: {str(e)}")
        raise
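
# Illustrative edit payload (hypothetical values); "tag" is the numeric id of a
# data_label node and every other key except "id" is written onto the node as-is:
#
#   updated = data_resource_edit({"id": 123, "name": "用户表(新)", "tag": 42})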

def handle_data_source(data_source):
    """Check and create the data source."""
    try:
        # 1. Check that en_name is present
        ds_en_name = data_source.get("en_name")
        if not ds_en_name:
            raise ValueError("数据源信息不完整,缺少名称(en_name)")

        # 2. Handle the name field
        if "name" not in data_source or not data_source["name"]:
            data_source["name"] = ds_en_name
            logger.debug(f"数据源name为空,使用en_name作为替代: {ds_en_name}")

        # 3. Check whether this is simple-lookup mode
        required_fields = ["type", "host", "port", "database", "username"]
        has_required_fields = all(data_source.get(field) for field in required_fields)

        with neo4j_driver.get_session() as session:
            # Simple lookup mode: find an existing data source by en_name only
            if not has_required_fields:
                logger.info(f"简单数据源查询模式,查找en_name为: {ds_en_name}")

                check_name_cypher = """
                MATCH (ds:data_source {en_name: $en_name})
                RETURN ds
                """
                check_result = session.run(check_name_cypher, en_name=ds_en_name)
                existing_record = check_result.single()

                if existing_record:
                    # The data source exists; return its name
                    existing_data_source = dict(existing_record["ds"])
                    logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
                    return existing_data_source.get("en_name")
                else:
                    # The data source does not exist; raise
                    raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
    except Exception as e:
        logger.error(f"处理数据源失败: {str(e)}")
        raise RuntimeError(f"处理数据源失败: {str(e)}")
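
# Illustrative lookup (hypothetical payload); handle_data_source is referenced
# from handle_node only through the commented-out call above:
#
#   en_name = handle_data_source({"en_name": "ods_mysql"})
#   # returns "ods_mysql" if a data_source node with that en_name exists,
#   # otherwise raises RuntimeError wrapping the lookup failure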