- """
- 数据接口核心业务逻辑模块
- 本模块包含了数据接口相关的所有核心业务逻辑函数,包括:
- - 数据标准(data_standard)相关功能
- - 数据标签(data_label)相关功能
- - 图谱生成
- - 动态标签识别等功能
- """
- import logging
- from app.core.graph.graph_operations import connect_graph
# Module-level logger
- logger = logging.getLogger(__name__)
# Data standard list (paginated)
def standard_list(skip_count, page_size, en_name_filter=None,
                  name_filter=None, category_filter=None, time_filter=None):
    """
    Fetch a paginated list of data_standard nodes.

    Args:
        skip_count: number of records to skip (pagination offset)
        page_size: number of records per page
        en_name_filter: substring filter on the English name
        name_filter: substring filter on the name
        category_filter: substring filter on the category
        time_filter: substring filter on the time field

    Returns:
        tuple: (list of node property dicts, total record count)
    """
    data = []
    # Build the WHERE clause from whichever filters were supplied.
    where_clause = []
    params = {}
    if name_filter:
        where_clause.append("n.name CONTAINS $name_filter")
        params['name_filter'] = name_filter
    if en_name_filter:
        where_clause.append("n.en_name CONTAINS $en_name_filter")
        params['en_name_filter'] = en_name_filter
    if category_filter:
        where_clause.append("n.category CONTAINS $category_filter")
        params['category_filter'] = category_filter
    if time_filter:
        where_clause.append("n.time CONTAINS $time_filter")
        params['time_filter'] = time_filter
    # Fix: the fallback "TRUE" was attached to the time_filter branch only,
    # so it was appended whenever time_filter was missing even when other
    # filters existed ("... AND TRUE"). Append it only when no filter at
    # all was supplied — the resulting WHERE is semantically identical.
    if not where_clause:
        where_clause.append("TRUE")
    where_str = " AND ".join(where_clause)
    # Full query: node properties plus the number of relationships
    # touching the node in either direction.
    cql = f"""
    MATCH (n:data_standard)
    WHERE {where_str}
    RETURN properties(n) as properties,n.time as time,elementId(n) as nodeid,
           COUNT((()-[]->(n))) + COUNT(((n)-[]->())) as relationship_count
    ORDER BY time desc
    SKIP $skip_count
    LIMIT $page_size
    """
    params['skip_count'] = skip_count
    params['page_size'] = page_size

    driver = connect_graph()
    if not driver:
        return [], 0

    with driver.session() as session:
        result = session.run(cql, **params)
        for record in result:
            # Drop bulky fields the list view does not need.
            properties = {
                key: value for key, value in record['properties'].items()
                if key not in ['input', 'code', 'output']
            }
            # Ensure the optional display field is always present.
            properties.setdefault("describe", None)
            properties.update({
                'id': record['nodeid'],
                'number': record['relationship_count']
            })
            data.append(properties)
        # Total count under the same filter conditions (extra pagination
        # params in `params` are ignored by the driver's query).
        total_query = f"MATCH (n:data_standard) WHERE {where_str} RETURN COUNT(n) AS total"
        total_result = session.run(total_query, **params).single()["total"]

    return data, total_result
# Data standard graph (lineage) — parent nodes
def standard_kinship_graph(nodeid):
    """
    Build the lineage (kinship) graph for a data standard node.

    Args:
        nodeid: element id of the data_standard node

    Returns:
        dict with "nodes", "lines" and "rootId" keys; empty dict when the
        database is unreachable or the node is not found
    """
    # Collect the standard node plus any linked resources/models,
    # de-duplicated via apoc.coll.toSet.
    cql = """
    MATCH(da:data_standard)
    WHERE elementId(da)=$nodeId
    OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
    WITH
    collect({id:toString(elementId(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
    collect({id:toString(elementId(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
    collect({id:toString(elementId(da)),text:da.name,type:split(labels(da)[0],'_')[1]}) as nodes,da,
    collect({from:toString(elementId(a)),to:toString(elementId(da)),text:'标准'})+
    collect({from:toString(elementId(b)),to:toString(elementId(da)),text:'标准'})as lines
    WITH
    toString(elementId(da)) as rootId,
    apoc.coll.toSet(lines) as lines,
    apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """

    driver = connect_graph()
    if not driver:
        return {}

    graph = {}
    with driver.session() as session:
        # OPTIONAL MATCH can emit placeholder entries with null ids;
        # filter those out before returning.
        for record in session.run(cql, nodeId=nodeid):
            graph = {
                "nodes": [node for node in record['nodes'] if node['id']],
                "lines": [edge for edge in record['lines'] if edge['from'] and edge['to']],
                "rootId": record['rootId'],
            }
    return graph
