""" 数据接口核心业务逻辑模块 本模块包含了数据接口相关的所有核心业务逻辑函数,包括: - 数据标准(data_standard)相关功能 - 数据标签(DataLabel)相关功能 - 图谱生成 - 动态标签识别等功能 """ import logging import re from app.core.graph.graph_operations import connect_graph from app.services.neo4j_driver import neo4j_driver # 配置logger logger = logging.getLogger(__name__) def _build_category_filter_conditions(category_filter, params): """ 将 category_filter 转换为 Cypher 查询条件列表。 支持: - 字典: {field: value, ...} - 列表: [{"field": "...", "value": "..."}, {"category": "xxx"}] - 字符串: 兼容旧用法,等同于按 category 字段过滤 """ conditions = [] param_index = 0 def add_condition(field, value): nonlocal param_index if value is None: return if not isinstance(field, str): return if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", field): logger.warning(f"忽略非法属性字段: {field}") return param_key = f"category_filter_{param_index}" param_index += 1 conditions.append(f"n.{field} CONTAINS ${param_key}") params[param_key] = value if isinstance(category_filter, dict): for field, value in category_filter.items(): add_condition(field, value) elif isinstance(category_filter, list): for item in category_filter: if not isinstance(item, dict): continue if "field" in item and "value" in item: add_condition(item.get("field"), item.get("value")) elif len(item) == 1: field, value = next(iter(item.items())) add_condition(field, value) elif category_filter: add_condition("category", category_filter) return conditions # 数据标准列表展示 def standard_list( skip_count, page_size, name_en_filter=None, name_zh_filter=None, category_filter=None, create_time_filter=None, ): """ 获取数据标准列表 Args: skip_count: 跳过的记录数量 page_size: 每页记录数量 name_en_filter: 英文名称过滤条件 name_zh_filter: 名称过滤条件 category_filter: 分类过滤条件 create_time_filter: 时间过滤条件 Returns: tuple: (数据列表, 总记录数) """ data = [] # 构建查询条件 where_clause = [] params = {} if name_zh_filter: where_clause.append("n.name_zh CONTAINS $name_zh_filter") params['name_zh_filter'] = name_zh_filter if name_en_filter: where_clause.append("n.name_en CONTAINS $name_en_filter") params['name_en_filter'] = name_en_filter if category_filter: where_clause.append("n.category CONTAINS $category_filter") params['category_filter'] = category_filter if create_time_filter: where_clause.append("n.create_time CONTAINS $create_time_filter") params['create_time_filter'] = create_time_filter else: where_clause.append("TRUE") where_str = " AND ".join(where_clause) # 构建完整的查询语句 cql = f""" MATCH (n:data_standard) WHERE {where_str} RETURN properties(n) as properties, n.create_time as create_time, id(n) as nodeid, size([(n)<-[]-() | 1]) + size([(n)-[]->() | 1]) as relationship_count ORDER BY create_time desc SKIP $skip_count LIMIT $page_size """ params['skip_count'] = skip_count params['page_size'] = page_size # 修复:使用正确的session方式执行查询 driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, **params) for record in result: properties = { key: value for key, value in record['properties'].items() if key not in ['input', 'code', 'output'] } properties.setdefault("describe", None) new_attr = { 'id': record['nodeid'], 'number': record['relationship_count'] } properties.update(new_attr) data.append(properties) # 获取总量 total_query = ( f"MATCH (n:data_standard) WHERE {where_str} " "RETURN COUNT(n) AS total" ) total_record = session.run(total_query, **params).single() total = total_record["total"] if total_record else 0 return data, total except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return [], 0 finally: if driver: driver.close() # 数据标准图谱展示(血缘关系)父节点 def standard_kinship_graph(nodeid): """ 生成数据标准的血缘关系图谱 Args: nodeid: 节点ID Returns: 图谱数据 """ # 查询语句 cql = """ MATCH(da:data_standard) WHERE id(da)=$nodeId OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da) OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da) WITH collect({ id:toString(id(a)), text:a.name, type:split(labels(a)[0],'_')[1] })+ collect({ id:toString(id(b)), text:b.name, type:split(labels(b)[0],'_')[1] })+ collect({ id:toString(id(da)), text:da.name, type:split(labels(da)[0],'_')[1] }) as nodes, da, collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+ collect({from:toString(id(b)),to:toString(id(da)),text:'标准'}) as lines WITH toString(id(da)) as rootId, apoc.coll.toSet(lines) as lines, apoc.coll.toSet(nodes) as nodes RETURN nodes,lines,rootId """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, nodeId=nodeid) res = {} for item in result: res = { "nodes": [ record for record in item['nodes'] if record['id'] ], "lines": [ record for record in item['lines'] if record['from'] and record['to'] ], "rootId": item['rootId'] } return res except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return {} finally: if driver: driver.close() # 数据标准图谱展示(影响关系)下游 def standard_impact_graph(nodeid): """ 生成数据标准的影响关系图谱 Args: nodeid: 节点ID Returns: 图谱数据 """ # 查询语句 cql = """ MATCH(da:data_standard) WHERE id(da)=$nodeId OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da) OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da) WITH collect({ id:toString(id(da)), text:da.name, type:split(labels(da)[0],'_')[1] })+ collect({id:toString(id(m1)),text:m1.name})+ collect({id:toString(id(m2)),text:m2.name}) as nodes, da, collect({ from:toString(id(da)), to:toString(id(m1)), text:'标准清洗' })+ collect({ from:toString(id(da)), to:toString(id(m2)), text:'标准清洗' }) as lines WITH toString(id(da)) as rootId, apoc.coll.toSet(lines) as lines, apoc.coll.toSet(nodes) as nodes RETURN nodes,lines,rootId """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, nodeId=nodeid) res = {} for item in result: res = { "nodes": [ record for record in item['nodes'] if record['id'] ], "lines": [ record for record in item['lines'] if record['from'] and record['to'] ], "rootId": item['rootId'] } return res except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return {} finally: if driver: driver.close() # 数据标准图谱展示(所有关系) def standard_all_graph(nodeid): """ 生成数据标准的所有关系图谱 Args: nodeid: 节点ID Returns: 图谱数据 """ # 查询语句 cql = """ MATCH(da:data_standard) WHERE id(da)=$nodeId OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da) OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da) OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da) OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da) WITH collect({ id:toString(id(a)), text:a.name, type:split(labels(a)[0],'_')[1] })+ collect({ id:toString(id(b)), text:b.name, type:split(labels(b)[0],'_')[1] })+ collect({ id:toString(id(da)), text:da.name, type:split(labels(da)[0],'_')[1] })+ collect({id:toString(id(m1)),text:m1.name})+ collect({id:toString(id(m2)),text:m2.name}) as nodes, da, collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+ collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+ collect({ from:toString(id(da)), to:toString(id(m1)), text:'标准清洗' })+ collect({ from:toString(id(da)), to:toString(id(m2)), text:'标准清洗' }) as lines WITH toString(id(da)) as rootId, apoc.coll.toSet(lines) as lines, apoc.coll.toSet(nodes) as nodes RETURN nodes,lines,rootId """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, nodeId=nodeid) res = {} for item in result: res = { "nodes": [ record for record in item['nodes'] if record['id'] ], "lines": [ record for record in item['lines'] if record['from'] and record['to'] ], "rootId": item['rootId'] } return res except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return {} finally: if driver: driver.close() # 数据标签列表展示 def label_list( skip_count, page_size, name_en_filter=None, name_zh_filter=None, category_filter=None, group_filter=None, ): """ 获取数据标签列表 Args: skip_count: 跳过的记录数量 page_size: 每页记录数量 name_en_filter: 英文名称过滤条件 name_zh_filter: 名称过滤条件 category_filter: 分类过滤条件 group_filter: 分组过滤条件 Returns: tuple: (数据列表, 总记录数) """ data = [] # 构建查询条件 where_clause = [] params = {} if name_zh_filter: where_clause.append("n.name_zh CONTAINS $name_zh_filter") params['name_zh_filter'] = name_zh_filter if name_en_filter: where_clause.append("n.name_en CONTAINS $name_en_filter") params['name_en_filter'] = name_en_filter where_clause.extend( _build_category_filter_conditions(category_filter, params) ) if group_filter: where_clause.append("n.group CONTAINS $group_filter") params['group_filter'] = group_filter if not where_clause: where_clause.append("TRUE") where_str = " AND ".join(where_clause) # 构建完整的查询语句 cql = f""" MATCH (n:DataLabel) WHERE {where_str} WITH n, properties(n) as properties, n.create_time as create_time, id(n) as nodeid OPTIONAL MATCH (n)<-[r]-() WITH n, properties, create_time, nodeid, count(r) as incoming OPTIONAL MATCH (n)-[r]->() WITH n, properties, create_time, nodeid, incoming, count(r) as outgoing RETURN properties, create_time, nodeid, incoming + outgoing as relationship_count ORDER BY create_time desc SKIP $skip_count LIMIT $page_size """ params['skip_count'] = skip_count params['page_size'] = page_size driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, **params) for record in result: properties = record['properties'] new_attr = { 'id': record['nodeid'], 'number': record['relationship_count'] } if "describe" not in properties: properties["describe"] = None if "scope" not in properties: properties["scope"] = None properties.update(new_attr) data.append(properties) # 获取总量 total_query = ( f"MATCH (n:DataLabel) WHERE {where_str} " "RETURN COUNT(n) AS total" ) total_record = session.run(total_query, **params).single() total = total_record["total"] if total_record else 0 return data, total except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return [], 0 finally: if driver: driver.close() # 数据标签图谱展示 def id_label_graph(id): """ 根据ID生成数据标签图谱 Args: id: 节点ID Returns: 图谱数据 """ query = """ MATCH (n:DataLabel) WHERE id(n) = $nodeId OPTIONAL MATCH (a)-[:LABEL]-(n) WITH collect({ from: toString(id(a)), to: toString(id(n)), text: "标签" }) AS line1, collect({ id: toString(id(n)), text: n.name_zh, type:"label" }) AS node1, collect({ id: toString(id(a)), text: a.name_zh, type: split(labels(a)[0], '_')[1] }) AS node2, n WITH apoc.coll.toSet(line1) AS lines, apoc.coll.toSet(node1 + node2) AS nodes, toString(id(n)) AS res RETURN lines, nodes, res """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(query, nodeId=id) res = {} for item in result: res = { "nodes": [ record for record in item['nodes'] if record['id'] ], "lines": [ record for record in item['lines'] if record['from'] and record['to'] ], "rootId": item['res'], } return res except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return {} finally: if driver: driver.close() # 数据标签图谱展示(血缘关系)父节点/(所有关系) def label_kinship_graph(nodeid): """ 生成数据标签的血缘关系图谱 Args: nodeid: 节点ID Returns: 图谱数据 """ # 查询语句 cql = """ MATCH(la:DataLabel) WHERE id(la)=$nodeId OPTIONAL MATCH(a:DataResource)-[:LABEL]-(la) OPTIONAL MATCH(b:DataModel)-[:LABEL]-(la) OPTIONAL MATCH(meta:DataMeta)-[:LABEL]-(la) OPTIONAL MATCH(d:data_standard)-[:LABEL]-(la) OPTIONAL MATCH(e:DataMetric)-[:LABEL]-(la) WITH collect({ id:toString(id(a)), text:a.name_zh, type:split(labels(a)[0],'_')[1] })+ collect({ id:toString(id(b)), text:b.name_zh, type:split(labels(b)[0],'_')[1] })+ collect({ id:toString(id(d)), text:d.name_zh, type:split(labels(d)[0],'_')[1] })+ collect({ id:toString(id(e)), text:e.name_zh, type:split(labels(e)[0],'_')[1] })+ collect({ id:toString(id(la)), text:la.name_zh, type:split(labels(la)[0],'_')[1] })+ collect({id:toString(id(meta)),text:meta.name_zh}) as nodes, la, collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+ collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+ collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+ collect({from:toString(id(d)),to:toString(id(la)),text:'标签'})+ collect({from:toString(id(e)),to:toString(id(la)),text:'标签'}) as lines WITH toString(id(la)) as rootId, apoc.coll.toSet(lines) as lines, apoc.coll.toSet(nodes) as nodes RETURN nodes,lines,rootId """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, nodeId=nodeid) res = {} for item in result: res = { "nodes": [ record for record in item['nodes'] if record['id'] ], "lines": [ record for record in item['lines'] if record['from'] and record['to'] ], "rootId": item['rootId'] } return res except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return {} finally: if driver: driver.close() # 数据标签图谱展示(影响关系)下游 def label_impact_graph(nodeid): """ 生成数据标签的影响关系图谱 Args: nodeid: 节点ID Returns: 图谱数据 """ # 查询语句 cql = """ MATCH(n:DataLabel) WHERE id(n)=$nodeId RETURN { id:toString(id(n)), text:(n.name_zh), type:"label" } AS nodes, toString(id(n)) as rootId """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, nodeId=nodeid) res = {} for item in result: res = { "nodes": item['nodes'], "rootId": item['rootId'], "lines": [] } return res except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return {} finally: if driver: driver.close() # 数据标签按照提交内容查询相似分组,并且返回 def dynamic_label_list(name_filter=None): """ 根据内容查询相似的数据标签分组 Args: name_filter: 内容过滤条件 Returns: 标签分组列表 """ # 构建完整的查询语句 cql = """ MATCH (n:DataLabel) WITH n, apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity WHERE similarity > 0.1 // 设置相似度阈值 RETURN DISTINCT n.group as name_zh, id(n) as nodeid """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(cql, name_filter=name_filter or "") data = [] for record in result: data.append( { "name_zh": record['name_zh'], "id": record['nodeid'], } ) return data except (ConnectionError, ValueError) as e: logger.error(f"Neo4j数据库连接失败: {str(e)}") return [] finally: if driver: driver.close() def search_info(key, value): """ 搜索指定属性的节点信息 Args: key: 搜索属性键 value: 搜索属性值 Returns: 搜索结果列表 """ field_pattern = r"^[A-Za-z_][A-Za-z0-9_]*$" if not re.match(field_pattern, str(key)): logger.warning("非法属性键: %s", key) return [] query = """ MATCH (n) WHERE n[$field] =~ $pattern WITH n, properties(n) as properties, n.create_time as create_time, id(n) as nodeid RETURN properties, nodeid, create_time, labels(n) as labels LIMIT 30 """ driver = None try: driver = connect_graph() except (ConnectionError, ValueError) as e: logger.error(f"无法连接到Neo4j数据库: {str(e)}") return [] try: with driver.session() as session: result = session.run( query, field=key, pattern=f"(?i).*{value}.*", ) results = [] for record in result: results.append( { "properties": record["properties"], "id": record["nodeid"], "create_time": record["create_time"], "labels": record["labels"], } ) return results except Exception as e: logger.error(f"搜索节点信息失败: {str(e)}") return [] finally: if driver: driver.close() def label_info(id): """ 获取标签节点的信息 Args: id: 节点ID Returns: 标签节点信息 """ query = """ MATCH (n) WHERE id(n) = $nodeId RETURN { id:toString(id(n)), text:(n.name_zh), type:"label" } AS nodes, toString(id(n)) as rootId """ driver = None try: driver = connect_graph() with driver.session() as session: result = session.run(query, nodeId=id).data() return result[0] if result else {} except (ConnectionError, ValueError) as e: logger.error(f"无法连接到Neo4j数据库: {str(e)}") return {} finally: if driver: driver.close() def graph_all(domain_id, include_meta=True): """ 获取完整关系图谱 Args: domain_id: 节点ID include_meta: 是否包含元数据节点 Returns: dict: 包含 nodes 与 lines 的图谱数据 """ try: domain_id_int = int(domain_id) except (ValueError, TypeError): logger.error(f"节点ID不是有效的整数: {domain_id}") return {"nodes": [], "lines": []} try: with neo4j_driver.get_session() as session: nodes = {} lines = {} # 使用路径查询同时获取节点和关系 if include_meta: cypher = """ MATCH (n) WHERE id(n) = $domain_id OPTIONAL MATCH path = (n)-[r]-(m) RETURN n, collect(DISTINCT m) as related_nodes, collect(DISTINCT r) as relationships """ else: cypher = """ MATCH (n) WHERE id(n) = $domain_id OPTIONAL MATCH path = (n)-[r]-(m) WHERE NOT (m:DataMeta) RETURN n, collect(DISTINCT m) as related_nodes, collect(DISTINCT r) as relationships """ result = session.run(cypher, domain_id=domain_id_int) record = result.single() if not record: logger.warning(f"未找到节点: {domain_id_int}") return {"nodes": [], "lines": []} # 处理起始节点 n_node = record["n"] if n_node: n_props = dict(n_node) n_labels = list(n_node.labels) n_props["id"] = domain_id_int n_props["node_type"] = n_labels[0] if n_labels else "" nodes[domain_id_int] = n_props # 处理关联节点 related_nodes = record["related_nodes"] or [] for m_node in related_nodes: if m_node is None: continue m_elem_id = m_node.element_id m_id = int(m_elem_id.split(":")[-1]) if m_id not in nodes: m_props = dict(m_node) m_labels = list(m_node.labels) m_props["id"] = m_id m_props["node_type"] = m_labels[0] if m_labels else "" nodes[m_id] = m_props # 处理关系 relationships = record["relationships"] or [] for rel in relationships: if rel is None: continue rel_elem_id = rel.element_id rel_id = rel_elem_id.split(":")[-1] if rel_id not in lines: # 获取关系的起始和结束节点 ID start_elem_id = rel.start_node.element_id end_elem_id = rel.end_node.element_id start_id = start_elem_id.split(":")[-1] end_id = end_elem_id.split(":")[-1] # 获取关系类型 rel_type = type(rel).__name__ lines[rel_id] = { "id": rel_id, "from": start_id, "to": end_id, "text": rel_type, } logger.info( f"graph_all 结果: node_id={domain_id_int}, " f"nodes={len(nodes)}, lines={len(lines)}" ) return { "nodes": list(nodes.values()), "lines": list(lines.values()), } except Exception as e: logger.error(f"获取图谱失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return {"nodes": [], "lines": []} def node_delete(node_id): """ 删除 DataLabel 节点及其所有关联关系 Args: node_id: 节点ID(整数) Returns: dict: 删除结果,包含 success 状态和 message 信息 """ driver = None try: driver = connect_graph() except (ConnectionError, ValueError) as e: logger.error(f"无法连接到Neo4j数据库: {str(e)}") return {"success": False, "message": "无法连接到数据库"} try: with driver.session() as session: # 首先检查节点是否存在且为 DataLabel 类型 check_query = """ MATCH (n:DataLabel) WHERE id(n) = $nodeId RETURN n """ check_result = session.run( check_query, nodeId=node_id, ).single() if not check_result: logger.warning(f"DataLabel 节点不存在: ID={node_id}") return { "success": False, "message": f"DataLabel 节点不存在 (ID: {node_id})", } # 删除节点及其所有关系 delete_query = """ MATCH (n:DataLabel) WHERE id(n) = $nodeId DETACH DELETE n RETURN count(n) as deleted_count """ delete_result = session.run( delete_query, nodeId=node_id, ).single() if not delete_result: logger.warning(f"删除结果为空: ID={node_id}") return { "success": False, "message": "删除失败,未获取到删除结果", } deleted_count = delete_result["deleted_count"] if deleted_count > 0: logger.info(f"成功删除 DataLabel 节点: ID={node_id}") return { "success": True, "message": ( f"成功删除 DataLabel 节点 (ID: {node_id})" ), } else: logger.warning(f"删除失败,节点可能已被删除: ID={node_id}") return {"success": False, "message": "删除失败,节点可能已被删除"} except Exception as e: logger.error(f"删除 DataLabel 节点失败: {str(e)}") return {"success": False, "message": f"删除失败: {str(e)}"} finally: if driver: driver.close()