"""
Core business logic for the data interface.

This module contains the core business-logic functions of the data
interface, including:
- data standard (data_standard) features
- data label (data_label) features
- graph generation
- dynamic label recognition
"""

import logging

from app.core.graph.graph_operations import connect_graph

# Module-level logger.
logger = logging.getLogger(__name__)

# Properties stripped from data_standard nodes before they are returned.
_STANDARD_HIDDEN_KEYS = ('input', 'code', 'output')

# Paged listing query shared by standard_list and label_list.  Relationship
# counts are computed with OPTIONAL MATCH + count(r) so that nodes without
# relationships still appear with a count of 0.
_NODE_PAGE_QUERY = """
MATCH (n:{label})
WHERE {where}
WITH n, properties(n) as properties, n.time as time, id(n) as nodeid
OPTIONAL MATCH (n)<-[r]-()
WITH n, properties, time, nodeid, count(r) as incoming
OPTIONAL MATCH (n)-[r]->()
WITH n, properties, time, nodeid, incoming, count(r) as outgoing
RETURN properties, time, nodeid, incoming + outgoing as relationship_count
ORDER BY time desc
SKIP $skip_count
LIMIT $page_size
"""

_NODE_COUNT_QUERY = "MATCH (n:{label}) WHERE {where} RETURN COUNT(n) AS total"


def _get_driver():
    """Return the object produced by ``connect_graph()``, logging when it is falsy."""
    driver = connect_graph()
    if not driver:
        logger.error("无法连接到数据库")
    return driver


def _build_where_clause(filters):
    """Build a Cypher WHERE expression and its parameters.

    Args:
        filters: iterable of ``(property_name, param_name, value)`` tuples.
            Property and parameter names are module-internal constants;
            only ``value`` comes from the caller and it is always bound as
            a query parameter.

    Returns:
        tuple: ``(where_str, params)``.  Falsy values are skipped; when no
        filter is active the expression is the literal ``TRUE``.
    """
    conditions = []
    params = {}
    for property_name, param_name, value in filters:
        if value:
            conditions.append(f"n.{property_name} CONTAINS ${param_name}")
            params[param_name] = value
    if not conditions:
        conditions.append("TRUE")
    return " AND ".join(conditions), params


def _fetch_node_page(label, where_str, params, skip_count, page_size):
    """Run the paged listing query and the matching count query for a label.

    Args:
        label: node label to list (internal constant, interpolated).
        where_str: WHERE expression from ``_build_where_clause``.
        params: parameters referenced by ``where_str``.
        skip_count: number of records to skip.
        page_size: maximum number of records to return.

    Returns:
        tuple: ``(records, total)``; ``([], 0)`` when no driver is available.
    """
    driver = _get_driver()
    if not driver:
        return [], 0

    page_params = dict(params, skip_count=skip_count, page_size=page_size)
    with driver.session() as session:
        # Materialise the records while the session is still open.
        records = list(session.run(
            _NODE_PAGE_QUERY.format(label=label, where=where_str),
            **page_params
        ))
        total = session.run(
            _NODE_COUNT_QUERY.format(label=label, where=where_str),
            **params
        ).single()["total"]
    return records, total


def _run_graph_query(cql, node_id, root_key="rootId"):
    """Execute a graph query and shape the result for the caller.

    Args:
        cql: Cypher text returning ``nodes``, ``lines`` and a root-id column.
        node_id: value bound to the ``$nodeId`` parameter.
        root_key: name of the returned column holding the root node id.

    Returns:
        dict: ``{"nodes": [...], "lines": [...], "rootId": ...}`` built from
        the last returned record, or ``{}`` when there is no driver or no
        record.  Nodes without an ``id`` and lines without both ``from``
        and ``to`` are dropped; these come from OPTIONAL MATCH branches
        that matched nothing.
    """
    driver = _get_driver()
    if not driver:
        return {}

    graph = {}
    with driver.session() as session:
        for item in session.run(cql, nodeId=node_id):
            graph = {
                "nodes": [node for node in item['nodes'] if node['id']],
                "lines": [
                    line for line in item['lines']
                    if line['from'] and line['to']
                ],
                "rootId": item[root_key],
            }
    return graph


# Data standard list
def standard_list(skip_count, page_size, en_name_filter=None,
                  name_filter=None, category_filter=None, time_filter=None):
    """
    Get a page of data standards.

    Args:
        skip_count: number of records to skip
        page_size: number of records per page
        en_name_filter: substring filter on ``en_name``
        name_filter: substring filter on ``name``
        category_filter: substring filter on ``category``
        time_filter: substring filter on ``time``

    Returns:
        tuple: (list of property dicts, total record count).  Each dict
        holds the node properties without ``input``/``code``/``output``,
        a ``describe`` key (``None`` when absent), ``id`` (node id) and
        ``number`` (incoming plus outgoing relationship count).
    """
    where_str, params = _build_where_clause((
        ("name", "name_filter", name_filter),
        ("en_name", "en_name_filter", en_name_filter),
        ("category", "category_filter", category_filter),
        ("time", "time_filter", time_filter),
    ))
    records, total = _fetch_node_page(
        "data_standard", where_str, params, skip_count, page_size
    )

    data = []
    for record in records:
        properties = {
            key: value
            for key, value in record['properties'].items()
            if key not in _STANDARD_HIDDEN_KEYS
        }
        properties.setdefault("describe", None)
        properties.update({
            'id': record['nodeid'],
            'number': record['relationship_count']
        })
        data.append(properties)
    return data, total


