|
@@ -236,12 +236,34 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
MERGE (a)-[r:isbelongto]->(b)
|
|
MERGE (a)-[r:isbelongto]->(b)
|
|
RETURN r
|
|
RETURN r
|
|
"""
|
|
"""
|
|
- session.run(
|
|
|
|
|
|
+ rel_result = session.run(
|
|
rel_data_source_cypher,
|
|
rel_data_source_cypher,
|
|
resource_id=resource_id,
|
|
resource_id=resource_id,
|
|
ds_en_name=data_source_en_name
|
|
ds_en_name=data_source_en_name
|
|
)
|
|
)
|
|
- logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
|
|
|
|
|
|
+ rel_record = rel_result.single()
|
|
|
|
+
|
|
|
|
+ if rel_record:
|
|
|
|
+ logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
|
|
|
|
+ else:
|
|
|
|
+ # 添加严重错误日志
|
|
|
|
+ error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {data_source_en_name}"
|
|
|
|
+ logger.error(error_msg)
|
|
|
|
+
|
|
|
|
+ # 检查数据源节点是否存在
|
|
|
|
+ check_ds_cypher = "MATCH (b:data_source) WHERE b.en_name = $ds_en_name RETURN b"
|
|
|
|
+ check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
|
|
|
|
+ if not check_ds_result.single():
|
|
|
|
+ logger.error(f"数据源节点不存在: en_name={data_source_en_name}")
|
|
|
|
+
|
|
|
|
+ # 检查数据资源节点是否存在
|
|
|
|
+ check_res_cypher = "MATCH (a:data_resource) WHERE id(a) = $resource_id RETURN a"
|
|
|
|
+ check_res_result = session.run(check_res_cypher, resource_id=resource_id)
|
|
|
|
+ if not check_res_result.single():
|
|
|
|
+ logger.error(f"数据资源节点不存在: id={resource_id}")
|
|
|
|
+
|
|
|
|
+ # 严重错误应该抛出异常
|
|
|
|
+ raise RuntimeError(error_msg)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logger.error(f"处理数据源关系失败: {str(e)}")
|
|
logger.error(f"处理数据源关系失败: {str(e)}")
|
|
raise RuntimeError(f"处理数据源关系失败: {str(e)}")
|
|
raise RuntimeError(f"处理数据源关系失败: {str(e)}")
|
|
@@ -1247,14 +1269,17 @@ def handle_data_source(data_source):
|
|
ds.username = $username
|
|
ds.username = $username
|
|
RETURN ds
|
|
RETURN ds
|
|
"""
|
|
"""
|
|
-
|
|
|
|
|
|
+ # 这里列出的字段将会作为数据源的属性
|
|
connection_info = {
|
|
connection_info = {
|
|
"type": data_source.get("type", "").lower(),
|
|
"type": data_source.get("type", "").lower(),
|
|
"host": data_source.get("host"),
|
|
"host": data_source.get("host"),
|
|
"port": data_source.get("port"),
|
|
"port": data_source.get("port"),
|
|
"database": data_source.get("database"), # 支持新旧字段名
|
|
"database": data_source.get("database"), # 支持新旧字段名
|
|
"username": data_source.get("username"), # 支持新旧字段名
|
|
"username": data_source.get("username"), # 支持新旧字段名
|
|
- "password": data_source.get("password") # 支持新旧字段名
|
|
|
|
|
|
+ "password": data_source.get("password"), # 支持新旧字段名
|
|
|
|
+ "en_name": ds_en_name, # 确保包含en_name属性
|
|
|
|
+ "name": data_source["name"], # 确保包含name属性
|
|
|
|
+ "createTime": get_formatted_time() # 添加创建时间
|
|
}
|
|
}
|
|
|
|
|
|
check_result = session.run(
|
|
check_result = session.run(
|
|
@@ -1280,9 +1305,6 @@ def handle_data_source(data_source):
|
|
RETURN ds
|
|
RETURN ds
|
|
"""
|
|
"""
|
|
|
|
|
|
- # 添加创建时间
|
|
|
|
- data_source["createTime"] = get_formatted_time()
|
|
|
|
-
|
|
|
|
create_result = session.run(create_cypher, properties=connection_info)
|
|
create_result = session.run(create_cypher, properties=connection_info)
|
|
created_record = create_result.single()
|
|
created_record = create_result.single()
|
|
|
|
|