|
|
@@ -54,14 +54,24 @@ class DataFlowService:
|
|
|
)
|
|
|
params['search'] = search
|
|
|
|
|
|
- # 查询数据流列表
|
|
|
+ # 查询数据流列表(包含标签数组)
|
|
|
+ # 使用WITH子句先分页,再聚合标签,避免分页结果不准确
|
|
|
query = f"""
|
|
|
MATCH (n:DataFlow)
|
|
|
{where_clause}
|
|
|
- RETURN n, id(n) as node_id
|
|
|
+ WITH n
|
|
|
ORDER BY n.created_at DESC
|
|
|
SKIP $skip
|
|
|
LIMIT $limit
|
|
|
+ OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
|
|
|
+ RETURN n, id(n) as node_id,
|
|
|
+ n.created_at as created_at,
|
|
|
+ collect({{
|
|
|
+ id: id(label),
|
|
|
+ name_zh: label.name_zh,
|
|
|
+ name_en: label.name_en
|
|
|
+ }}) as tags
|
|
|
+ ORDER BY created_at DESC
|
|
|
"""
|
|
|
|
|
|
# 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常)
|
|
|
@@ -95,6 +105,12 @@ class DataFlowService:
|
|
|
node = record['n']
|
|
|
dataflow = dict(node)
|
|
|
dataflow['id'] = record['node_id'] # 使用查询返回的node_id
|
|
|
+ # 处理标签数组,过滤掉空标签
|
|
|
+ tags = record.get('tags', [])
|
|
|
+ dataflow['tag'] = [
|
|
|
+ tag for tag in tags
|
|
|
+ if tag.get('id') is not None
|
|
|
+ ]
|
|
|
dataflows.append(dataflow)
|
|
|
|
|
|
return {
|
|
|
@@ -122,11 +138,17 @@ class DataFlowService:
|
|
|
数据流详情字典,如果不存在则返回None
|
|
|
"""
|
|
|
try:
|
|
|
- # 从Neo4j获取DataFlow节点的所有属性
|
|
|
+ # 从Neo4j获取DataFlow节点的所有属性(包含标签数组)
|
|
|
neo4j_query = """
|
|
|
MATCH (n:DataFlow)
|
|
|
WHERE id(n) = $dataflow_id
|
|
|
- RETURN n, id(n) as node_id
|
|
|
+ OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
|
|
|
+ RETURN n, id(n) as node_id,
|
|
|
+ collect({
|
|
|
+ id: id(label),
|
|
|
+ name_zh: label.name_zh,
|
|
|
+ name_en: label.name_en
|
|
|
+ }) as tags
|
|
|
"""
|
|
|
|
|
|
with connect_graph().session() as session:
|
|
|
@@ -144,6 +166,13 @@ class DataFlowService:
|
|
|
dataflow = dict(node)
|
|
|
dataflow['id'] = record['node_id']
|
|
|
|
|
|
+ # 处理标签数组,过滤掉空标签
|
|
|
+ tags = record.get('tags', [])
|
|
|
+ dataflow['tag'] = [
|
|
|
+ tag for tag in tags
|
|
|
+ if tag.get('id') is not None
|
|
|
+ ]
|
|
|
+
|
|
|
# 处理 script_requirement:如果是JSON字符串,解析为对象
|
|
|
script_requirement_str = dataflow.get('script_requirement', '')
|
|
|
if script_requirement_str:
|
|
|
@@ -220,7 +249,7 @@ class DataFlowService:
|
|
|
else:
|
|
|
script_requirement_str = ''
|
|
|
|
|
|
- # 准备节点数据
|
|
|
+ # 准备节点数据(tag不作为节点属性存储,而是通过LABEL关系关联)
|
|
|
node_data = {
|
|
|
'name_zh': dataflow_name,
|
|
|
'name_en': name_en,
|
|
|
@@ -228,7 +257,6 @@ class DataFlowService:
|
|
|
'organization': data.get('organization', ''),
|
|
|
'leader': data.get('leader', ''),
|
|
|
'frequency': data.get('frequency', ''),
|
|
|
- 'tag': data.get('tag', ''),
|
|
|
'describe': data.get('describe', ''),
|
|
|
'status': data.get('status', 'inactive'),
|
|
|
'update_mode': data.get('update_mode', 'append'),
|
|
|
@@ -245,12 +273,12 @@ class DataFlowService:
|
|
|
|
|
|
dataflow_id = create_or_get_node('DataFlow', **node_data)
|
|
|
|
|
|
- # 处理标签关系
|
|
|
- tag_id = data.get('tag')
|
|
|
- if tag_id is not None:
|
|
|
+ # 处理标签关系(支持多标签数组)
|
|
|
+ tag_list = data.get('tag', [])
|
|
|
+ if tag_list:
|
|
|
try:
|
|
|
- DataFlowService._handle_tag_relationship(
|
|
|
- dataflow_id, tag_id)
|
|
|
+ DataFlowService._handle_tag_relationships(
|
|
|
+ dataflow_id, tag_list)
|
|
|
except Exception as e:
|
|
|
logger.warning(f"处理标签关系时出错: {str(e)}")
|
|
|
|
|
|
@@ -747,8 +775,35 @@ class DataFlowService:
|
|
|
logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
|
|
|
|
|
|
@staticmethod
|
|
|
- def _handle_tag_relationship(dataflow_id, tag_id):
|
|
|
- """处理标签关系"""
|
|
|
+ def _handle_tag_relationships(dataflow_id, tag_list):
|
|
|
+ """
|
|
|
+ 处理多标签关系
|
|
|
+
|
|
|
+ Args:
|
|
|
+ dataflow_id: 数据流节点ID
|
|
|
+ tag_list: 标签列表,可以是ID数组或包含id字段的对象数组
|
|
|
+ """
|
|
|
+ # 确保tag_list是列表格式
|
|
|
+ if not isinstance(tag_list, list):
|
|
|
+ tag_list = [tag_list] if tag_list else []
|
|
|
+
|
|
|
+ for tag_item in tag_list:
|
|
|
+ tag_id = None
|
|
|
+ if isinstance(tag_item, dict) and 'id' in tag_item:
|
|
|
+ tag_id = int(tag_item['id'])
|
|
|
+ elif isinstance(tag_item, (int, str)):
|
|
|
+ try:
|
|
|
+ tag_id = int(tag_item)
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ pass
|
|
|
+
|
|
|
+ if tag_id:
|
|
|
+ DataFlowService._handle_single_tag_relationship(
|
|
|
+ dataflow_id, tag_id)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _handle_single_tag_relationship(dataflow_id, tag_id):
|
|
|
+ """处理单个标签关系"""
|
|
|
try:
|
|
|
# 查找标签节点
|
|
|
query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
|
|
|
@@ -823,8 +878,12 @@ class DataFlowService:
|
|
|
|
|
|
result = session.run(update_query, params).data()
|
|
|
|
|
|
- # 处理 tag 关系
|
|
|
- if tag_list is not None and isinstance(tag_list, list):
|
|
|
+ # 处理 tag 关系(支持多标签数组)
|
|
|
+ if tag_list is not None:
|
|
|
+ # 确保是列表格式
|
|
|
+ if not isinstance(tag_list, list):
|
|
|
+ tag_list = [tag_list] if tag_list else []
|
|
|
+
|
|
|
# 先删除现有的 LABEL 关系
|
|
|
delete_query = """
|
|
|
MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
|
|
|
@@ -846,7 +905,7 @@ class DataFlowService:
|
|
|
pass
|
|
|
|
|
|
if tag_id:
|
|
|
- DataFlowService._handle_tag_relationship(
|
|
|
+ DataFlowService._handle_single_tag_relationship(
|
|
|
dataflow_id, tag_id
|
|
|
)
|
|
|
|
|
|
@@ -1352,11 +1411,15 @@ class DataFlowService:
|
|
|
MATCH (bd:BusinessDomain)
|
|
|
WHERE id(bd) = $bd_id
|
|
|
OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
|
|
|
- OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
|
|
|
+ OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
|
|
|
OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
|
|
|
RETURN bd,
|
|
|
collect(DISTINCT m) as metadata,
|
|
|
- label.name_zh as label_name,
|
|
|
+ collect(DISTINCT {
|
|
|
+ id: id(label),
|
|
|
+ name_zh: label.name_zh,
|
|
|
+ name_en: label.name_en
|
|
|
+ }) as labels,
|
|
|
ds.type as ds_type,
|
|
|
ds.host as ds_host,
|
|
|
ds.port as ds_port,
|
|
|
@@ -1370,7 +1433,14 @@ class DataFlowService:
|
|
|
|
|
|
node = result['bd']
|
|
|
metadata = result['metadata']
|
|
|
- label_name = result['label_name']
|
|
|
+ # 处理标签数组,获取第一个有效标签名称用于判断
|
|
|
+ labels = result.get('labels', [])
|
|
|
+ valid_labels = [
|
|
|
+ label for label in labels if label.get('id') is not None
|
|
|
+ ]
|
|
|
+ label_name = (
|
|
|
+ valid_labels[0].get('name_zh') if valid_labels else None
|
|
|
+ )
|
|
|
|
|
|
# 生成DDL
|
|
|
node_props = dict(node)
|
|
|
@@ -1583,26 +1653,37 @@ class DataFlowService:
|
|
|
logger.info("开始查询BusinessDomain节点列表")
|
|
|
|
|
|
with connect_graph().session() as session:
|
|
|
- # 查询所有BusinessDomain节点及其BELONGS_TO关系指向的标签
|
|
|
+ # 查询所有BusinessDomain节点及其LABEL关系指向的标签(支持多标签)
|
|
|
query = """
|
|
|
MATCH (bd:BusinessDomain)
|
|
|
- OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
|
|
|
+ OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
|
|
|
RETURN id(bd) as id,
|
|
|
bd.name_zh as name_zh,
|
|
|
bd.name_en as name_en,
|
|
|
- label.name_zh as tag
|
|
|
- ORDER BY bd.create_time DESC
|
|
|
+ bd.create_time as create_time,
|
|
|
+ collect({
|
|
|
+ id: id(label),
|
|
|
+ name_zh: label.name_zh,
|
|
|
+ name_en: label.name_en
|
|
|
+ }) as tags
|
|
|
+ ORDER BY create_time DESC
|
|
|
"""
|
|
|
|
|
|
result = session.run(query)
|
|
|
|
|
|
bd_list = []
|
|
|
for record in result:
|
|
|
+ # 处理标签数组,过滤掉空标签
|
|
|
+ tags = record.get("tags", [])
|
|
|
+ tag_list = [
|
|
|
+ tag for tag in tags
|
|
|
+ if tag.get('id') is not None
|
|
|
+ ]
|
|
|
bd_item = {
|
|
|
"id": record["id"],
|
|
|
"name_zh": record.get("name_zh", "") or "",
|
|
|
"name_en": record.get("name_en", "") or "",
|
|
|
- "tag": record.get("tag", "") or "",
|
|
|
+ "tag": tag_list,
|
|
|
}
|
|
|
bd_list.append(bd_item)
|
|
|
|