| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020 |
- """
- 数据接口核心业务逻辑模块
- 本模块包含了数据接口相关的所有核心业务逻辑函数,包括:
- - 数据标准(data_standard)相关功能
- - 数据标签(DataLabel)相关功能
- - 图谱生成
- - 动态标签识别等功能
- """
- import logging
- import re
- from app.core.graph.graph_operations import connect_graph
- from app.services.neo4j_driver import neo4j_driver
- # 配置logger
- logger = logging.getLogger(__name__)
- def _build_category_filter_conditions(category_filter, params):
- """
- 将 category_filter 转换为 Cypher 查询条件列表。
- 支持:
- - 字典: {field: value, ...}
- - 列表: [{"field": "...", "value": "..."}, {"category": "xxx"}]
- - 字符串: 兼容旧用法,等同于按 category 字段过滤
- """
- conditions = []
- param_index = 0
- def add_condition(field, value):
- nonlocal param_index
- if value is None:
- return
- if not isinstance(field, str):
- return
- if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", field):
- logger.warning(f"忽略非法属性字段: {field}")
- return
- param_key = f"category_filter_{param_index}"
- param_index += 1
- conditions.append(f"n.{field} CONTAINS ${param_key}")
- params[param_key] = value
- if isinstance(category_filter, dict):
- for field, value in category_filter.items():
- add_condition(field, value)
- elif isinstance(category_filter, list):
- for item in category_filter:
- if not isinstance(item, dict):
- continue
- if "field" in item and "value" in item:
- add_condition(item.get("field"), item.get("value"))
- elif len(item) == 1:
- field, value = next(iter(item.items()))
- add_condition(field, value)
- elif category_filter:
- add_condition("category", category_filter)
- return conditions
- # 数据标准列表展示
def standard_list(
    skip_count,
    page_size,
    name_en_filter=None,
    name_zh_filter=None,
    category_filter=None,
    create_time_filter=None,
):
    """
    Return one page of ``data_standard`` nodes plus the total match count.

    Each supplied (truthy) filter becomes a Cypher ``CONTAINS`` test on the
    corresponding node property; the tests are combined with ``AND``.

    Args:
        skip_count: Number of records to skip (Cypher ``SKIP``).
        page_size: Maximum number of records to return (Cypher ``LIMIT``).
        name_en_filter: Substring to look for in ``name_en``.
        name_zh_filter: Substring to look for in ``name_zh``.
        category_filter: Substring to look for in ``category``.
        create_time_filter: Substring to look for in ``create_time``.

    Returns:
        tuple: ``(rows, total)``.  Each row is the node's property map
        without the ``input``, ``code`` and ``output`` keys, with
        ``describe`` defaulted to ``None`` and with ``id`` (node id) and
        ``number`` (relationship count) added.  ``([], 0)`` is returned when
        a ``ConnectionError`` or ``ValueError`` is raised.
    """
    # (node property, query parameter name, value) in WHERE-clause order.
    filters = (
        ("name_zh", "name_zh_filter", name_zh_filter),
        ("name_en", "name_en_filter", name_en_filter),
        ("category", "category_filter", category_filter),
        ("create_time", "create_time_filter", create_time_filter),
    )
    where_clause = []
    params = {}
    for prop, param_name, value in filters:
        if value:
            where_clause.append(f"n.{prop} CONTAINS ${param_name}")
            params[param_name] = value
    # Only fall back to a match-all predicate when no filter was supplied
    # (same rule as label_list).
    if not where_clause:
        where_clause.append("TRUE")
    where_str = " AND ".join(where_clause)

    cql = f"""
        MATCH (n:data_standard)
        WHERE {where_str}
        RETURN
            properties(n) as properties,
            n.create_time as create_time,
            id(n) as nodeid,
            size([(n)<-[]-() | 1]) + size([(n)-[]->() | 1]) as relationship_count
        ORDER BY create_time desc
        SKIP $skip_count
        LIMIT $page_size
    """
    params['skip_count'] = skip_count
    params['page_size'] = page_size

    # Properties that are never exposed in the list view.
    hidden_keys = ('input', 'code', 'output')
    data = []
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            for record in session.run(cql, **params):
                properties = {
                    key: value
                    for key, value in record['properties'].items()
                    if key not in hidden_keys
                }
                properties.setdefault("describe", None)
                properties['id'] = record['nodeid']
                properties['number'] = record['relationship_count']
                data.append(properties)

            # Total number of matches, ignoring pagination.
            total_query = (
                f"MATCH (n:data_standard) WHERE {where_str} "
                "RETURN COUNT(n) AS total"
            )
            total_record = session.run(total_query, **params).single()
            total = total_record["total"] if total_record else 0
            return data, total
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return [], 0
    finally:
        if driver:
            driver.close()
