|
@@ -1254,16 +1254,25 @@ def data_resource_edit(data):
|
|
|
# 添加更新时间
|
|
|
update_fields["updateTime"] = get_formatted_time()
|
|
|
|
|
|
- # 构建更新语句
|
|
|
- set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
|
|
|
- cypher = f"""
|
|
|
- MATCH (n:DataResource)
|
|
|
- WHERE id(n) = $resource_id
|
|
|
- SET {set_clause}
|
|
|
- RETURN n
|
|
|
- """
|
|
|
+ # 构建更新语句,确保至少有 updateTime 字段要更新
|
|
|
+ if update_fields:
|
|
|
+ set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
|
|
|
+ cypher = f"""
|
|
|
+ MATCH (n:DataResource)
|
|
|
+ WHERE id(n) = $resource_id
|
|
|
+ SET {set_clause}
|
|
|
+ RETURN n
|
|
|
+ """
|
|
|
+ result = session.run(cypher, resource_id=int(resource_id), **update_fields)
|
|
|
+ else:
|
|
|
+ # 如果没有字段需要更新,只查询节点
|
|
|
+ cypher = """
|
|
|
+ MATCH (n:DataResource)
|
|
|
+ WHERE id(n) = $resource_id
|
|
|
+ RETURN n
|
|
|
+ """
|
|
|
+ result = session.run(cypher, resource_id=int(resource_id))
|
|
|
|
|
|
- result = session.run(cypher, resource_id=int(resource_id), **update_fields)
|
|
|
updated_node = result.single()
|
|
|
|
|
|
if not updated_node:
|
|
@@ -1294,22 +1303,75 @@ def data_resource_edit(data):
|
|
|
|
|
|
# 处理元数据关系
|
|
|
parsed_data = data.get("parsed_data", [])
|
|
|
+
|
|
|
+ # 首先删除旧的元数据关系和清洗资源关系(无论parsed_data是否为空都要执行)
|
|
|
+ delete_meta_cypher = """
|
|
|
+ MATCH (n:DataResource)-[r:contain]->()
|
|
|
+ WHERE id(n) = $resource_id
|
|
|
+ DELETE r
|
|
|
+ """
|
|
|
+ session.run(delete_meta_cypher, resource_id=int(resource_id))
|
|
|
+
|
|
|
+ delete_clean_cypher = """
|
|
|
+ MATCH (n:DataResource)-[r:clean_resource]->()
|
|
|
+ WHERE id(n) = $resource_id
|
|
|
+ DELETE r
|
|
|
+ """
|
|
|
+ session.run(delete_clean_cypher, resource_id=int(resource_id))
|
|
|
+
|
|
|
+ # 根据parsed_data是否为空来决定是否执行预处理和关系新建操作
|
|
|
if parsed_data:
|
|
|
- # 删除旧的元数据关系
|
|
|
- delete_meta_cypher = """
|
|
|
- MATCH (n:DataResource)-[r:contain]->()
|
|
|
- WHERE id(n) = $resource_id
|
|
|
- DELETE r
|
|
|
- """
|
|
|
- session.run(delete_meta_cypher, resource_id=int(resource_id))
|
|
|
-
|
|
|
- # 删除旧的清洗资源关系
|
|
|
- delete_clean_cypher = """
|
|
|
- MATCH (n:DataResource)-[r:clean_resource]->()
|
|
|
- WHERE id(n) = $resource_id
|
|
|
- DELETE r
|
|
|
- """
|
|
|
- session.run(delete_clean_cypher, resource_id=int(resource_id))
|
|
|
+ # 预处理 parsed_data,确保每个 metadata 都有有效的 ID
|
|
|
+ for meta in parsed_data:
|
|
|
+ meta_id = meta.get("id")
|
|
|
+ meta_name = meta.get("name")
|
|
|
+
|
|
|
+ if not meta_id and meta_name:
|
|
|
+ # 如果没有 ID 但有 name,先根据 name 查找是否存在对应的 DataMeta 节点
|
|
|
+ find_meta_cypher = """
|
|
|
+ MATCH (m:DataMeta {name: $meta_name})
|
|
|
+ RETURN m
|
|
|
+ """
|
|
|
+ find_result = session.run(find_meta_cypher, meta_name=meta_name)
|
|
|
+ existing_meta = find_result.single()
|
|
|
+
|
|
|
+ if existing_meta:
|
|
|
+ # 如果找到了,使用现有的 ID
|
|
|
+ meta_id = existing_meta["m"].id
|
|
|
+ meta["id"] = meta_id
|
|
|
+ logger.info(f"找到现有的DataMeta节点: {meta_name}, ID: {meta_id}")
|
|
|
+ else:
|
|
|
+ # 如果没有找到,创建新的 DataMeta 节点
|
|
|
+ create_meta_cypher = """
|
|
|
+ CREATE (m:DataMeta {
|
|
|
+ name: $name,
|
|
|
+ en_name: $en_name,
|
|
|
+ data_type: $data_type,
|
|
|
+ createTime: $create_time,
|
|
|
+ updateTime: $update_time
|
|
|
+ })
|
|
|
+ RETURN m
|
|
|
+ """
|
|
|
+ create_time = get_formatted_time()
|
|
|
+ new_meta_result = session.run(
|
|
|
+ create_meta_cypher,
|
|
|
+ name=meta_name,
|
|
|
+ en_name=meta.get("en_name", meta_name),
|
|
|
+ data_type=meta.get("data_type", "varchar(255)"),
|
|
|
+ create_time=create_time,
|
|
|
+ update_time=create_time
|
|
|
+ )
|
|
|
+ new_meta = new_meta_result.single()
|
|
|
+ if new_meta:
|
|
|
+ meta_id = new_meta["m"].id
|
|
|
+ meta["id"] = meta_id
|
|
|
+ logger.info(f"创建新的DataMeta节点: {meta_name}, ID: {meta_id}")
|
|
|
+ else:
|
|
|
+ logger.error(f"创建DataMeta节点失败: {meta_name}")
|
|
|
+ continue
|
|
|
+ elif not meta_id:
|
|
|
+ logger.warning(f"跳过没有ID和name的metadata: {meta}")
|
|
|
+ continue
|
|
|
|
|
|
# 创建新的元数据关系和相关关系
|
|
|
for meta in parsed_data:
|
|
@@ -1357,9 +1419,11 @@ def data_resource_edit(data):
|
|
|
RETURN r
|
|
|
"""
|
|
|
session.run(create_resource_standard_cypher, resource_id=int(resource_id), standard_id=int(standard_id))
|
|
|
+ else:
|
|
|
+ logger.info(f"parsed_data为空,只删除旧的元数据关系,不创建新的关系")
|
|
|
|
|
|
# 返回更新后的节点
|
|
|
- node_data = serialize_node_properties(updated_node["n"])
|
|
|
+ node_data = serialize_node_properties(updated_node["n"])
|
|
|
node_data["id"] = updated_node["n"].id
|
|
|
|
|
|
# 记录最终返回的describe字段
|