|
|
@@ -142,8 +142,8 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
|
|
|
|
# 更新属性
|
|
|
update_attributes = {
|
|
|
- 'en_name': receiver.get('en_name', receiver.get('name', '')),
|
|
|
- 'time': get_formatted_time(),
|
|
|
+ 'name_en': receiver.get('name_en', receiver.get('name_zh', '')),
|
|
|
+ 'create_time': get_formatted_time(),
|
|
|
'type': type_value # 根据资源类型设置不同的type值
|
|
|
}
|
|
|
|
|
|
@@ -166,7 +166,7 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
|
with neo4j_driver.get_session() as session:
|
|
|
props_str = ", ".join([f"{k}: ${k}" for k in receiver.keys()])
|
|
|
cypher = f"""
|
|
|
- MERGE (n:DataResource {{name: $name}})
|
|
|
+ MERGE (n:DataResource {{name_zh: $name_zh}})
|
|
|
ON CREATE SET n = {{{props_str}}}
|
|
|
ON MATCH SET {", ".join([f"n.{k} = ${k}" for k in receiver.keys()])}
|
|
|
RETURN n
|
|
|
@@ -206,12 +206,12 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
|
for item in head_data:
|
|
|
# 创建元数据节点
|
|
|
meta_cypher = """
|
|
|
- MERGE (m:DataMeta {name: $name})
|
|
|
- ON CREATE SET m.en_name = $en_name,
|
|
|
+ MERGE (m:DataMeta {name_zh: $name_zh})
|
|
|
+ ON CREATE SET m.name_en = $name_en,
|
|
|
m.create_time = $create_time,
|
|
|
- m.data_type = $type,
|
|
|
+ m.data_type = $data_type,
|
|
|
m.status = true
|
|
|
- ON MATCH SET m.data_type = $type,
|
|
|
+ ON MATCH SET m.data_type = $data_type,
|
|
|
m.status = true
|
|
|
RETURN m
|
|
|
"""
|
|
|
@@ -219,10 +219,10 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
|
create_time = get_formatted_time()
|
|
|
meta_result = session.run(
|
|
|
meta_cypher,
|
|
|
- name=item['name'],
|
|
|
- en_name=item['en_name'],
|
|
|
+ name_zh=item['name_zh'],
|
|
|
+ name_en=item['name_en'],
|
|
|
create_time=create_time,
|
|
|
- type=item['data_type'] # 使用data_type作为data_type属性
|
|
|
+ data_type=item['data_type'] # 使用data_type作为data_type属性
|
|
|
)
|
|
|
meta_record = meta_result.single()
|
|
|
|
|
|
@@ -231,7 +231,7 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
|
meta_id = meta_node.id # 使用数值ID
|
|
|
|
|
|
# 打印日志确认节点创建成功和ID
|
|
|
- logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")
|
|
|
+ logger.info(f"创建或获取到元数据节点: ID={meta_id}, name_zh={item['name_zh']}")
|
|
|
|
|
|
# 确认数据资源节点是否可以正确查询到
|
|
|
check_resource_cypher = """
|
|
|
@@ -266,43 +266,43 @@ def handle_node(receiver, head_data, data_source=None, resource_type=None):
|
|
|
else:
|
|
|
logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")
|
|
|
else:
|
|
|
- logger.error(f"未能创建或获取元数据节点: {item['name']}")
|
|
|
+ logger.error(f"未能创建或获取元数据节点: {item['name_zh']}")
|
|
|
|
|
|
# 处理数据源关系
|
|
|
if data_source and resource_type == 'ddl':
|
|
|
try:
|
|
|
# 创建或获取数据源节点
|
|
|
- # data_source_en_name = handle_data_source(data_source)
|
|
|
- data_source_en_name = data_source['en_name']
|
|
|
+ # data_source_name_en = handle_data_source(data_source)
|
|
|
+ data_source_name_en = data_source['name_en']
|
|
|
|
|
|
# 创建数据资源与数据源的关系
|
|
|
- if data_source_en_name:
|
|
|
+ if data_source_name_en:
|
|
|
# 创建 originates_from 关系
|
|
|
rel_data_source_cypher = """
|
|
|
MATCH (a:DataResource), (b:DataSource)
|
|
|
- WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
|
|
|
+ WHERE id(a) = $resource_id AND b.name_en = $ds_name_en
|
|
|
MERGE (a)-[r:originates_from]->(b)
|
|
|
RETURN r
|
|
|
"""
|
|
|
rel_result = session.run(
|
|
|
rel_data_source_cypher,
|
|
|
resource_id=resource_id,
|
|
|
- ds_en_name=data_source_en_name
|
|
|
+ ds_name_en=data_source_name_en
|
|
|
)
|
|
|
rel_record = rel_result.single()
|
|
|
|
|
|
if rel_record:
|
|
|
- logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_en_name}")
|
|
|
+ logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {data_source_name_en}")
|
|
|
else:
|
|
|
# 添加严重错误日志
|
|
|
- error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {data_source_en_name}"
|
|
|
+ error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {data_source_name_en}"
|
|
|
logger.error(error_msg)
|
|
|
|
|
|
# 检查数据源节点是否存在
|
|
|
- check_ds_cypher = "MATCH (b:DataSource) WHERE b.en_name = $ds_en_name RETURN b"
|
|
|
- check_ds_result = session.run(check_ds_cypher, ds_en_name=data_source_en_name)
|
|
|
+ check_ds_cypher = "MATCH (b:DataSource) WHERE b.name_en = $ds_name_en RETURN b"
|
|
|
+ check_ds_result = session.run(check_ds_cypher, ds_name_en=data_source_name_en)
|
|
|
if not check_ds_result.single():
|
|
|
- logger.error(f"数据源节点不存在: en_name={data_source_en_name}")
|
|
|
+ logger.error(f"数据源节点不存在: name_en={data_source_name_en}")
|
|
|
|
|
|
# 检查数据资源节点是否存在
|
|
|
check_res_cypher = "MATCH (a:DataResource) WHERE id(a) = $resource_id RETURN a"
|
|
|
@@ -374,12 +374,12 @@ def handle_id_resource(resource_id):
|
|
|
# 设置标签信息
|
|
|
if tag_record:
|
|
|
tag = {
|
|
|
- "name": tag_record["t"].get("name"),
|
|
|
+ "name_zh": tag_record["t"].get("name_zh"),
|
|
|
"id": tag_record["t"].id
|
|
|
}
|
|
|
else:
|
|
|
tag = {
|
|
|
- "name": None,
|
|
|
+ "name_zh": None,
|
|
|
"id": None
|
|
|
}
|
|
|
data_resource["tag"] = tag
|
|
|
@@ -398,11 +398,11 @@ def handle_id_resource(resource_id):
|
|
|
meta = serialize_node_properties(meta_record["m"])
|
|
|
meta_data = {
|
|
|
"id": meta_record["m"].id,
|
|
|
- "name": meta.get("name"),
|
|
|
- "en_name": meta.get("en_name"),
|
|
|
+ "name_zh": meta.get("name_zh"),
|
|
|
+ "name_en": meta.get("name_en"),
|
|
|
"data_type": meta.get("data_type"),
|
|
|
"data_standard": {
|
|
|
- "name": None,
|
|
|
+ "name_zh": None,
|
|
|
"id": None
|
|
|
}
|
|
|
}
|
|
|
@@ -414,11 +414,11 @@ def handle_id_resource(resource_id):
|
|
|
required_fields = {
|
|
|
"leader": "",
|
|
|
"organization": "",
|
|
|
- "name": "",
|
|
|
- "en_name": "",
|
|
|
+ "name_zh": "",
|
|
|
+ "name_en": "",
|
|
|
"data_sensitivity": "",
|
|
|
"storage_location": "/",
|
|
|
- "time": "",
|
|
|
+ "create_time": "",
|
|
|
"type": "",
|
|
|
"category": "",
|
|
|
"url": "",
|
|
|
@@ -483,7 +483,7 @@ def id_resource_graph(resource_id):
|
|
|
logger.error(f"获取数据资源图谱失败: {str(e)}")
|
|
|
return {"nodes": [], "relationships": []}
|
|
|
|
|
|
-def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
+def resource_list(page, page_size, name_en_filter=None, name_zh_filter=None,
|
|
|
type_filter='all', category_filter=None, tag_filter=None):
|
|
|
"""获取数据资源列表"""
|
|
|
try:
|
|
|
@@ -491,11 +491,11 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
# 构建基础过滤条件(针对DataResource节点)
|
|
|
resource_conditions = []
|
|
|
|
|
|
- if en_name_filter:
|
|
|
- resource_conditions.append(f"n.en_name CONTAINS '{en_name_filter}'")
|
|
|
+ if name_en_filter:
|
|
|
+ resource_conditions.append(f"n.name_en CONTAINS '{name_en_filter}'")
|
|
|
|
|
|
- if name_filter:
|
|
|
- resource_conditions.append(f"n.name CONTAINS '{name_filter}'")
|
|
|
+ if name_zh_filter:
|
|
|
+ resource_conditions.append(f"n.name_zh CONTAINS '{name_zh_filter}'")
|
|
|
|
|
|
if type_filter and type_filter != 'all':
|
|
|
resource_conditions.append(f"n.type = '{type_filter}'")
|
|
|
@@ -516,7 +516,7 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
WHERE {resource_where}
|
|
|
WITH n
|
|
|
MATCH (n)-[:LABEL]->(t:DataLabel)
|
|
|
- WHERE t.name = '{tag_filter}'
|
|
|
+ WHERE t.name_zh = '{tag_filter}'
|
|
|
RETURN count(DISTINCT n) as count
|
|
|
"""
|
|
|
|
|
|
@@ -527,16 +527,16 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
WHERE {resource_where}
|
|
|
WITH n
|
|
|
MATCH (n)-[:LABEL]->(t:DataLabel)
|
|
|
- WHERE t.name = '{tag_filter}'
|
|
|
+ WHERE t.name_zh = '{tag_filter}'
|
|
|
RETURN DISTINCT n
|
|
|
- ORDER BY n.time DESC
|
|
|
+ ORDER BY n.create_time DESC
|
|
|
SKIP {skip} LIMIT {page_size}
|
|
|
"""
|
|
|
else:
|
|
|
# 只有标签过滤条件
|
|
|
count_cypher = f"""
|
|
|
MATCH (n:DataResource)-[:LABEL]->(t:DataLabel)
|
|
|
- WHERE t.name = '{tag_filter}'
|
|
|
+ WHERE t.name_zh = '{tag_filter}'
|
|
|
RETURN count(DISTINCT n) as count
|
|
|
"""
|
|
|
|
|
|
@@ -544,9 +544,9 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
skip = (page - 1) * page_size
|
|
|
cypher = f"""
|
|
|
MATCH (n:DataResource)-[:LABEL]->(t:DataLabel)
|
|
|
- WHERE t.name = '{tag_filter}'
|
|
|
+ WHERE t.name_zh = '{tag_filter}'
|
|
|
RETURN DISTINCT n
|
|
|
- ORDER BY n.time DESC
|
|
|
+ ORDER BY n.create_time DESC
|
|
|
SKIP {skip} LIMIT {page_size}
|
|
|
"""
|
|
|
else:
|
|
|
@@ -565,7 +565,7 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
MATCH (n:DataResource)
|
|
|
WHERE {resource_where}
|
|
|
RETURN n
|
|
|
- ORDER BY n.time DESC
|
|
|
+ ORDER BY n.create_time DESC
|
|
|
SKIP {skip} LIMIT {page_size}
|
|
|
"""
|
|
|
else:
|
|
|
@@ -577,7 +577,7 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
cypher = f"""
|
|
|
MATCH (n:DataResource)
|
|
|
RETURN n
|
|
|
- ORDER BY n.time DESC
|
|
|
+ ORDER BY n.create_time DESC
|
|
|
SKIP {skip} LIMIT {page_size}
|
|
|
"""
|
|
|
|
|
|
@@ -615,8 +615,8 @@ def resource_list(page, page_size, en_name_filter=None, name_filter=None,
|
|
|
logger.error(f"获取数据资源列表失败: {str(e)}")
|
|
|
return [], 0
|
|
|
|
|
|
-def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
|
|
|
- name_filter=None, category_filter=None, tag_filter=None):
|
|
|
+def id_data_search_list(resource_id, page, page_size, name_en_filter=None,
|
|
|
+ name_zh_filter=None, category_filter=None, tag_filter=None):
|
|
|
"""获取特定数据资源关联的元数据列表"""
|
|
|
try:
|
|
|
with neo4j_driver.get_session() as session:
|
|
|
@@ -636,11 +636,11 @@ def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
|
|
|
|
|
|
where_conditions = []
|
|
|
|
|
|
- if en_name_filter:
|
|
|
- where_conditions.append(f"m.en_name CONTAINS '{en_name_filter}'")
|
|
|
+ if name_en_filter:
|
|
|
+ where_conditions.append(f"m.name_en CONTAINS '{name_en_filter}'")
|
|
|
|
|
|
- if name_filter:
|
|
|
- where_conditions.append(f"m.name CONTAINS '{name_filter}'")
|
|
|
+ if name_zh_filter:
|
|
|
+ where_conditions.append(f"m.name_zh CONTAINS '{name_zh_filter}'")
|
|
|
|
|
|
if category_filter:
|
|
|
where_conditions.append(f"m.category = '{category_filter}'")
|
|
|
@@ -648,7 +648,7 @@ def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
|
|
|
# 标签过滤需要额外的匹配
|
|
|
tag_match = ""
|
|
|
if tag_filter:
|
|
|
- tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
|
|
|
+ tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name_zh = $tag_filter"
|
|
|
|
|
|
where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""
|
|
|
|
|
|
@@ -671,7 +671,7 @@ def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
|
|
|
{match_clause}{where_clause}
|
|
|
{tag_match}
|
|
|
RETURN m
|
|
|
- ORDER BY m.name
|
|
|
+ ORDER BY m.name_zh
|
|
|
SKIP {skip} LIMIT {page_size}
|
|
|
"""
|
|
|
|
|
|
@@ -1126,9 +1126,9 @@ def table_sql(sql):
|
|
|
for field_name, field_type in fields:
|
|
|
chinese_name = comments.get(field_name, "")
|
|
|
meta_list.append({
|
|
|
- "en_name": field_name,
|
|
|
+ "name_en": field_name,
|
|
|
"data_type": field_type,
|
|
|
- "name": chinese_name if chinese_name else field_name
|
|
|
+ "name_zh": chinese_name if chinese_name else field_name
|
|
|
})
|
|
|
|
|
|
# 检查表是否存在
|
|
|
@@ -1160,9 +1160,9 @@ def table_sql(sql):
|
|
|
def status_query(key_list):
|
|
|
query = """
|
|
|
unwind $Key_list as name
|
|
|
- OPTIONAL MATCH (n:DataModel {en_name: name})
|
|
|
- OPTIONAL MATCH (n:DataResource {en_name: name})
|
|
|
- OPTIONAL MATCH (n:DataMetric {en_name: name})
|
|
|
+ OPTIONAL MATCH (n:DataModel {name_en: name})
|
|
|
+ OPTIONAL MATCH (n:DataResource {name_en: name})
|
|
|
+ OPTIONAL MATCH (n:DataMetric {name_en: name})
|
|
|
WITH name, CASE
|
|
|
WHEN n IS NOT NULL THEN True
|
|
|
ELSE False
|
|
|
@@ -1264,7 +1264,7 @@ def model_resource_list(page, page_size, name_filter=None):
|
|
|
where_clause = ""
|
|
|
|
|
|
if name_filter:
|
|
|
- where_clause = f" WHERE n.name CONTAINS '{name_filter}'"
|
|
|
+ where_clause = f" WHERE n.name_zh CONTAINS '{name_filter}'"
|
|
|
|
|
|
# 计算总数
|
|
|
count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
|
|
|
@@ -1276,7 +1276,7 @@ def model_resource_list(page, page_size, name_filter=None):
|
|
|
cypher = f"""
|
|
|
{match_clause}{where_clause}
|
|
|
RETURN n
|
|
|
- ORDER BY n.createTime DESC
|
|
|
+ ORDER BY n.create_time DESC
|
|
|
SKIP {skip} LIMIT {page_size}
|
|
|
"""
|
|
|
result = session.run(cypher)
|
|
|
@@ -1314,7 +1314,7 @@ def data_resource_edit(data):
|
|
|
logger.info("编辑资源,describe字段不在更新数据中")
|
|
|
|
|
|
# 添加更新时间
|
|
|
- update_fields["updateTime"] = get_formatted_time()
|
|
|
+ update_fields["create_time"] = get_formatted_time()
|
|
|
|
|
|
# 构建更新语句,确保至少有 updateTime 字段要更新
|
|
|
if update_fields:
|
|
|
@@ -1386,12 +1386,12 @@ def data_resource_edit(data):
|
|
|
# 预处理 parsed_data,确保每个 metadata 都有有效的 ID
|
|
|
for meta in parsed_data:
|
|
|
meta_id = meta.get("id")
|
|
|
- meta_name = meta.get("name")
|
|
|
+ meta_name = meta.get("name_zh")
|
|
|
|
|
|
if not meta_id and meta_name:
|
|
|
- # 如果没有 ID 但有 name,先根据 name 查找是否存在对应的 DataMeta 节点
|
|
|
+ # 如果没有 ID 但有 name_zh,先根据 name_zh 查找是否存在对应的 DataMeta 节点
|
|
|
find_meta_cypher = """
|
|
|
- MATCH (m:DataMeta {name: $meta_name})
|
|
|
+ MATCH (m:DataMeta {name_zh: $meta_name})
|
|
|
RETURN m
|
|
|
"""
|
|
|
find_result = session.run(find_meta_cypher, meta_name=meta_name)
|
|
|
@@ -1406,22 +1406,20 @@ def data_resource_edit(data):
|
|
|
# 如果没有找到,创建新的 DataMeta 节点
|
|
|
create_meta_cypher = """
|
|
|
CREATE (m:DataMeta {
|
|
|
- name: $name,
|
|
|
- en_name: $en_name,
|
|
|
+ name_zh: $name_zh,
|
|
|
+ name_en: $name_en,
|
|
|
data_type: $data_type,
|
|
|
- createTime: $create_time,
|
|
|
- updateTime: $update_time
|
|
|
+ create_time: $create_time
|
|
|
})
|
|
|
RETURN m
|
|
|
"""
|
|
|
create_time = get_formatted_time()
|
|
|
new_meta_result = session.run(
|
|
|
create_meta_cypher,
|
|
|
- name=meta_name,
|
|
|
- en_name=meta.get("en_name", meta_name),
|
|
|
+ name_zh=meta_name,
|
|
|
+ name_en=meta.get("name_en", meta_name),
|
|
|
data_type=meta.get("data_type", "varchar(255)"),
|
|
|
- create_time=create_time,
|
|
|
- update_time=create_time
|
|
|
+ create_time=create_time
|
|
|
)
|
|
|
new_meta = new_meta_result.single()
|
|
|
if new_meta:
|
|
|
@@ -1501,35 +1499,35 @@ def handle_data_source(data_source):
|
|
|
try:
|
|
|
with neo4j_driver.get_session() as session:
|
|
|
# 获取英文名称作为唯一标识
|
|
|
- ds_en_name = data_source.get("en_name")
|
|
|
- if not ds_en_name:
|
|
|
- logger.error("数据源缺少必要的en_name属性")
|
|
|
+ ds_name_en = data_source.get("name_en")
|
|
|
+ if not ds_name_en:
|
|
|
+ logger.error("数据源缺少必要的name_en属性")
|
|
|
return None
|
|
|
|
|
|
- # 如果没有设置name,使用en_name作为name
|
|
|
- if "name" not in data_source or not data_source["name"]:
|
|
|
- data_source["name"] = ds_en_name
|
|
|
+ # 如果没有设置name_zh,使用name_en作为name_zh
|
|
|
+ if "name_zh" not in data_source or not data_source["name_zh"]:
|
|
|
+ data_source["name_zh"] = ds_name_en
|
|
|
|
|
|
# 检查必填字段
|
|
|
required_fields = ["type", "host", "port", "database", "username"]
|
|
|
has_required_fields = all(data_source.get(field) for field in required_fields)
|
|
|
|
|
|
- # 查询是否已存在相同en_name的数据源
|
|
|
+ # 查询是否已存在相同name_en的数据源
|
|
|
existing_cypher = """
|
|
|
- MATCH (ds:DataSource {en_name: $en_name})
|
|
|
+ MATCH (ds:DataSource {name_en: $name_en})
|
|
|
RETURN ds
|
|
|
"""
|
|
|
|
|
|
- existing_result = session.run(existing_cypher, en_name=ds_en_name)
|
|
|
+ existing_result = session.run(existing_cypher, name_en=ds_name_en)
|
|
|
existing_record = existing_result.single()
|
|
|
|
|
|
if existing_record:
|
|
|
existing_data_source = serialize_node_properties(existing_record["ds"])
|
|
|
- logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
|
|
|
- return existing_data_source.get("en_name")
|
|
|
+ logger.info(f"根据名称找到现有数据源: {existing_data_source.get('name_en')}")
|
|
|
+ return existing_data_source.get("name_en")
|
|
|
else:
|
|
|
# 数据源不存在,抛出异常
|
|
|
- raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")
|
|
|
+ raise ValueError(f"未找到名称为 {ds_name_en} 的数据源,请先创建该数据源或提供完整的数据源信息")
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"处理数据源失败: {str(e)}")
|