- # 数据标准图谱展示(血缘关系)父节点
def standard_kinship_graph(nodeid):
    """
    Build the lineage graph around one ``data_standard`` node.

    The query selects the ``data_standard`` node whose internal id equals
    ``nodeid`` plus any ``DataResource`` nodes linked through
    ``clean_resource`` and ``DataModel`` nodes linked through ``clean_model``.

    Args:
        nodeid: Internal Neo4j id of the ``data_standard`` node.

    Returns:
        dict: ``{"nodes": [...], "lines": [...], "rootId": ...}``.  Nodes
        with an empty ``id`` and lines with an empty ``from``/``to`` are
        filtered out.  An empty dict is returned when the query yields no
        row or when a ``ConnectionError`` / ``ValueError`` is raised.
    """
    lineage_query = """
        MATCH(da:data_standard)
        WHERE id(da)=$nodeId
        OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
        OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
        WITH
            collect({
                id:toString(id(a)),
                text:a.name,
                type:split(labels(a)[0],'_')[1]
            })+
            collect({
                id:toString(id(b)),
                text:b.name,
                type:split(labels(b)[0],'_')[1]
            })+
            collect({
                id:toString(id(da)),
                text:da.name,
                type:split(labels(da)[0],'_')[1]
            }) as nodes,
            da,
            collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
            collect({from:toString(id(b)),to:toString(id(da)),text:'标准'}) as lines
        WITH
            toString(id(da)) as rootId,
            apoc.coll.toSet(lines) as lines,
            apoc.coll.toSet(nodes) as nodes
        RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            graph = {}
            # If several rows come back, the last one wins.
            for row in session.run(lineage_query, nodeId=nodeid):
                kept_nodes = [node for node in row['nodes'] if node['id']]
                kept_lines = [
                    edge
                    for edge in row['lines']
                    if edge['from'] and edge['to']
                ]
                graph = {
                    "nodes": kept_nodes,
                    "lines": kept_lines,
                    "rootId": row['rootId'],
                }
            return graph
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
- # 数据标准图谱展示(影响关系)下游
def standard_impact_graph(nodeid):
    """
    Build the impact graph around one ``data_standard`` node.

    The query selects the ``data_standard`` node whose internal id equals
    ``nodeid`` plus ``DataMeta`` nodes connected to it through
    ``clean_model`` relationships.

    Args:
        nodeid: Internal Neo4j id of the ``data_standard`` node.

    Returns:
        dict: ``{"nodes": [...], "lines": [...], "rootId": ...}``.  Nodes
        with an empty ``id`` and lines with an empty ``from``/``to`` are
        filtered out.  An empty dict is returned when the query yields no
        row or when a ``ConnectionError`` / ``ValueError`` is raised.
    """
    # NOTE(review): the two OPTIONAL MATCH clauses below are identical apart
    # from the variable name (m1 / m2) -- confirm whether one of them was
    # meant to use a different relationship type.
    impact_query = """
        MATCH(da:data_standard)
        WHERE id(da)=$nodeId
        OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
        OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
        WITH
            collect({
                id:toString(id(da)),
                text:da.name,
                type:split(labels(da)[0],'_')[1]
            })+
            collect({id:toString(id(m1)),text:m1.name})+
            collect({id:toString(id(m2)),text:m2.name}) as nodes,
            da,
            collect({
                from:toString(id(da)),
                to:toString(id(m1)),
                text:'标准清洗'
            })+
            collect({
                from:toString(id(da)),
                to:toString(id(m2)),
                text:'标准清洗'
            }) as lines
        WITH
            toString(id(da)) as rootId,
            apoc.coll.toSet(lines) as lines,
            apoc.coll.toSet(nodes) as nodes
        RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            graph = {}
            # If several rows come back, the last one wins.
            for row in session.run(impact_query, nodeId=nodeid):
                kept_nodes = [node for node in row['nodes'] if node['id']]
                kept_lines = [
                    edge
                    for edge in row['lines']
                    if edge['from'] and edge['to']
                ]
                graph = {
                    "nodes": kept_nodes,
                    "lines": kept_lines,
                    "rootId": row['rootId'],
                }
            return graph
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
- # 数据标准图谱展示(所有关系)
def standard_all_graph(nodeid):
    """
    Build the full relationship graph around one ``data_standard`` node.

    Combines the patterns of the lineage and impact queries: linked
    ``DataResource`` (``clean_resource``), ``DataModel`` (``clean_model``)
    and ``DataMeta`` (``clean_model``) nodes are collected together.

    Args:
        nodeid: Internal Neo4j id of the ``data_standard`` node.

    Returns:
        dict: ``{"nodes": [...], "lines": [...], "rootId": ...}``.  Nodes
        with an empty ``id`` and lines with an empty ``from``/``to`` are
        filtered out.  An empty dict is returned when the query yields no
        row or when a ``ConnectionError`` / ``ValueError`` is raised.
    """
    full_query = """
        MATCH(da:data_standard)
        WHERE id(da)=$nodeId
        OPTIONAL MATCH(a:DataResource)-[:clean_resource]-(da)
        OPTIONAL MATCH(b:DataModel)-[:clean_model]-(da)
        OPTIONAL MATCH(da)-[:clean_model]-(m1:DataMeta)-[:clean_model]-(da)
        OPTIONAL MATCH(da)-[:clean_model]-(m2:DataMeta)-[:clean_model]-(da)
        WITH
            collect({
                id:toString(id(a)),
                text:a.name,
                type:split(labels(a)[0],'_')[1]
            })+
            collect({
                id:toString(id(b)),
                text:b.name,
                type:split(labels(b)[0],'_')[1]
            })+
            collect({
                id:toString(id(da)),
                text:da.name,
                type:split(labels(da)[0],'_')[1]
            })+
            collect({id:toString(id(m1)),text:m1.name})+
            collect({id:toString(id(m2)),text:m2.name}) as nodes,
            da,
            collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
            collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
            collect({
                from:toString(id(da)),
                to:toString(id(m1)),
                text:'标准清洗'
            })+
            collect({
                from:toString(id(da)),
                to:toString(id(m2)),
                text:'标准清洗'
            }) as lines
        WITH
            toString(id(da)) as rootId,
            apoc.coll.toSet(lines) as lines,
            apoc.coll.toSet(nodes) as nodes
        RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            graph = {}
            # If several rows come back, the last one wins.
            for row in session.run(full_query, nodeId=nodeid):
                kept_nodes = [node for node in row['nodes'] if node['id']]
                kept_lines = [
                    edge
                    for edge in row['lines']
                    if edge['from'] and edge['to']
                ]
                graph = {
                    "nodes": kept_nodes,
                    "lines": kept_lines,
                    "rootId": row['rootId'],
                }
            return graph
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
- # 数据标签列表展示
def label_list(
    skip_count,
    page_size,
    name_en_filter=None,
    name_zh_filter=None,
    category_filter=None,
    group_filter=None,
):
    """
    Return one page of ``DataLabel`` nodes plus the total match count.

    Args:
        skip_count: Number of records to skip (Cypher ``SKIP``).
        page_size: Maximum number of records to return (Cypher ``LIMIT``).
        name_en_filter: Substring to look for in ``name_en``.
        name_zh_filter: Substring to look for in ``name_zh``.
        category_filter: Category filter specification; it is expanded by
            ``_build_category_filter_conditions``.
        group_filter: Substring to look for in ``group``.

    Returns:
        tuple: ``(rows, total)``.  Each row is the node's property map with
        ``describe`` and ``scope`` defaulted to ``None`` and with ``id``
        (node id) and ``number`` (incoming + outgoing relationship count)
        added.  ``([], 0)`` is returned when a ``ConnectionError`` or
        ``ValueError`` is raised.
    """
    params = {}
    predicates = []
    if name_zh_filter:
        params['name_zh_filter'] = name_zh_filter
        predicates.append("n.name_zh CONTAINS $name_zh_filter")
    if name_en_filter:
        params['name_en_filter'] = name_en_filter
        predicates.append("n.name_en CONTAINS $name_en_filter")
    predicates += _build_category_filter_conditions(category_filter, params)
    if group_filter:
        params['group_filter'] = group_filter
        predicates.append("n.group CONTAINS $group_filter")
    # Without any filter the WHERE clause degenerates to a match-all predicate.
    where_str = " AND ".join(predicates) if predicates else "TRUE"

    cql = f"""
        MATCH (n:DataLabel)
        WHERE {where_str}
        WITH
            n,
            properties(n) as properties,
            n.create_time as create_time,
            id(n) as nodeid
        OPTIONAL MATCH (n)<-[r]-()
        WITH
            n,
            properties,
            create_time,
            nodeid,
            count(r) as incoming
        OPTIONAL MATCH (n)-[r]->()
        WITH
            n,
            properties,
            create_time,
            nodeid,
            incoming,
            count(r) as outgoing
        RETURN
            properties,
            create_time,
            nodeid,
            incoming + outgoing as relationship_count
        ORDER BY create_time desc
        SKIP $skip_count
        LIMIT $page_size
    """
    params['skip_count'] = skip_count
    params['page_size'] = page_size

    rows = []
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            for record in session.run(cql, **params):
                row = record['properties']
                # Make sure the optional columns are always present.
                row.setdefault("describe", None)
                row.setdefault("scope", None)
                row.update(
                    {
                        'id': record['nodeid'],
                        'number': record['relationship_count'],
                    }
                )
                rows.append(row)

            # Total number of matches, ignoring pagination.
            count_query = (
                f"MATCH (n:DataLabel) WHERE {where_str} "
                "RETURN COUNT(n) AS total"
            )
            count_record = session.run(count_query, **params).single()
            if count_record:
                total = count_record["total"]
            else:
                total = 0
            return rows, total
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return [], 0
    finally:
        if driver:
            driver.close()
- # 数据标签图谱展示
def id_label_graph(id):
    """
    Build the graph of one ``DataLabel`` node and its ``LABEL`` neighbours.

    Args:
        id: Internal Neo4j id of the ``DataLabel`` node.

    Returns:
        dict: ``{"nodes": [...], "lines": [...], "rootId": ...}``.  Nodes
        with an empty ``id`` and lines with an empty ``from``/``to`` are
        filtered out.  An empty dict is returned when the query yields no
        row or when a ``ConnectionError`` / ``ValueError`` is raised.
    """
    query = """
        MATCH (n:DataLabel)
        WHERE id(n) = $nodeId
        OPTIONAL MATCH (a)-[:LABEL]-(n)
        WITH
            collect({
                from: toString(id(a)),
                to: toString(id(n)),
                text: "标签"
            }) AS line1,
            collect({
                id: toString(id(n)),
                text: n.name_zh,
                type:"label"
            }) AS node1,
            collect({
                id: toString(id(a)),
                text: a.name_zh,
                type: split(labels(a)[0], '_')[1]
            }) AS node2,
            n
        WITH
            apoc.coll.toSet(line1) AS lines,
            apoc.coll.toSet(node1 + node2) AS nodes,
            toString(id(n)) AS res
        RETURN lines, nodes, res
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            graph = {}
            # If several rows come back, the last one wins.
            for row in session.run(query, nodeId=id):
                kept_nodes = [node for node in row['nodes'] if node['id']]
                kept_lines = [
                    edge
                    for edge in row['lines']
                    if edge['from'] and edge['to']
                ]
                graph = {
                    "nodes": kept_nodes,
                    "lines": kept_lines,
                    "rootId": row['res'],
                }
            return graph
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
- # 数据标签图谱展示(血缘关系)父节点/(所有关系)
def label_kinship_graph(nodeid):
    """
    Build the lineage graph around one ``DataLabel`` node.

    The query collects ``DataResource``, ``DataModel``, ``DataMeta``,
    ``data_standard`` and ``DataMetric`` nodes linked to the label through
    ``LABEL`` relationships.

    Args:
        nodeid: Internal Neo4j id of the ``DataLabel`` node.

    Returns:
        dict: ``{"nodes": [...], "lines": [...], "rootId": ...}``.  Nodes
        with an empty ``id`` and lines with an empty ``from``/``to`` are
        filtered out.  An empty dict is returned when the query yields no
        row or when a ``ConnectionError`` / ``ValueError`` is raised.
    """
    lineage_query = """
        MATCH(la:DataLabel)
        WHERE id(la)=$nodeId
        OPTIONAL MATCH(a:DataResource)-[:LABEL]-(la)
        OPTIONAL MATCH(b:DataModel)-[:LABEL]-(la)
        OPTIONAL MATCH(meta:DataMeta)-[:LABEL]-(la)
        OPTIONAL MATCH(d:data_standard)-[:LABEL]-(la)
        OPTIONAL MATCH(e:DataMetric)-[:LABEL]-(la)
        WITH
            collect({
                id:toString(id(a)),
                text:a.name_zh,
                type:split(labels(a)[0],'_')[1]
            })+
            collect({
                id:toString(id(b)),
                text:b.name_zh,
                type:split(labels(b)[0],'_')[1]
            })+
            collect({
                id:toString(id(d)),
                text:d.name_zh,
                type:split(labels(d)[0],'_')[1]
            })+
            collect({
                id:toString(id(e)),
                text:e.name_zh,
                type:split(labels(e)[0],'_')[1]
            })+
            collect({
                id:toString(id(la)),
                text:la.name_zh,
                type:split(labels(la)[0],'_')[1]
            })+
            collect({id:toString(id(meta)),text:meta.name_zh}) as nodes,
            la,
            collect({from:toString(id(a)),to:toString(id(la)),text:'标签'})+
            collect({from:toString(id(b)),to:toString(id(la)),text:'标签'})+
            collect({from:toString(id(meta)),to:toString(id(la)),text:'标签'})+
            collect({from:toString(id(d)),to:toString(id(la)),text:'标签'})+
            collect({from:toString(id(e)),to:toString(id(la)),text:'标签'}) as lines
        WITH
            toString(id(la)) as rootId,
            apoc.coll.toSet(lines) as lines,
            apoc.coll.toSet(nodes) as nodes
        RETURN nodes,lines,rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            graph = {}
            # If several rows come back, the last one wins.
            for row in session.run(lineage_query, nodeId=nodeid):
                kept_nodes = [node for node in row['nodes'] if node['id']]
                kept_lines = [
                    edge
                    for edge in row['lines']
                    if edge['from'] and edge['to']
                ]
                graph = {
                    "nodes": kept_nodes,
                    "lines": kept_lines,
                    "rootId": row['rootId'],
                }
            return graph
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
- # 数据标签图谱展示(影响关系)下游
def label_impact_graph(nodeid):
    """
    Build the impact graph of one ``DataLabel`` node.

    The query returns only the label node itself, so the graph never
    contains any lines.

    Args:
        nodeid: Internal Neo4j id of the ``DataLabel`` node.

    Returns:
        dict: ``{"nodes": {...}, "rootId": ..., "lines": []}`` where
        ``nodes`` is the single node map produced by the query.  An empty
        dict is returned when the query yields no row or when a
        ``ConnectionError`` / ``ValueError`` is raised.
    """
    impact_query = """
        MATCH(n:DataLabel)
        WHERE id(n)=$nodeId
        RETURN {
            id:toString(id(n)),
            text:(n.name_zh),
            type:"label"
        } AS nodes,
        toString(id(n)) as rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            graph = {}
            # If several rows come back, the last one wins.
            for row in session.run(impact_query, nodeId=nodeid):
                graph = dict(
                    nodes=row['nodes'],
                    rootId=row['rootId'],
                    lines=[],
                )
            return graph
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
- # 数据标签按照提交内容查询相似分组,并且返回
def dynamic_label_list(name_filter=None):
    """
    List ``DataLabel`` groups whose name is similar to ``name_filter``.

    Similarity is computed in the database with
    ``apoc.text.levenshteinSimilarity`` against the ``group`` property;
    rows scoring above 0.1 are returned.

    Args:
        name_filter: Text to compare against; a falsy value is sent to the
            query as an empty string.

    Returns:
        list: Dicts with ``name_zh`` (the group name) and ``id`` (node id).
        An empty list is returned when a ``ConnectionError`` or
        ``ValueError`` is raised.
    """
    similarity_query = """
        MATCH (n:DataLabel)
        WITH
            n,
            apoc.text.levenshteinSimilarity(n.group, $name_filter) AS similarity
        WHERE similarity > 0.1 // 设置相似度阈值
        RETURN DISTINCT n.group as name_zh, id(n) as nodeid
    """
    needle = name_filter or ""
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            matches = session.run(similarity_query, name_filter=needle)
            return [
                {"name_zh": row['name_zh'], "id": row['nodeid']}
                for row in matches
            ]
    except (ConnectionError, ValueError) as e:
        logger.error(f"Neo4j数据库连接失败: {str(e)}")
        return []
    finally:
        if driver:
            driver.close()
