|
@@ -4,7 +4,7 @@ import logging
|
|
from py2neo import Relationship
|
|
from py2neo import Relationship
|
|
import pandas as pd
|
|
import pandas as pd
|
|
from app.services.neo4j_driver import neo4j_driver
|
|
from app.services.neo4j_driver import neo4j_driver
|
|
-from app.core.graph.graph_operations import create_or_get_node, relationship_exists, get_node
|
|
|
|
|
|
+from app.core.graph.graph_operations import create_or_get_node, relationship_exists, get_node, connect_graph
|
|
import time
|
|
import time
|
|
|
|
|
|
logger = logging.getLogger("app")
|
|
logger = logging.getLogger("app")
|
|
@@ -109,6 +109,13 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
'time': get_formatted_time(),
|
|
'time': get_formatted_time(),
|
|
'type': type_value # 根据资源类型设置不同的type值
|
|
'type': type_value # 根据资源类型设置不同的type值
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ # 记录describe字段是否存在于创建数据中
|
|
|
|
+ if "describe" in receiver:
|
|
|
|
+ logger.info(f"创建资源,describe字段将被设置为: {receiver.get('describe')}")
|
|
|
|
+ else:
|
|
|
|
+ logger.info("创建资源,describe字段不在创建数据中")
|
|
|
|
+
|
|
if 'additional_info' in receiver:
|
|
if 'additional_info' in receiver:
|
|
del receiver['additional_info']
|
|
del receiver['additional_info']
|
|
# 从receiver中移除data_source属性,避免将复杂对象作为节点属性
|
|
# 从receiver中移除data_source属性,避免将复杂对象作为节点属性
|
|
@@ -130,6 +137,9 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
result = session.run(cypher, **receiver)
|
|
result = session.run(cypher, **receiver)
|
|
data_resource_node = result.single()["n"]
|
|
data_resource_node = result.single()["n"]
|
|
resource_id = data_resource_node.id # 使用id属性获取数值ID
|
|
resource_id = data_resource_node.id # 使用id属性获取数值ID
|
|
|
|
+
|
|
|
|
+ # 记录创建后的节点数据
|
|
|
|
+ logger.info(f"创建后的节点数据,describe字段: {data_resource_node.get('describe')}")
|
|
|
|
|
|
# 处理标签关系
|
|
# 处理标签关系
|
|
if tag_list:
|
|
if tag_list:
|
|
@@ -301,7 +311,18 @@ def handle_id_resource(resource_id):
|
|
return None
|
|
return None
|
|
|
|
|
|
# 构建返回数据
|
|
# 构建返回数据
|
|
|
|
+ logger.info(f"record: {record}")
|
|
|
|
+
|
|
data_resource = dict(record["n"])
|
|
data_resource = dict(record["n"])
|
|
|
|
+
|
|
|
|
+ logger.info(f"data_resource: {data_resource}")
|
|
|
|
+ logger.info(f"describe field in node: {record['n'].get('describe')}")
|
|
|
|
+
|
|
|
|
+ # 确保describe字段存在,即使为null也记录下来
|
|
|
|
+ if 'describe' in record["n"]:
|
|
|
|
+ data_resource["describe"] = record["n"].get('describe')
|
|
|
|
+ logger.info(f"设置describe字段: {data_resource['describe']}")
|
|
|
|
+
|
|
data_resource["id"] = record["n"].id
|
|
data_resource["id"] = record["n"].id
|
|
|
|
|
|
# 查询关联的标签
|
|
# 查询关联的标签
|
|
@@ -313,10 +334,18 @@ def handle_id_resource(resource_id):
|
|
tag_result = session.run(tag_cypher, resource_id=resource_id_int)
|
|
tag_result = session.run(tag_cypher, resource_id=resource_id_int)
|
|
tag_record = tag_result.single()
|
|
tag_record = tag_result.single()
|
|
|
|
|
|
|
|
+ # 设置标签信息
|
|
if tag_record:
|
|
if tag_record:
|
|
- tag = dict(tag_record["t"])
|
|
|
|
- tag["id"] = tag_record["t"].id
|
|
|
|
- data_resource["tag_info"] = tag
|
|
|
|
|
|
+ tag = {
|
|
|
|
+ "name": tag_record["t"].get("name"),
|
|
|
|
+ "id": tag_record["t"].id
|
|
|
|
+ }
|
|
|
|
+ else:
|
|
|
|
+ tag = {
|
|
|
|
+ "name": None,
|
|
|
|
+ "id": None
|
|
|
|
+ }
|
|
|
|
+ data_resource["tag"] = tag
|
|
|
|
|
|
# 查询关联的元数据 - 支持meta_data和Metadata两种标签
|
|
# 查询关联的元数据 - 支持meta_data和Metadata两种标签
|
|
meta_cypher = """
|
|
meta_cypher = """
|
|
@@ -327,15 +356,46 @@ def handle_id_resource(resource_id):
|
|
"""
|
|
"""
|
|
meta_result = session.run(meta_cypher, resource_id=resource_id_int)
|
|
meta_result = session.run(meta_cypher, resource_id=resource_id_int)
|
|
|
|
|
|
- meta_list = []
|
|
|
|
|
|
+ parsed_data = []
|
|
for meta_record in meta_result:
|
|
for meta_record in meta_result:
|
|
meta = dict(meta_record["m"])
|
|
meta = dict(meta_record["m"])
|
|
- meta["id"] = meta_record["m"].id
|
|
|
|
- meta_list.append(meta)
|
|
|
|
|
|
+ meta_data = {
|
|
|
|
+ "id": meta_record["m"].id,
|
|
|
|
+ "name": meta.get("name"),
|
|
|
|
+ "en_name": meta.get("en_name"),
|
|
|
|
+ "data_type": meta.get("data_type"),
|
|
|
|
+ "data_standard": {
|
|
|
|
+ "name": None,
|
|
|
|
+ "id": None
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ parsed_data.append(meta_data)
|
|
|
|
+
|
|
|
|
+ data_resource["parsed_data"] = parsed_data
|
|
|
|
+
|
|
|
|
+ # 确保所有必需字段都有默认值
|
|
|
|
+ required_fields = {
|
|
|
|
+ "leader": "",
|
|
|
|
+ "organization": "",
|
|
|
|
+ "name": "",
|
|
|
|
+ "en_name": "",
|
|
|
|
+ "data_sensitivity": "",
|
|
|
|
+ "location": "/",
|
|
|
|
+ "time": "",
|
|
|
|
+ "type": "",
|
|
|
|
+ "category": "",
|
|
|
|
+ "url": "",
|
|
|
|
+ "frequency": "",
|
|
|
|
+ "status": True,
|
|
|
|
+ "keywords": [],
|
|
|
|
+ "describe": ""
|
|
|
|
+ }
|
|
|
|
|
|
- data_resource["meta_list"] = meta_list
|
|
|
|
|
|
+ for field, default_value in required_fields.items():
|
|
|
|
+ if field not in data_resource or data_resource[field] is None:
|
|
|
|
+ data_resource[field] = default_value
|
|
|
|
|
|
- logger.info(f"成功获取资源详情,ID: {resource_id_int}")
|
|
|
|
|
|
+ logger.info(f"成功获取资源详情,ID: {resource_id_int}, describe: {data_resource.get('describe')}")
|
|
return data_resource
|
|
return data_resource
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logger.error(f"处理单个数据资源查询失败: {str(e)}")
|
|
logger.error(f"处理单个数据资源查询失败: {str(e)}")
|
|
@@ -1145,9 +1205,15 @@ def data_resource_edit(data):
|
|
# 更新节点属性
|
|
# 更新节点属性
|
|
update_fields = {}
|
|
update_fields = {}
|
|
for key, value in data.items():
|
|
for key, value in data.items():
|
|
- if key != "id" and key != "tag":
|
|
|
|
|
|
+ if key != "id" and key != "parsed_data" and value is not None:
|
|
update_fields[key] = value
|
|
update_fields[key] = value
|
|
|
|
|
|
|
|
+ # 记录describe字段是否存在于待更新数据中
|
|
|
|
+ if "describe" in data:
|
|
|
|
+ logger.info(f"编辑资源,describe字段将被更新为: {data.get('describe')}")
|
|
|
|
+ else:
|
|
|
|
+ logger.info("编辑资源,describe字段不在更新数据中")
|
|
|
|
+
|
|
# 添加更新时间
|
|
# 添加更新时间
|
|
update_fields["updateTime"] = get_formatted_time()
|
|
update_fields["updateTime"] = get_formatted_time()
|
|
|
|
|
|
@@ -1165,7 +1231,10 @@ def data_resource_edit(data):
|
|
|
|
|
|
if not updated_node:
|
|
if not updated_node:
|
|
raise ValueError("资源不存在")
|
|
raise ValueError("资源不存在")
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ # 记录更新后的节点数据
|
|
|
|
+ logger.info(f"更新后的节点数据,describe字段: {updated_node['n'].get('describe')}")
|
|
|
|
+
|
|
# 处理标签关系
|
|
# 处理标签关系
|
|
tag_id = data.get("tag")
|
|
tag_id = data.get("tag")
|
|
if tag_id:
|
|
if tag_id:
|
|
@@ -1184,11 +1253,82 @@ def data_resource_edit(data):
|
|
CREATE (n)-[r:label]->(t)
|
|
CREATE (n)-[r:label]->(t)
|
|
RETURN r
|
|
RETURN r
|
|
"""
|
|
"""
|
|
-
|
|
|
|
session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
|
|
session.run(create_rel_cypher, resource_id=int(resource_id), tag_id=int(tag_id))
|
|
|
|
|
|
|
|
+ # 处理元数据关系
|
|
|
|
+ parsed_data = data.get("parsed_data", [])
|
|
|
|
+ if parsed_data:
|
|
|
|
+ # 删除旧的元数据关系
|
|
|
|
+ delete_meta_cypher = """
|
|
|
|
+ MATCH (n:data_resource)-[r:contain]->()
|
|
|
|
+ WHERE id(n) = $resource_id
|
|
|
|
+ DELETE r
|
|
|
|
+ """
|
|
|
|
+ session.run(delete_meta_cypher, resource_id=int(resource_id))
|
|
|
|
+
|
|
|
|
+ # 删除旧的清洗资源关系
|
|
|
|
+ delete_clean_cypher = """
|
|
|
|
+ MATCH (n:data_resource)-[r:clean_resource]->()
|
|
|
|
+ WHERE id(n) = $resource_id
|
|
|
|
+ DELETE r
|
|
|
|
+ """
|
|
|
|
+ session.run(delete_clean_cypher, resource_id=int(resource_id))
|
|
|
|
+
|
|
|
|
+ # 创建新的元数据关系和相关关系
|
|
|
|
+ for meta in parsed_data:
|
|
|
|
+ meta_id = meta.get("id")
|
|
|
|
+ if meta_id:
|
|
|
|
+ # 创建元数据关系
|
|
|
|
+ create_meta_cypher = """
|
|
|
|
+ MATCH (n:data_resource), (m:meta_data)
|
|
|
|
+ WHERE id(n) = $resource_id AND id(m) = $meta_id
|
|
|
|
+ CREATE (n)-[r:contain]->(m)
|
|
|
|
+ RETURN r
|
|
|
|
+ """
|
|
|
|
+ session.run(create_meta_cypher, resource_id=int(resource_id), meta_id=int(meta_id))
|
|
|
|
+
|
|
|
|
+ # 处理主数据关系
|
|
|
|
+ master_data = meta.get("master_data")
|
|
|
|
+ if master_data:
|
|
|
|
+ # 创建主数据关系
|
|
|
|
+ create_master_cypher = """
|
|
|
|
+ MATCH (master), (meta:meta_data)
|
|
|
|
+ WHERE id(master) = $master_id AND id(meta) = $meta_id
|
|
|
|
+ MERGE (master)-[r:master]->(meta)
|
|
|
|
+ RETURN r
|
|
|
|
+ """
|
|
|
|
+ session.run(create_master_cypher, master_id=int(master_data), meta_id=int(meta_id))
|
|
|
|
+
|
|
|
|
+ # 处理数据标准关系
|
|
|
|
+ data_standard = meta.get("data_standard")
|
|
|
|
+ if data_standard and isinstance(data_standard, dict) and data_standard.get("id"):
|
|
|
|
+ standard_id = data_standard.get("id")
|
|
|
|
+ # 创建数据标准与元数据的关系
|
|
|
|
+ create_standard_meta_cypher = """
|
|
|
|
+ MATCH (standard), (meta:meta_data)
|
|
|
|
+ WHERE id(standard) = $standard_id AND id(meta) = $meta_id
|
|
|
|
+ MERGE (standard)-[r:clean_resource]->(meta)
|
|
|
|
+ RETURN r
|
|
|
|
+ """
|
|
|
|
+ session.run(create_standard_meta_cypher, standard_id=int(standard_id), meta_id=int(meta_id))
|
|
|
|
+
|
|
|
|
+ # 创建数据资源与数据标准的关系
|
|
|
|
+ create_resource_standard_cypher = """
|
|
|
|
+ MATCH (resource:data_resource), (standard)
|
|
|
|
+ WHERE id(resource) = $resource_id AND id(standard) = $standard_id
|
|
|
|
+ MERGE (resource)-[r:clean_resource]->(standard)
|
|
|
|
+ RETURN r
|
|
|
|
+ """
|
|
|
|
+ session.run(create_resource_standard_cypher, resource_id=int(resource_id), standard_id=int(standard_id))
|
|
|
|
+
|
|
# 返回更新后的节点
|
|
# 返回更新后的节点
|
|
- return dict(updated_node["n"])
|
|
|
|
|
|
+ node_data = dict(updated_node["n"])
|
|
|
|
+ node_data["id"] = updated_node["n"].id
|
|
|
|
+
|
|
|
|
+ # 记录最终返回的describe字段
|
|
|
|
+ logger.info(f"data_resource_edit返回数据,describe字段: {node_data.get('describe')}")
|
|
|
|
+
|
|
|
|
+ return node_data
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logger.error(f"编辑数据资源失败: {str(e)}")
|
|
logger.error(f"编辑数据资源失败: {str(e)}")
|
|
raise
|
|
raise
|