# Data standard graph (impact) — downstream nodes
def standard_impact_graph(nodeid):
    """
    Build the impact graph for a data standard node.

    Args:
        nodeid: element id of the data_standard node

    Returns:
        dict with "nodes", "lines" and "rootId" keys; empty dict when the
        database is unreachable or the node is not found
    """
    # NOTE(review): m1 and m2 match the exact same pattern, so the second
    # OPTIONAL MATCH looks redundant — apoc.coll.toSet removes the
    # duplicates, so the output is unaffected. Confirm before simplifying.
    cql = """
    MATCH(da:data_standard)
    WHERE elementId(da)=$nodeId
    OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
    WITH
    collect({id:toString(elementId(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
    collect({id:toString(elementId(m1)),text:m1.name})+
    collect({id:toString(elementId(m2)),text:m2.name})as nodes,da,
    collect({from:toString(elementId(da)),to:toString(elementId(m1)),text:'标准清洗'})+
    collect({from:toString(elementId(da)),to:toString(elementId(m2)),text:'标准清洗'})as lines
    WITH
    toString(elementId(da)) as rootId,
    apoc.coll.toSet(lines) as lines,
    apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """

    driver = connect_graph()
    if not driver:
        return {}

    graph = {}
    with driver.session() as session:
        # Drop placeholder entries produced by unmatched OPTIONAL MATCHes.
        for record in session.run(cql, nodeId=nodeid):
            graph = {
                "nodes": [node for node in record['nodes'] if node['id']],
                "lines": [edge for edge in record['lines'] if edge['from'] and edge['to']],
                "rootId": record['rootId'],
            }
    return graph
# Data standard graph (all relationships)
def standard_all_graph(nodeid):
    """
    Build the combined (lineage + impact) graph for a data standard node.

    Args:
        nodeid: element id of the data_standard node

    Returns:
        dict with "nodes", "lines" and "rootId" keys; empty dict when the
        database is unreachable or the node is not found
    """
    # Union of the kinship and impact queries; apoc.coll.toSet removes
    # duplicate nodes/edges (including the duplicated m1/m2 matches).
    cql = """
    MATCH(da:data_standard)
    WHERE elementId(da)=$nodeId
    OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
    OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
    OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
    OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
    WITH
    collect({id:toString(elementId(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
    collect({id:toString(elementId(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
    collect({id:toString(elementId(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
    collect({id:toString(elementId(m1)),text:m1.name})+
    collect({id:toString(elementId(m2)),text:m2.name})as nodes,da,
    collect({from:toString(elementId(a)),to:toString(elementId(da)),text:'标准'})+
    collect({from:toString(elementId(b)),to:toString(elementId(da)),text:'标准'})+
    collect({from:toString(elementId(da)),to:toString(elementId(m1)),text:'标准清洗'})+
    collect({from:toString(elementId(da)),to:toString(elementId(m2)),text:'标准清洗'})as lines
    WITH
    toString(elementId(da)) as rootId,
    apoc.coll.toSet(lines) as lines,
    apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """

    driver = connect_graph()
    if not driver:
        return {}

    graph = {}
    with driver.session() as session:
        # Drop placeholder entries produced by unmatched OPTIONAL MATCHes.
        for record in session.run(cql, nodeId=nodeid):
            graph = {
                "nodes": [node for node in record['nodes'] if node['id']],
                "lines": [edge for edge in record['lines'] if edge['from'] and edge['to']],
                "rootId": record['rootId'],
            }
    return graph