def search_info(key, value):
    """
    Search nodes whose property ``key`` contains ``value``.

    The match is a case-insensitive substring test implemented with the
    Cypher regular-expression operator; at most 30 nodes are returned.

    Args:
        key: Property name to search on.  It must be a plain identifier
            (letters, digits, underscore; not starting with a digit).
        value: Text to look for.  It is treated as literal text, not as a
            regular expression.

    Returns:
        list: Dicts with ``properties``, ``id``, ``create_time`` and
        ``labels`` for every matching node.  An empty list is returned for
        an invalid key, when the database connection fails, or when the
        query raises.
    """
    field = str(key)
    # fullmatch (unlike match + "$") also rejects a key ending in a newline.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", field):
        logger.warning("非法属性键: %s", key)
        return []

    query = """
        MATCH (n)
        WHERE n[$field] =~ $pattern
        WITH
            n,
            properties(n) as properties,
            n.create_time as create_time,
            id(n) as nodeid
        RETURN properties, nodeid, create_time, labels(n) as labels
        LIMIT 30
    """
    # Escape the user-supplied text so regex metacharacters such as "(" or
    # "+" are matched literally instead of breaking or altering the pattern.
    pattern = f"(?i).*{re.escape(str(value))}.*"

    driver = None
    try:
        driver = connect_graph()
    except (ConnectionError, ValueError) as e:
        logger.error(f"无法连接到Neo4j数据库: {str(e)}")
        return []

    try:
        with driver.session() as session:
            result = session.run(query, field=field, pattern=pattern)
            return [
                {
                    "properties": record["properties"],
                    "id": record["nodeid"],
                    "create_time": record["create_time"],
                    "labels": record["labels"],
                }
                for record in result
            ]
    except Exception as e:
        # Best-effort search: any query failure yields an empty result.
        logger.error(f"搜索节点信息失败: {str(e)}")
        return []
    finally:
        if driver:
            driver.close()