# Data standard graph (lineage) - parent nodes
def standard_kinship_graph(nodeid):
    """
    Build the lineage graph of a data standard.

    Args:
        nodeid: node ID

    Returns:
        Graph data as ``{"nodes", "lines", "rootId"}``, or ``{}``.
    """
    cql = """
    MATCH(da:data_standard)
    WHERE id(da)=$nodeId
    OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
    WITH
        collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
        collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
        collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]}) as nodes,da,
        collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    return _run_graph_query(cql, nodeid)


# Data standard graph (impact) - downstream
def standard_impact_graph(nodeid):
    """
    Build the impact graph of a data standard.

    Args:
        nodeid: node ID

    Returns:
        Graph data as ``{"nodes", "lines", "rootId"}``, or ``{}``.
    """
    # NOTE(review): the m1 and m2 OPTIONAL MATCH clauses are identical;
    # presumably one was meant to follow a different relationship. Kept
    # as-is (toSet removes the duplicates) - confirm the intended pattern.
    cql = """
    MATCH(da:data_standard)
    WHERE id(da)=$nodeId
    OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
    WITH
        collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
        collect({id:toString(id(m1)),text:m1.name})+
        collect({id:toString(id(m2)),text:m2.name})as nodes,da,
        collect({from:toString(id(da)),to:toString(id(m1)),text:'标准清洗'})+
        collect({from:toString(id(da)),to:toString(id(m2)),text:'标准清洗'})as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    return _run_graph_query(cql, nodeid)


