|
@@ -105,7 +105,7 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
|
|
|
|
# 更新属性
|
|
# 更新属性
|
|
update_attributes = {
|
|
update_attributes = {
|
|
- 'en_name': receiver['en_name'],
|
|
|
|
|
|
+ 'en_name': receiver.get('en_name', receiver.get('name', '')),
|
|
'time': get_formatted_time(),
|
|
'time': get_formatted_time(),
|
|
'type': type_value # 根据资源类型设置不同的type值
|
|
'type': type_value # 根据资源类型设置不同的type值
|
|
}
|
|
}
|
|
@@ -232,7 +232,7 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
if data_source_en_name:
|
|
if data_source_en_name:
|
|
# 创建 originates_from 关系
|
|
# 创建 originates_from 关系
|
|
rel_data_source_cypher = """
|
|
rel_data_source_cypher = """
|
|
- MATCH (a:data_resource), (b:data_source)
|
|
|
|
|
|
+ MATCH (a:data_resource), (b:DataSource)
|
|
WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
|
|
WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
|
|
MERGE (a)-[r:originates_from]->(b)
|
|
MERGE (a)-[r:originates_from]->(b)
|
|
RETURN r
|
|
RETURN r
|
|
@@ -252,7 +252,7 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
logger.error(error_msg)
|
|
logger.error(error_msg)
|
|
|
|
|
|
# 检查数据源节点是否存在
|
|
# 检查数据源节点是否存在
|
|
- check_ds_cypher = "MATCH (b:data_source) WHERE b.en_name = $ds_en_name RETURN b"
|
|
|
|
|
|
+ check_ds_cypher = "MATCH (b:DataSource) WHERE b.en_name = $ds_en_name RETURN b"
|
|
check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
|
|
check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
|
|
if not check_ds_result.single():
|
|
if not check_ds_result.single():
|
|
logger.error(f"数据源节点不存在: en_name={data_source_en_name}")
|
|
logger.error(f"数据源节点不存在: en_name={data_source_en_name}")
|
|
@@ -1194,44 +1194,40 @@ def data_resource_edit(data):
|
|
raise
|
|
raise
|
|
|
|
|
|
def handle_data_source(data_source):
|
|
def handle_data_source(data_source):
|
|
- """处理数据源的检查和创建
|
|
|
|
- """
|
|
|
|
|
|
+ """处理数据源信息,创建或获取数据源节点"""
|
|
try:
|
|
try:
|
|
- # 1. 检查en_name是否存在
|
|
|
|
- ds_en_name = data_source.get("en_name")
|
|
|
|
- if not ds_en_name:
|
|
|
|
- raise ValueError("数据源信息不完整,缺少名称(en_name)")
|
|
|
|
-
|
|
|
|
- # 2. 处理name字段
|
|
|
|
- if "name" not in data_source or not data_source["name"]:
|
|
|
|
- data_source["name"] = ds_en_name
|
|
|
|
- logger.debug(f"数据源name为空,使用en_name作为替代: {ds_en_name}")
|
|
|
|
-
|
|
|
|
- # 3. 检查是否为简单查询模式
|
|
|
|
- required_fields = ["type", "host", "port", "database", "username"]
|
|
|
|
- has_required_fields = all(data_source.get(field) for field in required_fields)
|
|
|
|
-
|
|
|
|
with neo4j_driver.get_session() as session:
|
|
with neo4j_driver.get_session() as session:
|
|
- # 简单查询模式:只通过en_name查找已有数据源
|
|
|
|
- if not has_required_fields:
|
|
|
|
- logger.info(f"简单数据源查询模式,查找en_name为: {ds_en_name}")
|
|
|
|
- check_name_cypher = """
|
|
|
|
- MATCH (ds:data_source {en_name: $en_name})
|
|
|
|
- RETURN ds
|
|
|
|
- """
|
|
|
|
- check_result = session.run(check_name_cypher, en_name=ds_en_name)
|
|
|
|
- existing_record = check_result.single()
|
|
|
|
|
|
+ # 获取英文名称作为唯一标识
|
|
|
|
+ ds_en_name = data_source.get("en_name")
|
|
|
|
+ if not ds_en_name:
|
|
|
|
+ logger.error("数据源缺少必要的en_name属性")
|
|
|
|
+ return None
|
|
|
|
|
|
- if existing_record:
|
|
|
|
- # 数据源已存在,返回其名称
|
|
|
|
- existing_data_source = dict(existing_record["ds"])
|
|
|
|
- logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
|
|
|
|
- return existing_data_source.get("en_name")
|
|
|
|
- else:
|
|
|
|
- # 数据源不存在,抛出异常
|
|
|
|
- raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
|
|
|
|
|
|
+ # 如果没有设置name,使用en_name作为name
|
|
|
|
+ if "name" not in data_source or not data_source["name"]:
|
|
|
|
+ data_source["name"] = ds_en_name
|
|
|
|
+
|
|
|
|
+ # 检查必填字段
|
|
|
|
+ required_fields = ["type", "host", "port", "database", "username"]
|
|
|
|
+ has_required_fields = all(data_source.get(field) for field in required_fields)
|
|
|
|
+
|
|
|
|
+ # 查询是否已存在相同en_name的数据源
|
|
|
|
+ existing_cypher = """
|
|
|
|
+ MATCH (ds:DataSource {en_name: $en_name})
|
|
|
|
+ RETURN ds
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ existing_result = session.run(existing_cypher, en_name=ds_en_name)
|
|
|
|
+ existing_record = existing_result.single()
|
|
|
|
+
|
|
|
|
+ if existing_record:
|
|
|
|
+ existing_data_source = dict(existing_record["ds"])
|
|
|
|
+ logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
|
|
|
|
+ return existing_data_source.get("en_name")
|
|
|
|
+ else:
|
|
|
|
+ # 数据源不存在,抛出异常
|
|
|
|
+ raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
|
|
|
|
|
|
-
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logger.error(f"处理数据源失败: {str(e)}")
|
|
logger.error(f"处理数据源失败: {str(e)}")
|
|
raise RuntimeError(f"处理数据源失败: {str(e)}")
|
|
raise RuntimeError(f"处理数据源失败: {str(e)}")
|