def label_info(id):
    """
    Fetch the graph-node description of a single node.

    Args:
        id: Internal Neo4j id of the node.

    Returns:
        dict: ``{"nodes": {...}, "rootId": ...}`` for the first row returned
        by the query, or an empty dict when nothing matches or when a
        ``ConnectionError`` / ``ValueError`` is raised.
    """
    lookup_query = """
        MATCH (n)
        WHERE id(n) = $nodeId
        RETURN {
            id:toString(id(n)),
            text:(n.name_zh),
            type:"label"
        } AS nodes,
        toString(id(n)) as rootId
    """
    driver = None
    try:
        driver = connect_graph()
        with driver.session() as session:
            rows = session.run(lookup_query, nodeId=id).data()
            if rows:
                return rows[0]
            return {}
    except (ConnectionError, ValueError) as e:
        logger.error(f"无法连接到Neo4j数据库: {str(e)}")
        return {}
    finally:
        if driver:
            driver.close()
def graph_all(domain_id, include_meta=True):
    """
    Return a node together with all of its directly connected neighbours.

    Args:
        domain_id: Id of the start node; anything accepted by ``int()``.
        include_meta: When false, neighbours labelled ``DataMeta`` (and the
            relationships leading to them) are left out.

    Returns:
        dict: ``{"nodes": [...], "lines": [...]}``.  Each node is its
        property map plus ``id`` (int) and ``node_type`` (first label or
        ``""``).  Each line has ``id``, ``from``, ``to`` (numeric suffix of
        the element ids, as strings) and ``text`` (relationship type).
        Both lists are empty when the id is not an integer, the node does
        not exist, or any error occurs.
    """
    try:
        domain_id_int = int(domain_id)
    except (ValueError, TypeError):
        logger.error(f"节点ID不是有效的整数: {domain_id}")
        return {"nodes": [], "lines": []}

    # The two query variants differ only in the DataMeta exclusion.
    meta_clause = "" if include_meta else "WHERE NOT (m:DataMeta)"
    cypher = f"""
        MATCH (n)
        WHERE id(n) = $domain_id
        OPTIONAL MATCH path = (n)-[r]-(m)
        {meta_clause}
        RETURN n,
            collect(DISTINCT m) as related_nodes,
            collect(DISTINCT r) as relationships
    """

    try:
        with neo4j_driver.get_session() as session:
            record = session.run(cypher, domain_id=domain_id_int).single()
            if not record:
                logger.warning(f"未找到节点: {domain_id_int}")
                return {"nodes": [], "lines": []}

            nodes = {}
            lines = {}

            # Start node.  Compare against None explicitly: driver nodes are
            # mapping-like, so a node without properties would be falsy.
            n_node = record["n"]
            if n_node is not None:
                n_props = dict(n_node)
                n_labels = list(n_node.labels)
                n_props["id"] = domain_id_int
                n_props["node_type"] = n_labels[0] if n_labels else ""
                nodes[domain_id_int] = n_props

            # Neighbour nodes, keyed by the numeric suffix of the element id.
            for m_node in record["related_nodes"] or []:
                if m_node is None:
                    continue
                m_id = int(m_node.element_id.split(":")[-1])
                if m_id in nodes:
                    continue
                m_props = dict(m_node)
                m_labels = list(m_node.labels)
                m_props["id"] = m_id
                m_props["node_type"] = m_labels[0] if m_labels else ""
                nodes[m_id] = m_props

            # Relationships, de-duplicated by relationship id.
            for rel in record["relationships"] or []:
                if rel is None:
                    continue
                rel_id = rel.element_id.split(":")[-1]
                if rel_id in lines:
                    continue
                lines[rel_id] = {
                    "id": rel_id,
                    "from": rel.start_node.element_id.split(":")[-1],
                    "to": rel.end_node.element_id.split(":")[-1],
                    "text": rel.type,
                }

            logger.info(
                f"graph_all 结果: node_id={domain_id_int}, "
                f"nodes={len(nodes)}, lines={len(lines)}"
            )
            return {
                "nodes": list(nodes.values()),
                "lines": list(lines.values()),
            }
    except Exception as e:
        # Boundary handler: log the message with its traceback and degrade
        # to an empty graph instead of propagating.
        logger.exception(f"获取图谱失败: {str(e)}")
        return {"nodes": [], "lines": []}