# Data standard graph (all relationships)
def standard_all_graph(nodeid):
    """
    Build the graph of all relationships of a data standard.

    Args:
        nodeid: node ID

    Returns:
        Graph data as ``{"nodes", "lines", "rootId"}``, or ``{}``.
    """
    cql = """
    MATCH(da:data_standard)
    WHERE id(da)=$nodeId
    OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
    WITH
        collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
        collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
        collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
        collect({id:toString(id(m1)),text:m1.name})+
        collect({id:toString(id(m2)),text:m2.name})as nodes,da,
        collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
        collect({from:toString(id(da)),to:toString(id(m1)),text:'标准清洗'})+
        collect({from:toString(id(da)),to:toString(id(m2)),text:'标准清洗'})as lines
    WITH
        toString(id(da)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    return _run_graph_query(cql, nodeid)


# Data label list
def label_list(skip_count, page_size, en_name_filter=None,
               name_filter=None, category_filter=None, group_filter=None):
    """
    Get a page of data labels.

    Args:
        skip_count: number of records to skip
        page_size: number of records per page
        en_name_filter: substring filter on ``en_name``
        name_filter: substring filter on ``name``
        category_filter: substring filter on ``category``
        group_filter: substring filter on ``group``

    Returns:
        tuple: (list of property dicts, total record count).  Each dict
        holds the node properties, ``describe`` and ``scope`` keys
        (``None`` when absent), ``id`` (node id) and ``number`` (incoming
        plus outgoing relationship count).
    """
    where_str, params = _build_where_clause((
        ("name", "name_filter", name_filter),
        ("en_name", "en_name_filter", en_name_filter),
        ("category", "category_filter", category_filter),
        ("group", "group_filter", group_filter),
    ))
    records, total = _fetch_node_page(
        "data_label", where_str, params, skip_count, page_size
    )

    data = []
    for record in records:
        properties = dict(record['properties'])
        properties.setdefault("describe", None)
        properties.setdefault("scope", None)
        properties.update({
            'id': record['nodeid'],
            'number': record['relationship_count']
        })
        data.append(properties)
    return data, total


# Data label graph
def id_label_graph(id):
    """
    Build the data label graph for a node ID.

    Args:
        id: node ID

    Returns:
        Graph data as ``{"nodes", "lines", "rootId"}``, or ``{}``.
    """
    query = """
    MATCH (n:data_label)
    WHERE id(n) = $nodeId
    OPTIONAL MATCH (a)-[:label]-(n)
    WITH
        collect({from: toString(id(a)), to: toString(id(n)), text: "标签"}) AS line1,
        collect({id: toString(id(n)), text: n.name, type:"label"}) AS node1,
        collect({id: toString(id(a)), text: a.name, type: split(labels(a)[0], '_')[1]}) AS node2,
        n
    WITH
        apoc.coll.toSet(line1) AS lines,
        apoc.coll.toSet(node1 + node2) AS nodes,
        toString(id(n)) AS res
    RETURN lines, nodes, res
    """
    # This query names its root-id column ``res`` rather than ``rootId``.
    return _run_graph_query(query, id, root_key="res")


# Data label graph (lineage) - parent nodes / (all relationships)
def label_kinship_graph(nodeid):
    """
    Build the lineage graph of a data label.

    Args:
        nodeid: node ID

    Returns:
        Graph data as ``{"nodes", "lines", "rootId"}``, or ``{}``.
    """
    cql = """
    MATCH(la:data_label)
    WHERE id(la)=$nodeId
    OPTIONAL MATCH(a:data_resource)-[:label]-(la)
    OPTIONAL MATCH(b:data_model)-[:label]-(la)
    OPTIONAL MATCH(meta:meta_node)-[:label]-(la)
    OPTIONAL MATCH(d:data_standard)-[:label]-(la)
    OPTIONAL MATCH(e:data_metric)-[:label]-(la)
    WITH
        collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
        collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
        collect({id:toString(id(d)),text:d.name,type:split(labels(d)[0],'_')[1]})+
        collect({id:toString(id(e)),text:e.name,type:split(labels(e)[0],'_')[1]})+
        collect({id:toString(id(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
        collect({id:toString(id(meta)),text:meta.name}) as nodes,la,
        collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(d)),to:toString(id(la)),text:'标签'})+
        collect({from:toString(id(e)),to:toString(id(la)),text:'标签'})as lines
    WITH
        toString(id(la)) as rootId,
        apoc.coll.toSet(lines) as lines,
        apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """
    return _run_graph_query(cql, nodeid)


# Data label graph (impact) - downstream
def label_impact_graph(nodeid):
    """
    Build the impact graph of a data label.

    Args:
        nodeid: node ID

    Returns:
        dict with ``nodes``, ``rootId`` and an empty ``lines`` list, or
        ``{}`` when there is no driver or no matching node.
    """
    cql = """
    MATCH(n:data_label)
    WHERE id(n)=$nodeId
    RETURN {id:toString(id(n)),text:(n.name),type:"label"} AS nodes,
           toString(id(n)) as rootId
    """
    driver = _get_driver()
    if not driver:
        return {}

    res = {}
    with driver.session() as session:
        for item in session.run(cql, nodeId=nodeid):
            # NOTE(review): ``nodes`` is a single map here, whereas the
            # other graph functions return a list - confirm that callers
            # expect this shape.
            res = {
                "nodes": item['nodes'],
                "rootId": item['rootId'],
                "lines": []
            }
    return res


# Find label groups similar to the submitted content and return them
def dynamic_label_list(name_filter=None):
    """
    Find data label groups similar to the given content.

    Args:
        name_filter: text compared against each label's ``group``;
            ``None`` is treated as an empty string.

    Returns:
        list of ``{"name": group, "id": node id}`` dicts for labels whose
        Levenshtein similarity to ``name_filter`` exceeds 0.1; ``[]`` when
        no driver is available.
    """
    # The text is bound as a parameter instead of being interpolated into
    # the query, so quotes in the input cannot alter the Cypher statement.
    cql = """
    MATCH (n:data_label)
    WITH n, apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity
    WHERE similarity > 0.1  // 设置相似度阈值
    RETURN DISTINCT n.group as name, id(n) as nodeid
    """
    driver = _get_driver()
    if not driver:
        return []

    search_text = "" if name_filter is None else str(name_filter)
    with driver.session() as session:
        result = session.run(cql, name_filter=search_text)
        return [
            {"name": record['name'], "id": record['nodeid']}
            for record in result
        ]


def search_info(key, value):
    """
    Search nodes by a property.

    Args:
        key: name of the property to search
        value: text to look for; it is embedded in a case-insensitive
            regular expression (``(?i).*<value>.*``), so regex
            metacharacters in it keep their regex meaning

    Returns:
        list of up to 30 dicts with ``properties``, ``id``, ``time`` and
        ``labels``; ``[]`` when no driver is available.
    """
    # Property name and pattern are both bound as parameters; the property
    # is read through dynamic access (n[$key]) because a property name
    # cannot be parameterised in the ``n.<name>`` form.
    query = """
    MATCH (n)
    WHERE n[$key] =~ $pattern
    WITH n, properties(n) as properties, n.time as time, id(n) as nodeid
    RETURN properties, nodeid, time, labels(n) as labels
    LIMIT 30
    """
    driver = _get_driver()
    if not driver:
        return []

    with driver.session() as session:
        result = session.run(
            query, key=str(key), pattern=f"(?i).*{value}.*"
        )
        return [
            {
                "properties": record["properties"],
                "id": record["nodeid"],
                "time": record["time"],
                "labels": record["labels"]
            }
            for record in result
        ]


def label_info(id):
    """
    Get the information of a label node.

    Args:
        id: node ID

    Returns:
        dict with ``nodes`` (map of id/text/type) and ``rootId`` for the
        first matching node, or ``{}`` when there is no driver or no match.
    """
    query = """
    MATCH (n)
    WHERE id(n) = $nodeId
    RETURN {id:toString(id(n)),text:(n.name),type:"label"} AS nodes,
           toString(id(n)) as rootId
    """
    driver = _get_driver()
    if not driver:
        return {}

    with driver.session() as session:
        res = session.run(query, nodeId=id).data()
    return res[0] if res else {}