|
|
@@ -876,19 +876,62 @@ class DataProductService:
|
|
|
|
|
|
# 存储从目标节点提取的键值信息,用于检索上游节点数据
|
|
|
# 格式: {name_zh: value, name_en: value, ...}
|
|
|
+ # 包含主元数据和所有别名元数据的名称映射到同一个值
|
|
|
key_field_values: dict[str, Any] = {}
|
|
|
|
|
|
+ def get_all_alias_names(meta_id: int) -> list[dict[str, str]]:
|
|
|
+ """
|
|
|
+ 获取元数据及其所有别名(包括主元数据和别名元数据)的名称
|
|
|
+
|
|
|
+ 查询逻辑:
|
|
|
+ 1. 如果该元数据是别名,先找到主元数据: (meta)-[:ALIAS]->(primary)
|
|
|
+ 2. 然后找到主元数据的所有别名: (alias)-[:ALIAS]->(primary)
|
|
|
+ 3. 如果该元数据本身就是主元数据,直接找其所有别名
|
|
|
+ 4. 返回所有相关元数据的 name_zh 和 name_en
|
|
|
+
|
|
|
+ Args:
|
|
|
+ meta_id: 元数据节点 ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 包含所有相关元数据名称的列表 [{"name_zh": ..., "name_en": ...}, ...]
|
|
|
+ """
|
|
|
+ # 查询:获取元数据本身、其主元数据(如果是别名)、以及所有别名
|
|
|
+ alias_query = """
|
|
|
+ MATCH (meta:DataMeta) WHERE id(meta) = $meta_id
|
|
|
+ // 先尝试找主元数据(如果当前是别名)
|
|
|
+ OPTIONAL MATCH (meta)-[:ALIAS]->(primary:DataMeta)
|
|
|
+ // 确定真正的主元数据:如果有 primary 则用 primary,否则 meta 本身就是主元数据
|
|
|
+ WITH meta, COALESCE(primary, meta) as real_primary
|
|
|
+ // 找到主元数据的所有别名
|
|
|
+ OPTIONAL MATCH (alias:DataMeta)-[:ALIAS]->(real_primary)
|
|
|
+ // 收集所有相关元数据:主元数据 + 所有别名(包括原始 meta,如果它是别名的话)
|
|
|
+ WITH real_primary, collect(DISTINCT alias) as aliases
|
|
|
+ WITH real_primary, aliases + [real_primary] as all_metas
|
|
|
+ UNWIND all_metas as m
|
|
|
+ WITH DISTINCT m
|
|
|
+ WHERE m IS NOT NULL
|
|
|
+ RETURN m.name_zh as name_zh, m.name_en as name_en
|
|
|
+ """
|
|
|
+ results = session.run(alias_query, {"meta_id": meta_id}).data()
|
|
|
+ return [
|
|
|
+ {"name_zh": r.get("name_zh", ""), "name_en": r.get("name_en", "")}
|
|
|
+ for r in results
|
|
|
+ if r.get("name_zh") or r.get("name_en")
|
|
|
+ ]
|
|
|
+
|
|
|
def extract_key_fields_from_target(
|
|
|
fields: list[dict[str, Any]],
|
|
|
) -> dict[str, Any]:
|
|
|
"""
|
|
|
从目标节点的字段中提取有"键值"标签的字段及其对应的值
|
|
|
+ 同时考虑 ALIAS 别名关系,获取主元数据和所有别名的名称
|
|
|
|
|
|
Args:
|
|
|
fields: 目标节点的字段列表
|
|
|
|
|
|
Returns:
|
|
|
键值字段名与值的映射 {field_name: value}
|
|
|
+ 包含主元数据和所有别名元数据的名称,都映射到同一个值
|
|
|
"""
|
|
|
key_values: dict[str, Any] = {}
|
|
|
for field in fields:
|
|
|
@@ -900,15 +943,40 @@ class DataProductService:
|
|
|
if is_key_field:
|
|
|
name_zh = field.get("name_zh", "")
|
|
|
name_en = field.get("name_en", "")
|
|
|
+ meta_id = field.get("meta_id")
|
|
|
+
|
|
|
# 从 sample_data 中获取键值字段的值
|
|
|
+ key_value = None
|
|
|
if name_zh and name_zh in sample_data:
|
|
|
- key_values[name_zh] = sample_data[name_zh]
|
|
|
- if name_en:
|
|
|
- key_values[name_en] = sample_data[name_zh]
|
|
|
+ key_value = sample_data[name_zh]
|
|
|
elif name_en and name_en in sample_data:
|
|
|
- key_values[name_en] = sample_data[name_en]
|
|
|
+ key_value = sample_data[name_en]
|
|
|
+
|
|
|
+ if key_value is not None:
|
|
|
+ # 添加当前字段的名称映射
|
|
|
if name_zh:
|
|
|
- key_values[name_zh] = sample_data[name_en]
|
|
|
+ key_values[name_zh] = key_value
|
|
|
+ if name_en:
|
|
|
+ key_values[name_en] = key_value
|
|
|
+
|
|
|
+ # 如果有 meta_id,查询所有别名的名称并添加映射
|
|
|
+ if meta_id:
|
|
|
+ alias_names = get_all_alias_names(meta_id)
|
|
|
+ for alias in alias_names:
|
|
|
+ alias_zh = alias.get("name_zh", "")
|
|
|
+ alias_en = alias.get("name_en", "")
|
|
|
+ if alias_zh and alias_zh not in key_values:
|
|
|
+ key_values[alias_zh] = key_value
|
|
|
+ if alias_en and alias_en not in key_values:
|
|
|
+ key_values[alias_en] = key_value
|
|
|
+
|
|
|
+ logger.debug(
|
|
|
+ f"键值字段 '{name_zh or name_en}' 的别名映射: "
|
|
|
+ f"meta_id={meta_id}, "
|
|
|
+ f"alias_count={len(alias_names)}, "
|
|
|
+ f"all_names={[a.get('name_zh') or a.get('name_en') for a in alias_names]}"
|
|
|
+ )
|
|
|
+
|
|
|
return key_values
|
|
|
|
|
|
def query_matched_data_by_keys(
|
|
|
@@ -916,7 +984,7 @@ class DataProductService:
|
|
|
bd_name_en: str,
|
|
|
fields: list[dict[str, Any]],
|
|
|
key_values: dict[str, Any],
|
|
|
- ) -> dict[str, Any]:
|
|
|
+ ) -> list[dict[str, Any]]:
|
|
|
"""
|
|
|
根据键值从 BusinessDomain 对应的数据表中检索匹配数据
|
|
|
|
|
|
@@ -927,10 +995,10 @@ class DataProductService:
|
|
|
key_values: 键值字段名与值的映射
|
|
|
|
|
|
Returns:
|
|
|
- 匹配的数据,格式为 {field_name: value, ...}
|
|
|
+ 匹配的数据列表,格式为 [{field_name: value, ...}, ...]
|
|
|
"""
|
|
|
if not key_values or not bd_name_en:
|
|
|
- return {}
|
|
|
+ return []
|
|
|
|
|
|
try:
|
|
|
# 查找该 BusinessDomain 关联的数据源
|
|
|
@@ -959,10 +1027,8 @@ class DataProductService:
|
|
|
).scalar()
|
|
|
|
|
|
if not exists:
|
|
|
- logger.debug(
|
|
|
- f"表 {schema}.{table_name} 不存在,跳过数据检索"
|
|
|
- )
|
|
|
- return {}
|
|
|
+ logger.debug(f"表 {schema}.{table_name} 不存在,跳过数据检索")
|
|
|
+ return []
|
|
|
|
|
|
# 获取该表的实际列名
|
|
|
columns_sql = text(
|
|
|
@@ -1014,39 +1080,38 @@ class DataProductService:
|
|
|
logger.debug(
|
|
|
f"表 {schema}.{table_name} 没有匹配的键值字段,跳过数据检索"
|
|
|
)
|
|
|
- return {}
|
|
|
+ return []
|
|
|
|
|
|
# 构建并执行查询
|
|
|
where_clause = " AND ".join(where_conditions)
|
|
|
query_sql = text(
|
|
|
- f'SELECT * FROM "{schema}"."{table_name}" '
|
|
|
- f"WHERE {where_clause} LIMIT 1"
|
|
|
+ f'SELECT * FROM "{schema}"."{table_name}" WHERE {where_clause}'
|
|
|
)
|
|
|
result = db.session.execute(query_sql, params)
|
|
|
- row = result.fetchone()
|
|
|
+ rows = result.fetchall()
|
|
|
|
|
|
- if row:
|
|
|
- # 将查询结果转换为字典
|
|
|
+ if rows:
|
|
|
+ # 将所有查询结果转换为字典列表
|
|
|
column_names = list(result.keys())
|
|
|
- matched_data = dict(zip(column_names, row))
|
|
|
+ matched_data_list = [dict(zip(column_names, row)) for row in rows]
|
|
|
logger.debug(
|
|
|
- f"从表 {schema}.{table_name} 检索到匹配数据: "
|
|
|
+ f"从表 {schema}.{table_name} 检索到 {len(matched_data_list)} 条匹配数据: "
|
|
|
f"keys={list(params.values())}"
|
|
|
)
|
|
|
- return matched_data
|
|
|
+ return matched_data_list
|
|
|
else:
|
|
|
logger.debug(
|
|
|
f"表 {schema}.{table_name} 未找到匹配数据: "
|
|
|
f"conditions={where_conditions}"
|
|
|
)
|
|
|
- return {}
|
|
|
+ return []
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.warning(
|
|
|
f"从表检索数据失败: bd_id={bd_id}, table={bd_name_en}, "
|
|
|
f"error={str(e)}"
|
|
|
)
|
|
|
- return {}
|
|
|
+ return []
|
|
|
|
|
|
def get_business_domain_node(
|
|
|
bd_id: int, depth: int, is_target: bool = False
|
|
|
@@ -1090,9 +1155,10 @@ class DataProductService:
|
|
|
bd_name_en = bd_node.get("name_en", "")
|
|
|
|
|
|
# 根据是否为目标节点,确定 matched_data 的获取方式
|
|
|
+ # matched_data 统一为列表格式
|
|
|
if is_target:
|
|
|
- # 目标节点:直接使用上传的 sample_data
|
|
|
- matched_data = sample_data.copy() if sample_data else {}
|
|
|
+ # 目标节点:直接使用上传的 sample_data(包装为列表)
|
|
|
+ matched_data = [sample_data.copy()] if sample_data else []
|
|
|
# 提取键值字段的值,用于后续检索上游节点数据
|
|
|
key_field_values = extract_key_fields_from_target(fields)
|
|
|
logger.info(
|
|
|
@@ -1100,7 +1166,7 @@ class DataProductService:
|
|
|
f"key_fields={list(key_field_values.keys())}"
|
|
|
)
|
|
|
else:
|
|
|
- # 非目标节点:使用键值在对应数据表中检索数据
|
|
|
+ # 非目标节点:使用键值在对应数据表中检索数据(返回列表)
|
|
|
matched_data = query_matched_data_by_keys(
|
|
|
bd_id=bd_id,
|
|
|
bd_name_en=bd_name_en,
|
|
|
@@ -2212,9 +2278,7 @@ class DataOrderService:
|
|
|
m.name_en as name_en,
|
|
|
m.data_type as data_type
|
|
|
"""
|
|
|
- meta_results = session.run(
|
|
|
- meta_query, {"bd_id": domain_id}
|
|
|
- ).data()
|
|
|
+ meta_results = session.run(meta_query, {"bd_id": domain_id}).data()
|
|
|
|
|
|
for meta in meta_results:
|
|
|
name_zh = meta.get("name_zh", "").strip()
|
|
|
@@ -2576,9 +2640,7 @@ class DataOrderService:
|
|
|
final_name_en = new_name_en
|
|
|
existing_meta_names.add(final_name_zh)
|
|
|
|
|
|
- logger.info(
|
|
|
- f"计算字段名称冲突,重命名: {name_zh} -> {final_name_zh}"
|
|
|
- )
|
|
|
+ logger.info(f"计算字段名称冲突,重命名: {name_zh} -> {final_name_zh}")
|
|
|
|
|
|
# 使用 MERGE 创建或复用 DataMeta 节点
|
|
|
meta_merge_query = """
|