def node_delete(node_id):
    """
    Delete a ``DataLabel`` node together with all of its relationships.

    The node is first looked up by id and label; only when it exists is a
    ``DETACH DELETE`` issued.

    Args:
        node_id: Internal Neo4j id of the node (integer).

    Returns:
        dict: ``{"success": bool, "message": str}`` describing the outcome.
    """
    exists_query = """
        MATCH (n:DataLabel)
        WHERE id(n) = $nodeId
        RETURN n
    """
    remove_query = """
        MATCH (n:DataLabel)
        WHERE id(n) = $nodeId
        DETACH DELETE n
        RETURN count(n) as deleted_count
    """

    driver = None
    try:
        driver = connect_graph()
    except (ConnectionError, ValueError) as e:
        logger.error(f"无法连接到Neo4j数据库: {str(e)}")
        return {"success": False, "message": "无法连接到数据库"}

    try:
        with driver.session() as session:
            # Step 1: make sure the node exists and carries the DataLabel label.
            found = session.run(exists_query, nodeId=node_id).single()
            if not found:
                logger.warning(f"DataLabel 节点不存在: ID={node_id}")
                return {
                    "success": False,
                    "message": f"DataLabel 节点不存在 (ID: {node_id})",
                }

            # Step 2: remove the node along with every attached relationship.
            outcome = session.run(remove_query, nodeId=node_id).single()
            if not outcome:
                logger.warning(f"删除结果为空: ID={node_id}")
                return {
                    "success": False,
                    "message": "删除失败,未获取到删除结果",
                }

            if outcome["deleted_count"] > 0:
                logger.info(f"成功删除 DataLabel 节点: ID={node_id}")
                return {
                    "success": True,
                    "message": f"成功删除 DataLabel 节点 (ID: {node_id})",
                }

            logger.warning(f"删除失败,节点可能已被删除: ID={node_id}")
            return {"success": False, "message": "删除失败,节点可能已被删除"}
    except Exception as e:
        logger.error(f"删除 DataLabel 节点失败: {str(e)}")
        return {"success": False, "message": f"删除失败: {str(e)}"}
    finally:
        if driver:
            driver.close()
|