""" 数据接口核心业务逻辑模块 本模块包含了数据接口相关的所有核心业务逻辑函数,包括: - 数据标准(data_standard)相关功能 - 数据标签(DataLabel)相关功能 - 图谱生成 - 动态标签识别等功能 """ import logging import re from app.core.graph.graph_operations import connect_graph from app.services.neo4j_driver import neo4j_driver # 配置logger logger = logging.getLogger(__name__) def _build_category_filter_conditions(category_filter, params): """ 将 category_filter 转换为 Cypher 查询条件列表。 支持: - 字典: {field: value, ...} - 列表: [{"field": "...", "value": "..."}, {"category": "xxx"}] - 字符串: 兼容旧用法,等同于按 category 字段过滤 """ conditions = [] param_index = 0 def add_condition(field, value): nonlocal param_index if value is None: return if not isinstance(field, str): return if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", field): logger.warning(f"忽略非法属性字段: {field}") return param_key = f"category_filter_{param_index}" param_index += 1 conditions.append(f"n.{field} CONTAINS ${param_key}") params[param_key] = value if isinstance(category_filter, dict): for field, value in category_filter.items(): add_condition(field, value) elif isinstance(category_filter, list): for item in category_filter: if not isinstance(item, dict): continue if "field" in item and "value" in item: add_condition(item.get("field"), item.get("value")) elif len(item) == 1: field, value = next(iter(item.items())) add_condition(field, value) elif category_filter: add_condition("category", category_filter) return conditions # 数据标准列表展示 def standard_list( skip_count, page_size, name_en_filter=None, name_zh_filter=None, category_filter=None, create_time_filter=None, ): """ 获取数据标准列表 Args: skip_count: 跳过的记录数量 page_size: 每页记录数量 name_en_filter: 英文名称过滤条件 name_zh_filter: 名称过滤条件 category_filter: 分类过滤条件 create_time_filter: 时间过滤条件 Returns: tuple: (数据列表, 总记录数) """ data = [] # 构建查询条件 where_clause = [] params = {} if name_zh_filter: where_clause.append("n.name_zh CONTAINS $name_zh_filter") params["name_zh_filter"] = name_zh_filter if name_en_filter: where_clause.append("n.name_en CONTAINS $name_en_filter") params["name_en_filter"] = name_en_filter if category_filter: where_clause.append("n.category CONTAINS $category_filter") params["category_filter"] = category_filter if create_time_filter: where_clause.append("n.create_time CONTAINS $create_time_filter") params["create_time_filter"] = create_time_filter else: where_clause.append("TRUE") where_str = " AND ".join(where_clause) # 构建完整的查询语句 cql = f""" MATCH (n:data_standard) WHERE {where_str} RETURN properties(n) as properties, n.create_time as create_time, id(n) as nodeid, size([(n)<-[]-() | 1]) + size([(n)-[]->() | 1]) as relationship_count ORDER BY create_time desc SKIP $skip_count LIMIT $page_size """ params["skip_count"] = skip_count params["page_size"] = page_size # 修复:使用正确的session方式执行查询 driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, **params) for record in result: properties = { key: value for key, value in record["properties"].items() if key not in ["input", "code", "output"] } properties.setdefault("describe", None) new_attr = { "id": record["nodeid"], "number": record["relationship_count"], } properties.update(new_attr) data.append(properties) # 获取总量 total_query = ( f"MATCH (n:data_standard) WHERE {where_str} RETURN COUNT(n) AS total" ) total_record = session.run(total_query, **params).single() total = total_record["total"] if total_record else 0 return data, total except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return [], 0 finally: if driver: driver.close() # 数据标准图谱展示(血缘关系)父节点 def 


# Data standard graph (lineage relationships, parent nodes)
def standard_kinship_graph(nodeid):
    """
    Generate the lineage graph for a data standard.

    Args:
        nodeid: node ID

    Returns:
        graph data
    """
    # Query
    cql = """
    MATCH(da:data_standard) WHERE id(da)=$nodeId
    OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
    WITH
        collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
        collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
        collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]}) as nodes,
        da,
        collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(b)),to:toString(id(da)),text:'标准'}) as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data standard graph (impact relationships, downstream)
def standard_impact_graph(nodeid):
    """
    Generate the impact graph for a data standard.

    Args:
        nodeid: node ID

    Returns:
        graph data
    """
    # Query
    cql = """
    MATCH(da:data_standard) WHERE id(da)=$nodeId
    OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
    WITH
        collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
        collect({id:toString(id(m1)),text:m1.name})+
        collect({id:toString(id(m2)),text:m2.name}) as nodes,
        da,
        collect({from:toString(id(da)),to:toString(id(m1)),text:'标准清洗'})+
        collect({from:toString(id(da)),to:toString(id(m2)),text:'标准清洗'}) as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
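
# Result shape sketch (illustrative; the IDs and labels below are made up): the graph
# helpers in this module all return the same structure, which the front-end renders
# directly, e.g.
#
#     {
#         "rootId": "123",
#         "nodes": [{"id": "456", "text": "指标标准", "type": "standard"}, ...],
#         "lines": [{"from": "456", "to": "123", "text": "标准"}, ...],
#     }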


# Data standard graph (all relationships)
def standard_all_graph(nodeid):
    """
    Generate the full relationship graph for a data standard.

    Args:
        nodeid: node ID

    Returns:
        graph data
    """
    # Query
    cql = """
    MATCH(da:data_standard) WHERE id(da)=$nodeId
    OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
    WITH
        collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
        collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
        collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
        collect({id:toString(id(m1)),text:m1.name})+
        collect({id:toString(id(m2)),text:m2.name}) as nodes,
        da,
        collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(da)),to:toString(id(m1)),text:'标准清洗'})+
        collect({from:toString(id(da)),to:toString(id(m2)),text:'标准清洗'}) as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data label list
def label_list(
    skip_count,
    page_size,
    name_en_filter=None,
    name_zh_filter=None,
    category_filter=None,
    group_filter=None,
):
    """
    Get the list of data labels.

    Args:
        skip_count: number of records to skip
        page_size: number of records per page
        name_en_filter: English name filter
        name_zh_filter: Chinese name filter
        category_filter: category filter
        group_filter: group filter

    Returns:
        tuple: (data list, total record count)
    """
    data = []

    # Build query conditions
    where_clause = []
    params = {}
    if name_zh_filter:
        where_clause.append("n.name_zh CONTAINS $name_zh_filter")
        params["name_zh_filter"] = name_zh_filter
    if name_en_filter:
        where_clause.append("n.name_en CONTAINS $name_en_filter")
        params["name_en_filter"] = name_en_filter
    where_clause.extend(_build_category_filter_conditions(category_filter, params))
    if group_filter:
        where_clause.append("n.group CONTAINS $group_filter")
        params["group_filter"] = group_filter
    if not where_clause:
        where_clause.append("TRUE")
    where_str = " AND ".join(where_clause)

    # Build the full query
    cql = f"""
    MATCH (n:DataLabel)
    WHERE {where_str}
    WITH n, properties(n) as properties, n.create_time as create_time, id(n) as nodeid
    OPTIONAL MATCH (n)<-[r]-()
    WITH n, properties, create_time, nodeid, count(r) as incoming
    OPTIONAL MATCH (n)-[r]->()
    WITH n, properties, create_time, nodeid, incoming, count(r) as outgoing
    RETURN properties, create_time, nodeid, incoming + outgoing as relationship_count
    ORDER BY create_time desc
    SKIP $skip_count
    LIMIT $page_size
    """
    params["skip_count"] = skip_count
    params["page_size"] = page_size

    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, **params)
            for record in result:
                properties = record["properties"]
                new_attr = {
                    "id": record["nodeid"],
                    "number": record["relationship_count"],
                }
                if "describe" not in properties:
                    properties["describe"] = None
                if "scope" not in properties:
                    properties["scope"] = None
                properties.update(new_attr)
                data.append(properties)

            # Total count
            total_query = (
                f"MATCH (n:DataLabel) WHERE {where_str} RETURN COUNT(n) AS total"
            )
            total_record = session.run(total_query, **params).single()
            total = total_record["total"] if total_record else 0

            return data, total
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return [], 0
    finally:
        if driver:
            driver.close()
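
# Usage sketch (illustrative; the filter values are made up): category_filter accepts a
# plain string, a dict, or a list of single-key dicts, all handled by
# _build_category_filter_conditions, e.g.
#
#     data, total = label_list(0, 10, category_filter="业务")
#     data, total = label_list(0, 10, category_filter={"category": "业务", "group": "客户"})
#     data, total = label_list(0, 10, category_filter=[{"field": "category", "value": "业务"}])
#
# Every accepted form is turned into `n.<field> CONTAINS $param` conditions joined
# with AND.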


# Data label graph
def id_label_graph(id):
    """
    Generate the data label graph for the given node ID.

    Args:
        id: node ID

    Returns:
        graph data
    """
    query = """
    MATCH (n:DataLabel) WHERE id(n) = $nodeId
    OPTIONAL MATCH (a)-[:LABEL]-(n)
    WITH
        collect({from:toString(id(a)),to:toString(id(n)),text:"标签"}) AS line1,
        collect({id:toString(id(n)),text:n.name_zh,type:"label"}) AS node1,
        collect({id:toString(id(a)),text:a.name_zh,type:split(labels(a)[0],'_')[1]}) AS node2,
        n
    WITH
        apoc.coll.toSet(line1) AS lines,
        apoc.coll.toSet(node1 + node2) AS nodes,
        toString(id(n)) AS res
    RETURN lines, nodes, res
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(query, nodeId=id)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["res"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data label graph (lineage, parent nodes / all relationships)
def label_kinship_graph(nodeid):
    """
    Generate the lineage graph for a data label.

    Args:
        nodeid: node ID

    Returns:
        graph data
    """
    # Query
    cql = """
    MATCH(la:DataLabel) WHERE id(la)=$nodeId
    OPTIONAL MATCH(a:DataResource)-[:LABEL]-(la)
    OPTIONAL MATCH(b:DataModel)-[:LABEL]-(la)
    OPTIONAL MATCH(meta:DataMeta)-[:LABEL]-(la)
    OPTIONAL MATCH(d:data_standard)-[:LABEL]-(la)
    WITH
        collect({id:toString(id(a)),text:a.name_zh,type:split(labels(a)[0],'_')[1]})+
        collect({id:toString(id(b)),text:b.name_zh,type:split(labels(b)[0],'_')[1]})+
        collect({id:toString(id(d)),text:d.name_zh,type:split(labels(d)[0],'_')[1]})+
        collect({id:toString(id(la)),text:la.name_zh,type:split(labels(la)[0],'_')[1]})+
        collect({id:toString(id(meta)),text:meta.name_zh}) as nodes,
        la,
        collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(d)),to:toString(id(la)),text:'标签'}) as lines
    WITH
        toString(id(la)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {
                    "nodes": [record for record in item["nodes"] if record["id"]],
                    "lines": [
                        record
                        for record in item["lines"]
                        if record["from"] and record["to"]
                    ],
                    "rootId": item["rootId"],
                }
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Data label graph (impact relationships, downstream)
def label_impact_graph(nodeid):
    """
    Generate the impact graph for a data label.

    Args:
        nodeid: node ID

    Returns:
        graph data
    """
    # Query
    cql = """
    MATCH(n:DataLabel) WHERE id(n)=$nodeId
    RETURN {id:toString(id(n)),text:(n.name_zh),type:"label"} AS nodes,
           toString(id(n)) as rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, nodeId=nodeid)
            res = {}
            for item in result:
                res = {"nodes": item["nodes"], "rootId": item["rootId"], "lines": []}
            return res
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()


# Query similar data label groups based on the submitted content
def dynamic_label_list(name_filter=None):
    """
    Find data label groups similar to the given content.

    Args:
        name_filter: content filter

    Returns:
        list of label groups
    """
    # Build the full query
    cql = """
    MATCH (n:DataLabel)
    WITH n, apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity
    WHERE similarity > 0.1 // similarity threshold
    RETURN DISTINCT n.group as name_zh, id(n) as nodeid
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(cql, name_filter=name_filter or "")
            data = []
            for record in result:
                data.append(
                    {
                        "name_zh": record["name_zh"],
                        "id": record["nodeid"],
                    }
                )
            return data
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j connection failed: {str(e)}")
        return []
    finally:
        if driver:
            driver.close()
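
# Usage sketch (illustrative; the group names are made up): apoc.text.levenshteinSimilarity
# returns a score in [0, 1], so with the 0.1 threshold a call such as
#
#     groups = dynamic_label_list("客户标签")
#
# returns entries like {"name_zh": "客户分群标签", "id": 42} for groups whose text is at
# least loosely similar to the input.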


def search_info(key, value):
    """
    Search for nodes by a property key and value.

    Args:
        key: property key to search on
        value: property value to search for

    Returns:
        list of search results
    """
    field_pattern = r"^[A-Za-z_][A-Za-z0-9_]*$"
    if not re.match(field_pattern, str(key)):
        logger.warning("Invalid property key: %s", key)
        return []

    query = """
    MATCH (n)
    WHERE n[$field] =~ $pattern
    WITH n, properties(n) as properties, n.create_time as create_time, id(n) as nodeid
    RETURN properties, nodeid, create_time, labels(n) as labels
    LIMIT 30
    """
    driver = None
    try:
        driver = connect_graph()
    except (ConnectionError, ValueError) as e:
        logger.error(f"Unable to connect to Neo4j: {str(e)}")
        return []

    try:
        with driver.session() as session:
            result = session.run(
                query,
                field=key,
                # Escape the value so user input is matched literally in the regex
                pattern=f"(?i).*{re.escape(str(value))}.*",
            )
            results = []
            for record in result:
                results.append(
                    {
                        "properties": record["properties"],
                        "id": record["nodeid"],
                        "create_time": record["create_time"],
                        "labels": record["labels"],
                    }
                )
            return results
    except Exception as e:
        logger.error(f"Failed to search node info: {str(e)}")
        return []
    finally:
        if driver:
            driver.close()


def label_info(id):
    """
    Get information about a label node.

    Args:
        id: node ID

    Returns:
        label node info
    """
    query = """
    MATCH (n) WHERE id(n) = $nodeId
    RETURN {id:toString(id(n)),text:(n.name_zh),type:"label"} AS nodes,
           toString(id(n)) as rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            result = session.run(query, nodeId=id).data()
            return result[0] if result else {}
    except (ConnectionError, ValueError) as e:
        logger.error(f"Unable to connect to Neo4j: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
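
# Usage sketch (illustrative; the values are made up): search_info builds a
# case-insensitive regex around the value, so
#
#     hits = search_info("name_zh", "客户")
#
# returns at most 30 nodes whose name_zh contains "客户", each shaped as
# {"properties": {...}, "id": ..., "create_time": ..., "labels": [...]}.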


def graph_all(domain_id, include_meta=True):
    """
    Get the full relationship graph.

    Starting from the node identified by domain_id, traverse the INPUT and OUTPUT
    relationships to find all related DataFlow and BusinessDomain nodes.

    Args:
        domain_id: starting node ID (usually a BusinessDomain node)
        include_meta: whether to include metadata nodes. When True, the result also
            contains:
            - the domain_id node itself
            - DataMeta nodes connected to the domain_id node via INCLUDES relationships

    Returns:
        dict: graph data with "nodes" and "lines"
    """
    try:
        domain_id_int = int(domain_id)
    except (ValueError, TypeError):
        logger.error(f"Node ID is not a valid integer: {domain_id}")
        return {"nodes": [], "lines": []}

    try:
        with neo4j_driver.get_session() as session:
            nodes = {}  # node dict: {node_id: node_props}
            lines = {}  # relationship dict: {rel_id: rel_props}

            # 1. Verify that the starting node exists
            check_node_query = """
            MATCH (n) WHERE id(n) = $domain_id
            RETURN n, labels(n) as labels
            """
            result = session.run(check_node_query, domain_id=domain_id_int)
            record = result.single()
            if not record:
                logger.warning(f"Node not found: {domain_id_int}")
                return {"nodes": [], "lines": []}

            start_node = record["n"]
            start_labels = record["labels"]
            start_node_type = start_labels[0] if start_labels else ""

            # 2. If include_meta=True, add the starting node and the DataMeta nodes
            #    linked to it via INCLUDES relationships
            if include_meta:
                # Add the starting node
                start_props = dict(start_node)
                start_props["id"] = domain_id_int
                start_props["node_type"] = start_node_type
                nodes[domain_id_int] = start_props

                # Find DataMeta nodes connected via INCLUDES
                meta_query = """
                MATCH (n)-[r:INCLUDES]->(m:DataMeta)
                WHERE id(n) = $domain_id
                RETURN m, id(m) as meta_id, id(r) as rel_id
                """
                meta_results = session.run(meta_query, domain_id=domain_id_int)
                for meta_record in meta_results:
                    meta_node = meta_record["m"]
                    meta_id = meta_record["meta_id"]
                    rel_id = meta_record["rel_id"]

                    # Add the DataMeta node
                    meta_props = dict(meta_node)
                    meta_props["id"] = meta_id
                    meta_props["node_type"] = "DataMeta"
                    nodes[meta_id] = meta_props

                    # Add the INCLUDES relationship
                    lines[str(rel_id)] = {
                        "id": str(rel_id),
                        "from": str(domain_id_int),
                        "to": str(meta_id),
                        "text": "INCLUDES",
                    }

            # 3. Traverse the INPUT and OUTPUT relationships to find all related
            #    DataFlow and BusinessDomain nodes.
            # Breadth-first traversal: both BusinessDomain and DataFlow nodes are
            # queued so that each kind gets expanded in turn.
            queue = [(domain_id_int, start_node_type)]  # (node_id, node_type)
            processed_bd = set()  # processed BusinessDomain node IDs
            processed_df = set()  # processed DataFlow node IDs

            while queue:
                current_id, current_type = queue.pop(0)

                # For a BusinessDomain, find all related DataFlow nodes
                # (both the INPUT and OUTPUT directions)
                if current_type == "BusinessDomain" and current_id not in processed_bd:
                    processed_bd.add(current_id)

                    # Add the current BusinessDomain node if it is not present yet
                    if current_id not in nodes:
                        bd_query = """
                        MATCH (bd:BusinessDomain) WHERE id(bd) = $bd_id
                        RETURN bd
                        """
                        bd_result = session.run(bd_query, bd_id=current_id).single()
                        if bd_result:
                            bd_node = bd_result["bd"]
                            bd_props = dict(bd_node)
                            bd_props["id"] = current_id
                            bd_props["node_type"] = "BusinessDomain"
                            nodes[current_id] = bd_props

                    # DataFlow nodes connected via INPUT (BD-[INPUT]->DF)
                    input_query = """
                    MATCH (bd:BusinessDomain)-[r:INPUT]->(df:DataFlow)
                    WHERE id(bd) = $bd_id
                    RETURN df, id(df) as df_id, id(r) as rel_id
                    """
                    input_results = session.run(input_query, bd_id=current_id)
                    for input_record in input_results:
                        df_node = input_record["df"]
                        df_id = input_record["df_id"]
                        rel_id = input_record["rel_id"]

                        # Add the DataFlow node
                        if df_id not in nodes:
                            df_props = dict(df_node)
                            df_props["id"] = df_id
                            df_props["node_type"] = "DataFlow"
                            nodes[df_id] = df_props

                        # Add the INPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(current_id),
                            "to": str(df_id),
                            "text": "INPUT",
                        }

                        # Queue the DataFlow node for further traversal
                        if df_id not in processed_df:
                            queue.append((df_id, "DataFlow"))

                    # DataFlow nodes connected via OUTPUT (DF-[OUTPUT]->BD, reverse lookup)
                    reverse_output_query = """
                    MATCH (df:DataFlow)-[r:OUTPUT]->(bd:BusinessDomain)
                    WHERE id(bd) = $bd_id
                    RETURN df, id(df) as df_id, id(r) as rel_id
                    """
                    reverse_output_results = session.run(
                        reverse_output_query, bd_id=current_id
                    )
                    for reverse_record in reverse_output_results:
                        df_node = reverse_record["df"]
                        df_id = reverse_record["df_id"]
                        rel_id = reverse_record["rel_id"]

                        # Add the DataFlow node
                        if df_id not in nodes:
                            df_props = dict(df_node)
                            df_props["id"] = df_id
                            df_props["node_type"] = "DataFlow"
                            nodes[df_id] = df_props

                        # Add the OUTPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(df_id),
                            "to": str(current_id),
                            "text": "OUTPUT",
                        }

                        # Queue the DataFlow node for further traversal
                        if df_id not in processed_df:
                            queue.append((df_id, "DataFlow"))

                # For a DataFlow, find all related BusinessDomain nodes
                # (both the INPUT and OUTPUT directions)
                elif current_type == "DataFlow" and current_id not in processed_df:
                    processed_df.add(current_id)

                    # Add the current DataFlow node if it is not present yet
                    if current_id not in nodes:
                        df_query = """
                        MATCH (df:DataFlow) WHERE id(df) = $df_id
                        RETURN df
                        """
                        df_result = session.run(df_query, df_id=current_id).single()
                        if df_result:
                            df_node = df_result["df"]
                            df_props = dict(df_node)
                            df_props["id"] = current_id
                            df_props["node_type"] = "DataFlow"
                            nodes[current_id] = df_props

                    # Target BusinessDomain nodes connected via OUTPUT (DF-[OUTPUT]->BD)
                    output_query = """
                    MATCH (df:DataFlow)-[r:OUTPUT]->(bd:BusinessDomain)
                    WHERE id(df) = $df_id
                    RETURN bd, id(bd) as bd_id, id(r) as rel_id
                    """
                    output_results = session.run(output_query, df_id=current_id)
                    for output_record in output_results:
                        bd_node = output_record["bd"]
                        bd_id = output_record["bd_id"]
                        rel_id = output_record["rel_id"]

                        # Add the BusinessDomain node
                        if bd_id not in nodes:
                            bd_props = dict(bd_node)
                            bd_props["id"] = bd_id
                            bd_props["node_type"] = "BusinessDomain"
                            nodes[bd_id] = bd_props

                        # Add the OUTPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(current_id),
                            "to": str(bd_id),
                            "text": "OUTPUT",
                        }

                        # Queue the BusinessDomain node for further traversal
                        if bd_id not in processed_bd:
                            queue.append((bd_id, "BusinessDomain"))

                    # Source BusinessDomain nodes connected via INPUT
                    # (BD-[INPUT]->DF, reverse lookup)
                    reverse_input_query = """
                    MATCH (bd:BusinessDomain)-[r:INPUT]->(df:DataFlow)
                    WHERE id(df) = $df_id
                    RETURN bd, id(bd) as bd_id, id(r) as rel_id
                    """
                    reverse_input_results = session.run(
                        reverse_input_query, df_id=current_id
                    )
                    for reverse_record in reverse_input_results:
                        bd_node = reverse_record["bd"]
                        bd_id = reverse_record["bd_id"]
                        rel_id = reverse_record["rel_id"]

                        # Add the BusinessDomain node
                        if bd_id not in nodes:
                            bd_props = dict(bd_node)
                            bd_props["id"] = bd_id
                            bd_props["node_type"] = "BusinessDomain"
                            nodes[bd_id] = bd_props

                        # Add the INPUT relationship
                        lines[str(rel_id)] = {
                            "id": str(rel_id),
                            "from": str(bd_id),
                            "to": str(current_id),
                            "text": "INPUT",
                        }

                        # Queue the BusinessDomain node for further traversal
                        if bd_id not in processed_bd:
                            queue.append((bd_id, "BusinessDomain"))

            logger.info(
                f"graph_all result: node_id={domain_id_int}, "
                f"nodes={len(nodes)}, lines={len(lines)}, "
                f"include_meta={include_meta}"
            )

            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values()),
            }

    except Exception as e:
        logger.error(f"Failed to build the graph: {str(e)}")
        import traceback

        logger.error(traceback.format_exc())
        return {"nodes": [], "lines": []}
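
# Usage sketch (illustrative; the ID is made up): starting from a BusinessDomain node,
#
#     graph = graph_all(domain_id=15, include_meta=True)
#
# returns {"nodes": [...], "lines": [...]} where each node carries its properties plus
# "id" and "node_type", and each line has "id", "from", "to" and "text"
# ("INCLUDES" / "INPUT" / "OUTPUT").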


def node_delete(node_id):
    """
    Delete a DataLabel node together with all of its relationships.

    Args:
        node_id: node ID (integer)

    Returns:
        dict: deletion result with a success flag and a message
    """
    driver = None
    try:
        driver = connect_graph()
    except (ConnectionError, ValueError) as e:
        logger.error(f"Unable to connect to Neo4j: {str(e)}")
        return {"success": False, "message": "无法连接到数据库"}

    try:
        with driver.session() as session:
            # First check that the node exists and is a DataLabel
            check_query = """
            MATCH (n:DataLabel) WHERE id(n) = $nodeId
            RETURN n
            """
            check_result = session.run(
                check_query,
                nodeId=node_id,
            ).single()

            if not check_result:
                logger.warning(f"DataLabel node does not exist: ID={node_id}")
                return {
                    "success": False,
                    "message": f"DataLabel 节点不存在 (ID: {node_id})",
                }

            # Delete the node and all of its relationships
            delete_query = """
            MATCH (n:DataLabel) WHERE id(n) = $nodeId
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            delete_result = session.run(
                delete_query,
                nodeId=node_id,
            ).single()

            if not delete_result:
                logger.warning(f"Empty delete result: ID={node_id}")
                return {
                    "success": False,
                    "message": "删除失败,未获取到删除结果",
                }

            deleted_count = delete_result["deleted_count"]
            if deleted_count > 0:
                logger.info(f"Successfully deleted DataLabel node: ID={node_id}")
                return {
                    "success": True,
                    "message": f"成功删除 DataLabel 节点 (ID: {node_id})",
                }
            else:
                logger.warning(
                    f"Delete failed, the node may have already been removed: ID={node_id}"
                )
                return {"success": False, "message": "删除失败,节点可能已被删除"}
    except Exception as e:
        logger.error(f"Failed to delete DataLabel node: {str(e)}")
        return {"success": False, "message": f"删除失败: {str(e)}"}
    finally:
        if driver:
            driver.close()
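
# Usage sketch (illustrative; the ID is made up):
#
#     result = node_delete(101)
#     # -> {"success": True, "message": "成功删除 DataLabel 节点 (ID: 101)"}
#
# When the node does not exist or the delete fails, a {"success": False, ...} dict is
# returned with the reason in "message".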