# Data label list (paginated)
def label_list(skip_count, page_size, en_name_filter=None,
               name_filter=None, category_filter=None, group_filter=None):
    """
    Fetch a paginated list of data_label nodes.

    Args:
        skip_count: number of records to skip (pagination offset)
        page_size: number of records per page
        en_name_filter: substring filter on the English name
        name_filter: substring filter on the name
        category_filter: substring filter on the category
        group_filter: substring filter on the group

    Returns:
        tuple: (list of node property dicts, total record count)
    """
    data = []
    # Build the WHERE clause from whichever filters were supplied.
    where_clause = []
    params = {}
    if name_filter:
        where_clause.append("n.name CONTAINS $name_filter")
        params['name_filter'] = name_filter
    if en_name_filter:
        where_clause.append("n.en_name CONTAINS $en_name_filter")
        params['en_name_filter'] = en_name_filter
    if category_filter:
        where_clause.append("n.category CONTAINS $category_filter")
        params['category_filter'] = category_filter
    if group_filter:
        # Fix: dropped the spurious f-string prefix (no placeholder used).
        where_clause.append("n.group CONTAINS $group_filter")
        params['group_filter'] = group_filter
    # Fix: the fallback "TRUE" was attached to the group_filter branch only;
    # append it only when no filter at all was supplied — the resulting
    # WHERE is semantically identical.
    if not where_clause:
        where_clause.append("TRUE")
    where_str = " AND ".join(where_clause)
    # Full query: node properties plus the number of relationships
    # touching the node in either direction.
    cql = f"""
    MATCH (n:data_label)
    WHERE {where_str}
    RETURN properties(n) as properties,n.time as time,elementId(n) as nodeid,
           COUNT((()-[]->(n))) + COUNT(((n)-[]->())) as relationship_count
    ORDER BY time desc
    SKIP $skip_count
    LIMIT $page_size
    """
    params['skip_count'] = skip_count
    params['page_size'] = page_size

    driver = connect_graph()
    if not driver:
        logger.error("无法连接到数据库")
        return [], 0

    with driver.session() as session:
        result = session.run(cql, **params)
        for record in result:
            properties = record['properties']
            # Ensure optional display fields are always present
            # (setdefault, for consistency with standard_list).
            properties.setdefault("describe", None)
            properties.setdefault("scope", None)
            properties.update({
                'id': record['nodeid'],
                'number': record['relationship_count']
            })
            data.append(properties)
        # Total count under the same filter conditions.
        total_query = f"MATCH (n:data_label) WHERE {where_str} RETURN COUNT(n) AS total"
        total_result = session.run(total_query, **params).single()["total"]

    return data, total_result
# Data label graph by id
def id_label_graph(id):
    """
    Build the graph for a data label node identified by its element id.

    Args:
        id: element id of the data_label node
            (NOTE: shadows the `id` builtin, but the parameter name is part
            of the public interface and is kept for compatibility)

    Returns:
        dict with "nodes", "lines" and "rootId" keys; empty dict when the
        database is unreachable or the node is not found
    """
    # Label node plus everything attached to it via a :label relationship.
    cql = """
    MATCH (n:data_label)
    WHERE elementId(n) = $nodeId
    OPTIONAL MATCH (a)-[:label]-(n)
    WITH
    collect({from: toString(elementId(a)), to: toString(elementId(n)), text: "标签"}) AS line1,
    collect({id: toString(elementId(n)), text: n.name, type:"label"}) AS node1,
    collect({id: toString(elementId(a)), text: a.name, type: split(labels(a)[0], '_')[1]}) AS node2, n
    WITH apoc.coll.toSet(line1) AS lines,
    apoc.coll.toSet(node1 + node2) AS nodes,
    toString(elementId(n)) AS res
    RETURN lines, nodes, res
    """

    driver = connect_graph()
    if not driver:
        return {}

    graph = {}
    with driver.session() as session:
        # Drop placeholder entries produced by the unmatched OPTIONAL MATCH.
        for record in session.run(cql, nodeId=id):
            graph = {
                "nodes": [node for node in record['nodes'] if node['id']],
                "lines": [edge for edge in record['lines'] if edge['from'] and edge['to']],
                "rootId": record['res'],
            }
    return graph
# Data label graph (lineage) — parent nodes / all relationships
def label_kinship_graph(nodeid):
    """
    Build the lineage (kinship) graph for a data label node.

    Args:
        nodeid: element id of the data_label node

    Returns:
        dict with "nodes", "lines" and "rootId" keys; empty dict when the
        database is unreachable or the node is not found
    """
    # Label node plus every entity type that can carry a :label
    # relationship, de-duplicated via apoc.coll.toSet.
    cql = """
    MATCH(la:data_label)
    WHERE elementId(la)=$nodeId
    OPTIONAL MATCH(a:data_resource)-[:label]-(la)
    OPTIONAL MATCH(b:data_model)-[:label]-(la)
    OPTIONAL MATCH(meta:meta_node)-[:label]-(la)
    OPTIONAL MATCH(d:data_standard)-[:label]-(la)
    OPTIONAL MATCH(e:data_metric)-[:label]-(la)
    WITH
    collect({id:toString(elementId(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
    collect({id:toString(elementId(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
    collect({id:toString(elementId(d)),text:d.name,type:split(labels(d)[0],'_')[1]})+
    collect({id:toString(elementId(e)),text:e.name,type:split(labels(e)[0],'_')[1]})+
    collect({id:toString(elementId(la)),text:la.name,type:split(labels(la)[0],'_')[1]})+
    collect({id:toString(elementId(meta)),text:meta.name}) as nodes,la,
    collect({from:toString(elementId(a)),to:toString(elementId(la)),text:'标签'})+
    collect({from:toString(elementId(b)),to:toString(elementId(la)),text:'标签'})+
    collect({from:toString(elementId(meta)),to:toString(elementId(la)),text:'标签'})+
    collect({from:toString(elementId(d)),to:toString(elementId(la)),text:'标签'})+
    collect({from:toString(elementId(e)),to:toString(elementId(la)),text:'标签'})as lines
    WITH
    toString(elementId(la)) as rootId,
    apoc.coll.toSet(lines) as lines,
    apoc.coll.toSet(nodes) as nodes
    RETURN nodes,lines,rootId
    """

    driver = connect_graph()
    if not driver:
        return {}

    graph = {}
    with driver.session() as session:
        # Drop placeholder entries produced by unmatched OPTIONAL MATCHes.
        for record in session.run(cql, nodeId=nodeid):
            graph = {
                "nodes": [node for node in record['nodes'] if node['id']],
                "lines": [edge for edge in record['lines'] if edge['from'] and edge['to']],
                "rootId": record['rootId'],
            }
    return graph
# Data label graph (impact) — downstream nodes
def label_impact_graph(nodeid):
    """
    Build the impact graph for a data label node.

    A label has no downstream dependencies, so the result contains only
    the node itself and an empty "lines" list.

    Args:
        nodeid: element id of the data_label node

    Returns:
        dict with "nodes", "lines" and "rootId" keys; empty dict when the
        database is unreachable or the node is not found
    """
    cql = """
    MATCH(n:data_label)
    WHERE elementId(n)=$nodeId
    RETURN {id:toString(elementId(n)),text:(n.name),type:"label"} AS nodes,
    toString(elementId(n)) as rootId
    """

    driver = connect_graph()
    if not driver:
        return {}

    graph = {}
    with driver.session() as session:
        for record in session.run(cql, nodeId=nodeid):
            graph = {
                "nodes": record['nodes'],
                "rootId": record['rootId'],
                "lines": [],
            }
    return graph
# Query similar data-label groups for the submitted text
def dynamic_label_list(name_filter=None):
    """
    Find data label groups whose name is similar to the given text.

    Args:
        name_filter: text to compare group names against

    Returns:
        list of {"name": group name, "id": element id} dicts
    """
    # Guard: without input there is nothing to compare against (previously
    # a None value was interpolated and matched against the literal "None").
    if not name_filter:
        return []
    # Fix: pass the user-supplied text as a query parameter instead of
    # interpolating it into the Cypher string — prevents Cypher injection
    # and broken queries when the text contains quotes.
    cql = """
    MATCH (n:data_label)
    WITH n, apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity
    WHERE similarity > 0.1 // similarity threshold
    RETURN DISTINCT n.group as name, elementId(n) as nodeid
    """

    driver = connect_graph()
    if not driver:
        return []

    with driver.session() as session:
        result = session.run(cql, name_filter=name_filter)
        data = [
            {"name": record['name'], "id": record['nodeid']}
            for record in result
        ]

    return data