Selaa lähdekoodia

修改dataflow_id,dataflow_name,name_en等字段。
修改script关系创建。
修改判断是否存在关系的函数,使用ID判断。

maxiaolong 3 viikkoa sitten
vanhempi
commit
2c487849ef

+ 1 - 1
app/api/data_parse/routes.py

@@ -1,6 +1,6 @@
 from flask import jsonify, request, make_response, Blueprint, current_app, send_file
 from app.api.data_parse import bp
-from app.core.data_parse.parse import process_business_card, update_business_card, get_business_cards, update_business_card_status, create_talent_tag, get_talent_tag_list, update_talent_tag, delete_talent_tag, query_neo4j_graph, talent_get_tags, talent_update_tags, get_business_card, get_hotel_positions_list, add_hotel_positions, update_hotel_positions, query_hotel_positions, delete_hotel_positions, get_hotel_group_brands_list, add_hotel_group_brands, update_hotel_group_brands, query_hotel_group_brands, delete_hotel_group_brands, get_duplicate_records, process_duplicate_record, get_duplicate_record_detail, fix_broken_duplicate_records
+from app.core.data_parse.parse import update_business_card, get_business_cards, update_business_card_status, create_talent_tag, get_talent_tag_list, update_talent_tag, delete_talent_tag, query_neo4j_graph, talent_get_tags, talent_update_tags, get_business_card, get_hotel_positions_list, add_hotel_positions, update_hotel_positions, query_hotel_positions, delete_hotel_positions, get_hotel_group_brands_list, add_hotel_group_brands, update_hotel_group_brands, query_hotel_group_brands, delete_hotel_group_brands, get_duplicate_records, process_duplicate_record, get_duplicate_record_detail, fix_broken_duplicate_records
 # 导入新的名片图片解析函数和添加名片函数
 from app.core.data_parse.parse_card import process_business_card_image, add_business_card, delete_business_card
 from app.config.config import DevelopmentConfig, ProductionConfig

+ 284 - 123
app/core/data_flow/dataflows.py

@@ -4,8 +4,7 @@ from datetime import datetime
 import json
 from app.core.llm.llm_service import llm_client
 from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
-from app.core.meta_data import translate_and_parse
-from app.core.common.functions import get_formatted_time
+from app.core.meta_data import translate_and_parse, get_formatted_time
 from py2neo import Relationship
 from app import db
 from sqlalchemy import text
@@ -44,29 +43,31 @@ class DataFlowService:
             query = f"""
             MATCH (n:DataFlow)
             {where_clause}
-            RETURN n
+            RETURN n, id(n) as node_id
             ORDER BY n.created_at DESC
             SKIP $skip
             LIMIT $limit
             """
             
-            list_result = connect_graph.run(query, **params).data()
-            
-            # 查询总数
-            count_query = f"""
-            MATCH (n:DataFlow)
-            {where_clause}
-            RETURN count(n) as total
-            """
-            count_params = {'search': search} if search else {}
-            total = connect_graph.run(count_query, **count_params).evaluate() or 0
+            with connect_graph().session() as session:
+                list_result = session.run(query, **params).data()
+                
+                # 查询总数
+                count_query = f"""
+                MATCH (n:DataFlow)
+                {where_clause}
+                RETURN count(n) as total
+                """
+                count_params = {'search': search} if search else {}
+                count_result = session.run(count_query, **count_params).single()
+                total = count_result['total'] if count_result else 0
             
             # 格式化结果
             dataflows = []
             for record in list_result:
                 node = record['n']
                 dataflow = dict(node)
-                dataflow['id'] = node.identity
+                dataflow['id'] = record['node_id']  # 使用查询返回的node_id
                 dataflows.append(dataflow)
             
             return {
@@ -100,26 +101,27 @@ class DataFlowService:
             OPTIONAL MATCH (n)-[:label]-(la:data_label)
             OPTIONAL MATCH (n)-[:child]-(child)
             OPTIONAL MATCH (parent)-[:child]-(n)
-            RETURN n, 
+            RETURN n, id(n) as node_id,
                    collect(DISTINCT {id: id(la), name: la.name}) as tags,
                    collect(DISTINCT {id: id(child), name: child.name}) as children,
                    collect(DISTINCT {id: id(parent), name: parent.name}) as parents
             """
             
-            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
-            
-            if not result:
-                return None
-            
-            record = result[0]
-            node = record['n']
-            dataflow = dict(node)
-            dataflow['id'] = node.identity
-            dataflow['tags'] = record['tags']
-            dataflow['children'] = record['children']
-            dataflow['parents'] = record['parents']
-            
-            return dataflow
+            with connect_graph().session() as session:
+                result = session.run(query, dataflow_id=dataflow_id).data()
+                
+                if not result:
+                    return None
+                
+                record = result[0]
+                node = record['n']
+                dataflow = dict(node)
+                dataflow['id'] = record['node_id']  # 使用查询返回的node_id
+                dataflow['tags'] = record['tags']
+                dataflow['children'] = record['children']
+                dataflow['parents'] = record['parents']
+                
+                return dataflow
         except Exception as e:
             logger.error(f"获取数据流详情失败: {str(e)}")
             raise e
@@ -137,7 +139,7 @@ class DataFlowService:
         """
         try:
             # 验证必填字段
-            required_fields = ['name', 'description']
+            required_fields = ['name', 'describe']
             for field in required_fields:
                 if field not in data:
                     raise ValueError(f"缺少必填字段: {field}")
@@ -147,50 +149,78 @@ class DataFlowService:
             # 使用LLM翻译名称生成英文名
             try:
                 result_list = translate_and_parse(dataflow_name)
-                en_name = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
+                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
             except Exception as e:
                 logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
-                en_name = dataflow_name.lower().replace(' ', '_')
+                name_en = dataflow_name.lower().replace(' ', '_')
             
             # 准备节点数据
             node_data = {
-                'name': dataflow_name,
-                'en_name': en_name,
-                'description': data.get('description', ''),
+                'name_zh': dataflow_name,
+                'name_en': name_en,
+                'category': data.get('category', ''),
+                'organization': data.get('organization', ''),
+                'leader': data.get('leader', ''),
+                'frequency': data.get('frequency', ''),
+                'tag': data.get('tag', ''),
+                'describe': data.get('describe', ''),
                 'status': data.get('status', 'inactive'),
                 'created_at': get_formatted_time(),
-                'updated_at': get_formatted_time(),
-                'created_by': data.get('created_by', 'system'),
-                'config': json.dumps(data.get('config', {}), ensure_ascii=False)
-            }
+                'updated_at': get_formatted_time()
+            }  
             
             # 创建或获取数据流节点
-            dataflow_node = get_node('DataFlow', name=dataflow_name)
-            if dataflow_node:
+            dataflow_id = get_node('DataFlow', name=dataflow_name)
+            if dataflow_id:
                 raise ValueError(f"数据流 '{dataflow_name}' 已存在")
             
-            dataflow_node = create_or_get_node('DataFlow', **node_data)
-            
-            # 处理子节点关系
-            if data.get('children_ids'):
-                DataFlowService._handle_children_relationships(dataflow_node, data['children_ids'])
+            dataflow_id = create_or_get_node('DataFlow', **node_data)
             
             # 处理标签关系
-            if data.get('tag_id'):
-                DataFlowService._handle_tag_relationship(dataflow_node, data['tag_id'])
+            tag_id = data.get('tag')
+            if tag_id is not None:
+                try:
+                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
+                except Exception as e:
+                    logger.warning(f"处理标签关系时出错: {str(e)}")
             
             # 成功创建图数据库节点后,写入PG数据库
             try:
-                DataFlowService._save_to_pg_database(data, dataflow_name, en_name)
+                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                 logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")
+                
+                # PG数据库记录成功写入后,在neo4j图数据库中创建script关系
+                try:
+                    DataFlowService._handle_script_relationships(data,dataflow_name,name_en)
+                    logger.info(f"脚本关系创建成功: {dataflow_name}")
+                except Exception as script_error:
+                    logger.warning(f"创建脚本关系失败: {str(script_error)}")
+                    
             except Exception as pg_error:
                 logger.error(f"写入PG数据库失败: {str(pg_error)}")
                 # 注意:这里可以选择回滚图数据库操作,但目前保持图数据库数据
                 # 在实际应用中,可能需要考虑分布式事务
                 
             # 返回创建的数据流信息
-            result = dict(dataflow_node)
-            result['id'] = dataflow_node.identity
+            # 查询创建的节点获取完整信息
+            query = "MATCH (n:DataFlow {name: $name}) RETURN n, id(n) as node_id"
+            with connect_graph().session() as session:
+                id_result = session.run(query, name=dataflow_name).single()
+                if id_result:
+                    dataflow_node = id_result['n']
+                    node_id = id_result['node_id']
+                    
+                    # 将节点属性转换为字典
+                    result = dict(dataflow_node)
+                    result['id'] = node_id
+                else:
+                    # 如果查询失败,返回基本信息
+                    result = {
+                        'id': dataflow_id if isinstance(dataflow_id, int) else None,
+                        'name': dataflow_name,
+                        'name_en': name_en,
+                        'created_at': get_formatted_time()
+                    }
             
             logger.info(f"创建数据流成功: {dataflow_name}")
             return result
@@ -200,7 +230,7 @@ class DataFlowService:
             raise e
     
     @staticmethod
-    def _save_to_pg_database(data: Dict[str, Any], script_name: str, en_name: str):
+    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
         """
         将脚本信息保存到PG数据库
         
@@ -214,14 +244,14 @@ class DataFlowService:
             script_requirement = data.get('script_requirement', '')
             script_content = data.get('script_content', '')
             source_table = data.get('source_table', '')
-            target_table = data.get('target_table', en_name)  # 如果没有指定目标表,使用英文名
+            target_table = data.get('target_table', name_en)  # 如果没有指定目标表,使用英文名
             script_type = data.get('script_type', 'python')
             user_name = data.get('created_by', 'system')
             target_dt_column = data.get('target_dt_column', '')
             
             # 验证必需字段
             if not target_table:
-                target_table = en_name
+                target_table = name_en
             if not script_name:
                 raise ValueError("script_name不能为空")
             
@@ -273,35 +303,60 @@ class DataFlowService:
     @staticmethod
     def _handle_children_relationships(dataflow_node, children_ids):
         """处理子节点关系"""
+        logger.debug(f"处理子节点关系,原始children_ids: {children_ids}, 类型: {type(children_ids)}")
+        
+        # 确保children_ids是列表格式
+        if not isinstance(children_ids, (list, tuple)):
+            if children_ids is not None:
+                children_ids = [children_ids]  # 如果是单个值,转换为列表
+                logger.debug(f"将单个值转换为列表: {children_ids}")
+            else:
+                children_ids = []  # 如果是None,转换为空列表
+                logger.debug("将None转换为空列表")
+        
         for child_id in children_ids:
             try:
                 # 查找子节点
                 query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
-                result = connect_graph.run(query, child_id=child_id).data()
-                
-                if result:
-                    child_node = result[0]['n']
-                    # 创建关系
-                    if not relationship_exists(dataflow_node, 'child', child_node):
-                        connect_graph.create(Relationship(dataflow_node, 'child', child_node))
-                        logger.info(f"创建子节点关系: {dataflow_node.identity} -> {child_id}")
+                with connect_graph().session() as session:
+                    result = session.run(query, child_id=child_id).data()
+                    
+                    if result:
+                        child_node = result[0]['n']
+                        
+                        # 获取dataflow_node的ID
+                        dataflow_id = getattr(dataflow_node, 'identity', None)
+                        if dataflow_id is None:
+                            # 如果没有identity属性,从名称查询ID
+                            query_id = "MATCH (n:DataFlow) WHERE n.name = $name RETURN id(n) as node_id"
+                            id_result = session.run(query_id, name=dataflow_node.get('name')).single()
+                            dataflow_id = id_result['node_id'] if id_result else None
+                        
+                        # 创建关系 - 使用ID调用relationship_exists
+                        if dataflow_id and not relationship_exists(dataflow_id, 'child', child_id):
+                            session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id CREATE (a)-[:child]->(b)", 
+                                      dataflow_id=dataflow_id, child_id=child_id)
+                            logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
             except Exception as e:
                 logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
     
     @staticmethod
-    def _handle_tag_relationship(dataflow_node, tag_id):
+    def _handle_tag_relationship(dataflow_id, tag_id):
         """处理标签关系"""
         try:
             # 查找标签节点
             query = "MATCH (n:data_label) WHERE id(n) = $tag_id RETURN n"
-            result = connect_graph.run(query, tag_id=tag_id).data()
-            
-            if result:
-                tag_node = result[0]['n']
-                # 创建关系
-                if not relationship_exists(dataflow_node, 'label', tag_node):
-                    connect_graph.create(Relationship(dataflow_node, 'label', tag_node))
-                    logger.info(f"创建标签关系: {dataflow_node.identity} -> {tag_id}")
+            with connect_graph().session() as session:
+                result = session.run(query, tag_id=tag_id).data()
+                
+                if result:
+                    tag_node = result[0]['n']
+                    
+                    # 创建关系 - 使用ID调用relationship_exists
+                    if dataflow_id and not relationship_exists(dataflow_id, 'label', tag_id):
+                        session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id CREATE (a)-[:label]->(b)", 
+                                  dataflow_id=dataflow_id, tag_id=tag_id)
+                        logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
         except Exception as e:
             logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
     
@@ -320,43 +375,44 @@ class DataFlowService:
         try:
             # 查找节点
             query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
-            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
-            
-            if not result:
-                return None
-            
-            # 更新节点属性
-            update_fields = []
-            params = {'dataflow_id': dataflow_id}
-            
-            for key, value in data.items():
-                if key not in ['id', 'created_at']:  # 保护字段
-                    if key == 'config' and isinstance(value, dict):
-                        value = json.dumps(value, ensure_ascii=False)
-                    update_fields.append(f"n.{key} = ${key}")
-                    params[key] = value
-            
-            if update_fields:
-                params['updated_at'] = get_formatted_time()
-                update_fields.append("n.updated_at = $updated_at")
+            with connect_graph().session() as session:
+                result = session.run(query, dataflow_id=dataflow_id).data()
                 
-                update_query = f"""
-                MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
-                SET {', '.join(update_fields)}
-                RETURN n
-                """
+                if not result:
+                    return None
                 
-                result = connect_graph.run(update_query, **params).data()
+                # 更新节点属性
+                update_fields = []
+                params = {'dataflow_id': dataflow_id}
                 
-                if result:
-                    node = result[0]['n']
-                    updated_dataflow = dict(node)
-                    updated_dataflow['id'] = node.identity
+                for key, value in data.items():
+                    if key not in ['id', 'created_at']:  # 保护字段
+                        if key == 'config' and isinstance(value, dict):
+                            value = json.dumps(value, ensure_ascii=False)
+                        update_fields.append(f"n.{key} = ${key}")
+                        params[key] = value
+                
+                if update_fields:
+                    params['updated_at'] = get_formatted_time()
+                    update_fields.append("n.updated_at = $updated_at")
                     
-                    logger.info(f"更新数据流成功: ID={dataflow_id}")
-                    return updated_dataflow
-            
-            return None
+                    update_query = f"""
+                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
+                    SET {', '.join(update_fields)}
+                    RETURN n, id(n) as node_id
+                    """
+                    
+                    result = session.run(update_query, **params).data()
+                    
+                    if result:
+                        node = result[0]['n']
+                        updated_dataflow = dict(node)
+                        updated_dataflow['id'] = result[0]['node_id']  # 使用查询返回的node_id
+                        
+                        logger.info(f"更新数据流成功: ID={dataflow_id}")
+                        return updated_dataflow
+                
+                return None
             
         except Exception as e:
             logger.error(f"更新数据流失败: {str(e)}")
@@ -381,13 +437,15 @@ class DataFlowService:
             RETURN count(n) as deleted_count
             """
             
-            result = connect_graph.run(query, dataflow_id=dataflow_id).evaluate()
-            
-            if result and result > 0:
-                logger.info(f"删除数据流成功: ID={dataflow_id}")
-                return True
-            
-            return False
+            with connect_graph().session() as session:
+                delete_result = session.run(query, dataflow_id=dataflow_id).single()
+                result = delete_result['deleted_count'] if delete_result else 0
+                
+                if result and result > 0:
+                    logger.info(f"删除数据流成功: ID={dataflow_id}")
+                    return True
+                
+                return False
             
         except Exception as e:
             logger.error(f"删除数据流失败: {str(e)}")
@@ -408,10 +466,11 @@ class DataFlowService:
         try:
             # 检查数据流是否存在
             query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
-            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
-            
-            if not result:
-                raise ValueError(f"数据流不存在: ID={dataflow_id}")
+            with connect_graph().session() as session:
+                result = session.run(query, dataflow_id=dataflow_id).data()
+                
+                if not result:
+                    raise ValueError(f"数据流不存在: ID={dataflow_id}")
             
             execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
             
@@ -447,10 +506,11 @@ class DataFlowService:
             # TODO: 这里应该查询实际的执行状态
             # 目前返回模拟状态
             query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
-            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
-            
-            if not result:
-                raise ValueError(f"数据流不存在: ID={dataflow_id}")
+            with connect_graph().session() as session:
+                result = session.run(query, dataflow_id=dataflow_id).data()
+                
+                if not result:
+                    raise ValueError(f"数据流不存在: ID={dataflow_id}")
             
             status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
             
@@ -483,10 +543,11 @@ class DataFlowService:
             # TODO: 这里应该查询实际的执行日志
             # 目前返回模拟日志
             query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
-            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
-            
-            if not result:
-                raise ValueError(f"数据流不存在: ID={dataflow_id}")
+            with connect_graph().session() as session:
+                result = session.run(query, dataflow_id=dataflow_id).data()
+                
+                if not result:
+                    raise ValueError(f"数据流不存在: ID={dataflow_id}")
             
             mock_logs = [
                 {
@@ -563,4 +624,104 @@ class DataFlowService:
             
         except Exception as e:
             logger.error(f"生成脚本失败: {str(e)}")
+            raise e
+
+    @staticmethod
+    def _handle_script_relationships(data: Dict[str, Any],dataflow_name:str,name_en:str):
+        """
+        处理脚本关系,在Neo4j图数据库中创建从source_table到target_table之间的DERIVED_FROM关系
+        
+        Args:
+            data: 包含脚本信息的数据字典,应包含script_name, script_type, schedule_status, source_table, target_table, update_mode
+        """
+        try:
+            # 从data中读取键值对
+            script_name = dataflow_name,
+            script_type = data.get('script_type', 'sql')
+            schedule_status = data.get('status', 'inactive')
+            source_table = data.get('source_table', '')
+            target_table = data.get('target_table', '')
+            update_mode = data.get('update_mode', 'full')
+            
+            # 验证必要字段
+            if not source_table or not target_table:
+                logger.warning(f"source_table或target_table为空,跳过关系创建: source_table={source_table}, target_table={target_table}")
+                return
+            
+            logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")
+            
+            with connect_graph().session() as session:
+                # 创建或获取source_table节点
+                source_node_query = """
+                MERGE (source:table {name: $source_table})
+                ON CREATE SET source.created_at = $created_at,
+                             source.type = 'source'
+                RETURN source, id(source) as source_id
+                """
+                
+                # 创建或获取target_table节点
+                target_node_query = """
+                MERGE (target:table {name: $target_table})
+                ON CREATE SET target.created_at = $created_at,
+                             target.type = 'target'
+                RETURN target, id(target) as target_id
+                """
+                
+                current_time = get_formatted_time()
+                
+                # 执行创建节点的查询
+                source_result = session.run(source_node_query, 
+                                          source_table=source_table, 
+                                          created_at=current_time).single()
+                target_result = session.run(target_node_query, 
+                                          target_table=target_table, 
+                                          created_at=current_time).single()
+                
+                if source_result and target_result:
+                    source_id = source_result['source_id']
+                    target_id = target_result['target_id']
+                    
+                    # 检查关系是否已存在
+                    relationship_check_query = """
+                    MATCH (source:table)-[r:DERIVED_FROM]-(target:table)
+                    WHERE id(source) = $source_id AND id(target) = $target_id
+                    RETURN r
+                    """
+                    
+                    existing_relationship = session.run(relationship_check_query, 
+                                                      source_id=source_id, 
+                                                      target_id=target_id).single()
+                    
+                    if not existing_relationship:
+                        # 创建DERIVED_FROM关系,从source_table到target_table
+                        create_relationship_query = """
+                        MATCH (source:table), (target:table)
+                        WHERE id(source) = $source_id AND id(target) = $target_id
+                        CREATE (target)-[r:DERIVED_FROM]->(source)
+                        SET r.script_name = $script_name,
+                            r.script_type = $script_type,
+                            r.schedule_status = $schedule_status,
+                            r.update_mode = $update_mode,
+                            r.created_at = $created_at,
+                            r.updated_at = $created_at
+                        RETURN r
+                        """
+                        
+                        session.run(create_relationship_query,
+                                  source_id=source_id,
+                                  target_id=target_id,
+                                  script_name=script_name,
+                                  script_type=script_type,
+                                  schedule_status=schedule_status,
+                                  update_mode=update_mode,
+                                  created_at=current_time)
+                        
+                        logger.info(f"成功创建DERIVED_FROM关系: {target_table} -> {source_table} (script: {script_name})")
+                    else:
+                        logger.info(f"DERIVED_FROM关系已存在: {target_table} -> {source_table}")
+                else:
+                    logger.error(f"创建表节点失败: source_table={source_table}, target_table={target_table}")
+                    
+        except Exception as e:
+            logger.error(f"处理脚本关系失败: {str(e)}")
             raise e 

+ 19 - 13
app/core/data_metric/metric_interface.py

@@ -219,19 +219,23 @@ def handle_data_metric(metric_name, result_list, receiver):
     for child_id in child_list:
         child = get_node_by_id_no_label(child_id)
         # 建立关系:当前节点的childrenId指向,以及关系child
-        if child and not relationship_exists(data_metric_node, 'child', child):
+        if child:
             # 获取节点ID
-            dm_id = data_metric_node.id if hasattr(data_metric_node, 'id') else data_metric_node
-            child_id = child.id if hasattr(child, 'id') else child
-            connect_graph.create(Relationship(data_metric_node, 'child', child))
+            dm_id = data_metric_node.id if hasattr(data_metric_node, 'id') else data_metric_node.identity if hasattr(data_metric_node, 'identity') else None
+            child_node_id = child.id if hasattr(child, 'id') else child.identity if hasattr(child, 'identity') else child_id
+            
+            if dm_id and child_node_id and not relationship_exists(dm_id, 'child', child_node_id):
+                connect_graph.create(Relationship(data_metric_node, 'child', child))
 
     if receiver.get('tag'):
         tag = get_node_by_id('data_label', receiver['tag'])
-        if tag and not relationship_exists(data_metric_node, 'label', tag):
+        if tag:
             # 获取节点ID
-            dm_id = data_metric_node.id if hasattr(data_metric_node, 'id') else data_metric_node
-            tag_id = tag.id if hasattr(tag, 'id') else tag
-            connect_graph.create(Relationship(data_metric_node, 'label', tag))
+            dm_id = data_metric_node.id if hasattr(data_metric_node, 'id') else data_metric_node.identity if hasattr(data_metric_node, 'identity') else None
+            tag_node_id = tag.id if hasattr(tag, 'id') else tag.identity if hasattr(tag, 'identity') else receiver['tag']
+            
+            if dm_id and tag_node_id and not relationship_exists(dm_id, 'label', tag_node_id):
+                connect_graph.create(Relationship(data_metric_node, 'label', tag))
 
     return data_metric_node.id, id_list
 
@@ -586,12 +590,14 @@ def data_metric_edit(data):
     for child_id in child_list:
         child = get_node_by_id_no_label(child_id)
         # 建立关系:当前节点的childrenId指向,以及关系child
-        if child and not relationship_exists(node_a, 'child', child):
+        if child:
             # 获取节点ID
-            node_a_id = node_a.id if hasattr(node_a, 'id') else node_a
-            child_id = child.id if hasattr(child, 'id') else child
-            connection = Relationship(node_a, 'child', child)
-            connect_graph.create(connection)
+            dm_id = node_a.id if hasattr(node_a, 'id') else node_a.identity if hasattr(node_a, 'identity') else None
+            child_node_id = child.id if hasattr(child, 'id') else child.identity if hasattr(child, 'identity') else child_id
+            
+            if dm_id and child_node_id and not relationship_exists(dm_id, 'child', child_node_id):
+                connection = Relationship(node_a, 'child', child)
+                connect_graph.create(connection)
 
     # 处理数据标签及其关系
     if data.get("tag"):

+ 74 - 259
app/core/data_parse/parse.py

@@ -854,6 +854,22 @@ def parse_text_with_qwen25VLplus(image_data):
             if field not in extracted_data:
                 extracted_data[field] = [] if field == 'career_path' else ""
         
+        # 为career_path增加一条记录
+        if extracted_data.get('hotel_zh') or extracted_data.get('hotel_en') or extracted_data.get('title_zh') or extracted_data.get('title_en'):
+            career_entry = {
+                'date': datetime.now().strftime('%Y-%m-%d'),
+                'hotel_en': extracted_data.get('hotel_en', ''),
+                'hotel_zh': extracted_data.get('hotel_zh', ''),
+                'image_path': '',
+                'source': 'business_card_creation',
+                'title_en': extracted_data.get('title_en', ''),
+                'title_zh': extracted_data.get('title_zh', '')
+            }
+            
+            # 直接清空原有的career_path内容,用career_entry写入
+            extracted_data['career_path'] = [career_entry]
+            logging.info(f"为解析结果设置了career_path记录: {career_entry}")
+        
         return extracted_data
         
     except Exception as e:
@@ -861,265 +877,6 @@ def parse_text_with_qwen25VLplus(image_data):
         logging.error(error_msg, exc_info=True)
         raise Exception(error_msg)
 
-def process_business_card(image_file):
-    """
-    处理名片图片并提取信息
-    
-    Args:
-        image_file (FileStorage): 上传的名片图片文件
-        
-    Returns:
-        dict: 处理结果,包含提取的信息和状态
-    """
-    minio_path = None
-    
-    try:
-        # 读取图片数据
-        image_data = image_file.read()
-        image_file.seek(0)  # 重置文件指针以便后续读取
-        
-        try:
-            # 优先使用 Qwen 2.5 VL Plus 模型直接从图像提取信息
-            try:
-                logging.info("尝试使用 Qwen 2.5 VL Plus 模型解析名片")
-                extracted_data = parse_text_with_qwen25VLplus(image_data)
-                logging.info("成功使用 Qwen 2.5 VL Plus 模型解析名片")
-            except Exception as qwen_error:
-                logging.warning(f"Qwen 模型解析失败,错误原因: {str(qwen_error)}")
-                # extracted_data = extract_text_from_image(image_data)
-        except Exception as e:
-            return {
-                'code': 500,
-                'success': False,
-                'message': f"名片解析失败: {str(e)}",
-                'data': None
-            }
-        
-        # 检查重复记录
-        try:
-            duplicate_check = check_duplicate_business_card(extracted_data)
-            logging.info(f"重复记录检查结果: {duplicate_check['reason']}")
-        except Exception as e:
-            logging.error(f"重复记录检查失败: {str(e)}", exc_info=True)
-            # 如果检查失败,默认创建新记录
-            duplicate_check = {
-                'is_duplicate': False,
-                'action': 'create_new',
-                'existing_card': None,
-                'reason': f'重复检查失败,创建新记录: {str(e)}'
-            }
-        
-        try:
-            # 生成唯一的文件名
-            file_ext = os.path.splitext(image_file.filename)[1].lower()
-            if not file_ext:
-                file_ext = '.jpg'  # 默认扩展名
-            
-            unique_filename = f"{uuid.uuid4().hex}{file_ext}"
-            minio_path = f"{unique_filename}"
-            
-            # 尝试上传到MinIO
-            minio_client = get_minio_client()
-            if minio_client:
-                try:
-                    # 上传文件
-                    logging.info(f"上传文件到MinIO: {minio_path}")
-                    minio_client.put_object(
-                        Bucket=minio_bucket,
-                        Key=minio_path,
-                        Body=image_file,
-                        ContentType=image_file.content_type
-                    )
-                    logging.info(f"图片已上传到MinIO: {minio_path}")
-                except Exception as upload_err:
-                    logging.error(f"上传文件到MinIO时出错: {str(upload_err)}")
-                    # 即使上传失败,仍继续处理,但路径为None
-                    minio_path = None
-            else:
-                minio_path = None
-                logging.warning("MinIO客户端未初始化,图片未上传")
-        except Exception as e:
-            logging.error(f"上传图片到MinIO失败: {str(e)}", exc_info=True)
-            minio_path = None
-        
-        try:
-            # 根据重复检查结果执行不同操作
-            if duplicate_check['action'] == 'update':
-                # 更新现有记录
-                existing_card = duplicate_check['existing_card']
-                
-                # 更新基本信息
-                existing_card.name_en = extracted_data.get('name_en', existing_card.name_en)
-                existing_card.title_zh = extracted_data.get('title_zh', existing_card.title_zh)
-                existing_card.title_en = extracted_data.get('title_en', existing_card.title_en)
-                existing_card.phone = extracted_data.get('phone', existing_card.phone)
-                existing_card.email = extracted_data.get('email', existing_card.email)
-                existing_card.hotel_zh = extracted_data.get('hotel_zh', existing_card.hotel_zh)
-                existing_card.hotel_en = extracted_data.get('hotel_en', existing_card.hotel_en)
-                existing_card.address_zh = extracted_data.get('address_zh', existing_card.address_zh)
-                existing_card.address_en = extracted_data.get('address_en', existing_card.address_en)
-                existing_card.postal_code_zh = extracted_data.get('postal_code_zh', existing_card.postal_code_zh)
-                existing_card.postal_code_en = extracted_data.get('postal_code_en', existing_card.postal_code_en)
-                existing_card.brand_zh = extracted_data.get('brand_zh', existing_card.brand_zh)
-                existing_card.brand_en = extracted_data.get('brand_en', existing_card.brand_en)
-                existing_card.affiliation_zh = extracted_data.get('affiliation_zh', existing_card.affiliation_zh)
-                existing_card.affiliation_en = extracted_data.get('affiliation_en', existing_card.affiliation_en)
-                # 处理生日字段
-                if extracted_data.get('birthday'):
-                    try:
-                        existing_card.birthday = datetime.strptime(extracted_data.get('birthday'), '%Y-%m-%d').date()
-                    except ValueError:
-                        # 如果日期格式不正确,保持原值
-                        pass
-                existing_card.residence = extracted_data.get('residence', existing_card.residence)
-                existing_card.brand_group = extracted_data.get('brand_group', existing_card.brand_group)
-                existing_card.image_path = minio_path  # 更新为最新的图片路径
-                existing_card.updated_by = 'system'
-                
-                # 更新职业轨迹,传递图片路径
-                existing_card.career_path = update_career_path(existing_card, extracted_data, minio_path)
-                
-                db.session.commit()
-                
-                logging.info(f"已更新现有名片记录,ID: {existing_card.id}")
-                
-                return {
-                    'code': 200,
-                    'success': True,
-                    'message': f'名片解析成功,已更新现有记录。{duplicate_check["reason"]}',
-                    'data': existing_card.to_dict()
-                }
-                
-            elif duplicate_check['action'] == 'create_with_duplicates':
-                # 创建新记录作为主记录,并保存疑似重复记录信息
-                main_card, duplicate_record = create_main_card_with_duplicates(
-                    extracted_data, 
-                    minio_path, 
-                    duplicate_check['suspected_duplicates'],
-                    duplicate_check['reason']
-                )
-                
-                return {
-                    'code': 202,  # Accepted,表示已接受但需要进一步处理
-                    'success': True,
-                    'message': f'创建新记录成功,发现疑似重复记录待处理。{duplicate_check["reason"]}',
-                    'data': {
-                        'main_card': main_card.to_dict(),
-                        'duplicate_record_id': duplicate_record.id,
-                        'suspected_duplicates_count': len(duplicate_check['suspected_duplicates']),
-                        'processing_status': 'pending',
-                        'duplicate_reason': duplicate_record.duplicate_reason,
-                        'created_at': duplicate_record.created_at.strftime('%Y-%m-%d %H:%M:%S')
-                    }
-                }
-                
-            else:
-                # 创建新记录
-                # 准备初始职业轨迹,包含当前名片信息和图片路径
-                initial_career_path = extracted_data.get('career_path', [])
-                if extracted_data.get('hotel_zh') or extracted_data.get('hotel_en') or extracted_data.get('title_zh') or extracted_data.get('title_en'):
-                    initial_entry = {
-                        'date': datetime.now().strftime('%Y-%m-%d'),
-                        'hotel_zh': extracted_data.get('hotel_zh', ''),
-                        'hotel_en': extracted_data.get('hotel_en', ''),
-                        'title_zh': extracted_data.get('title_zh', ''),
-                        'title_en': extracted_data.get('title_en', ''),
-                        'image_path': minio_path or '',  # 当前名片的图片路径
-                        'source': 'business_card_creation'
-                    }
-                    initial_career_path.append(initial_entry)
-                
-                business_card = BusinessCard(
-                    name_zh=extracted_data.get('name_zh', ''),
-                    name_en=extracted_data.get('name_en', ''),
-                    title_zh=extracted_data.get('title_zh', ''),
-                    title_en=extracted_data.get('title_en', ''),
-                    mobile=extracted_data.get('mobile', ''),
-                    phone=extracted_data.get('phone', ''),
-                    email=extracted_data.get('email', ''),
-                    hotel_zh=extracted_data.get('hotel_zh', ''),
-                    hotel_en=extracted_data.get('hotel_en', ''),
-                    address_zh=extracted_data.get('address_zh', ''),
-                    address_en=extracted_data.get('address_en', ''),
-                    postal_code_zh=extracted_data.get('postal_code_zh', ''),
-                    postal_code_en=extracted_data.get('postal_code_en', ''),
-                    brand_zh=extracted_data.get('brand_zh', ''),
-                    brand_en=extracted_data.get('brand_en', ''),
-                    affiliation_zh=extracted_data.get('affiliation_zh', ''),
-                    affiliation_en=extracted_data.get('affiliation_en', ''),
-                    birthday=datetime.strptime(extracted_data.get('birthday'), '%Y-%m-%d').date() if extracted_data.get('birthday') else None,
-                    residence=extracted_data.get('residence', ''),
-                    image_path=minio_path,  # 最新的图片路径
-                    career_path=initial_career_path,  # 包含图片路径的职业轨迹
-                    brand_group=extracted_data.get('brand_group', ''),
-                    status='active',
-                    updated_by='system'
-                )
-                
-                db.session.add(business_card)
-                db.session.commit()
-                
-                logging.info(f"名片信息已保存到数据库,ID: {business_card.id}")
-                
-                return {
-                    'code': 200,
-                    'success': True,
-                    'message': f'名片解析成功。{duplicate_check["reason"]}',
-                    'data': business_card.to_dict()
-                }
-        except Exception as e:
-            db.session.rollback()
-            error_msg = f"保存名片信息到数据库失败: {str(e)}"
-            logging.error(error_msg, exc_info=True)
-            
-            # 即使数据库操作失败,仍返回提取的信息
-            return {
-                'code': 500,
-                'success': False,
-                'message': error_msg,
-                'data': {
-                    'id': None,
-                    'name_zh': extracted_data.get('name_zh', ''),
-                    'name_en': extracted_data.get('name_en', ''),
-                    'title_zh': extracted_data.get('title_zh', ''),
-                    'title_en': extracted_data.get('title_en', ''),
-                    'mobile': extracted_data.get('mobile', ''),
-                    'phone': extracted_data.get('phone', ''),
-                    'email': extracted_data.get('email', ''),
-                    'hotel_zh': extracted_data.get('hotel_zh', ''),
-                    'hotel_en': extracted_data.get('hotel_en', ''),
-                    'address_zh': extracted_data.get('address_zh', ''),
-                    'address_en': extracted_data.get('address_en', ''),
-                    'postal_code_zh': extracted_data.get('postal_code_zh', ''),
-                    'postal_code_en': extracted_data.get('postal_code_en', ''),
-                    'brand_zh': extracted_data.get('brand_zh', ''),
-                    'brand_en': extracted_data.get('brand_en', ''),
-                    'affiliation_zh': extracted_data.get('affiliation_zh', ''),
-                    'affiliation_en': extracted_data.get('affiliation_en', ''),
-                    'birthday': extracted_data.get('birthday', ''),
-                    'residence': extracted_data.get('residence', ''),
-                    'image_path': minio_path,  # 返回相对路径
-                    'career_path': initial_career_path,  # 包含图片路径的职业轨迹
-                    'brand_group': extracted_data.get('brand_group', ''),
-                    'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
-                    'updated_at': None,
-                    'updated_by': 'system',
-                    'status': 'active'
-                }
-            }
-            
-    except Exception as e:
-        db.session.rollback()
-        error_msg = f"名片处理失败: {str(e)}"
-        logging.error(error_msg, exc_info=True)
-        
-        return {
-            'code': 500,
-            'success': False,
-            'message': error_msg,
-            'data': None
-        }
-
 def update_business_card(card_id, data):
     """
     更新名片信息
@@ -3178,6 +2935,64 @@ def get_duplicate_record_detail(duplicate_id):
         # 添加主记录信息
         if duplicate_record.main_card:
             record_dict['main_card'] = duplicate_record.main_card.to_dict()
+        else:
+            record_dict['main_card'] = None
+        
+        # 解析suspected_duplicates字段中的JSON信息,并获取详细的名片信息
+        suspected_duplicates_details = []
+        if duplicate_record.suspected_duplicates:
+            try:
+                # 确保suspected_duplicates是列表格式
+                suspected_list = duplicate_record.suspected_duplicates
+                if not isinstance(suspected_list, list):
+                    logging.warning(f"suspected_duplicates不是列表格式: {type(suspected_list)}")
+                    suspected_list = []
+                
+                # 遍历每个疑似重复记录ID
+                for suspected_item in suspected_list:
+                    try:
+                        # 支持两种格式:直接ID或包含ID的字典
+                        if isinstance(suspected_item, dict):
+                            card_id = suspected_item.get('id')
+                        else:
+                            card_id = suspected_item
+                        
+                        if card_id:
+                            # 调用get_business_card函数获取详细信息
+                            card_result = get_business_card(card_id)
+                            if card_result['success'] and card_result['data']:
+                                suspected_duplicates_details.append(card_result['data'])
+                                logging.info(f"成功获取疑似重复记录详情,ID: {card_id}")
+                            else:
+                                logging.warning(f"无法获取疑似重复记录详情,ID: {card_id}, 原因: {card_result['message']}")
+                                # 添加错误信息记录
+                                suspected_duplicates_details.append({
+                                    'id': card_id,
+                                    'error': card_result['message'],
+                                    'success': False
+                                })
+                        else:
+                            logging.warning(f"疑似重复记录项缺少ID信息: {suspected_item}")
+                    
+                    except Exception as item_error:
+                        logging.error(f"处理疑似重复记录项时出错: {suspected_item}, 错误: {str(item_error)}")
+                        suspected_duplicates_details.append({
+                            'original_item': suspected_item,
+                            'error': f"处理出错: {str(item_error)}",
+                            'success': False
+                        })
+                
+            except Exception as parse_error:
+                logging.error(f"解析suspected_duplicates JSON时出错: {str(parse_error)}")
+                suspected_duplicates_details = [{
+                    'error': f"解析JSON出错: {str(parse_error)}",
+                    'original_data': duplicate_record.suspected_duplicates,
+                    'success': False
+                }]
+        
+        # 将详细的疑似重复记录信息添加到返回数据中
+        record_dict['suspected_duplicates_details'] = suspected_duplicates_details
+        record_dict['suspected_duplicates_count'] = len(suspected_duplicates_details)
         
         return {
             'code': 200,

+ 17 - 8
app/core/graph/graph_operations.py

@@ -352,14 +352,14 @@ def get_node(label, **properties):
         logger.error(f"Error in get_node: {str(e)}")
         return None
 
-def relationship_exists(start_node, rel_type, end_node, **properties):
+def relationship_exists(start_node_id, rel_type, end_node_id, **properties):
     """
     检查两个节点之间是否存在指定类型和属性的关系
     
     Args:
-        start_node: 起始节点或节点ID
+        start_node_id: 起始节点ID(整数,或可转换为整数的字符串)
         rel_type: 关系类型
-        end_node: 结束节点或节点ID
+        end_node_id: 结束节点ID(整数,或可转换为整数的字符串)
         **properties: 关系的属性
         
     Returns:
@@ -367,9 +367,18 @@ def relationship_exists(start_node, rel_type, end_node, **properties):
     """
     try:
         with connect_graph().session() as session:
-            # 确定节点ID
-            start_id = start_node.id if hasattr(start_node, 'id') else start_node
-            end_id = end_node.id if hasattr(end_node, 'id') else end_node
+            # 确保输入的是有效的节点ID
+            if not isinstance(start_node_id, (int, str)) or not isinstance(end_node_id, (int, str)):
+                logger.warning(f"无效的节点ID类型: start_node_id={type(start_node_id)}, end_node_id={type(end_node_id)}")
+                return False
+                
+            # 转换为整数
+            try:
+                start_id = int(start_node_id)
+                end_id = int(end_node_id)
+            except (ValueError, TypeError):
+                logger.warning(f"无法转换节点ID为整数: start_node_id={start_node_id}, end_node_id={end_node_id}")
+                return False
             
             # 构建查询语句
             query = """
@@ -388,8 +397,8 @@ def relationship_exists(start_node, rel_type, end_node, **properties):
             
             # 执行查询
             params = {
-                'start_id': int(start_id),
-                'end_id': int(end_id),
+                'start_id': start_id,
+                'end_id': end_id,
                 **properties
             }
             result = session.run(query, **params).single()

+ 4 - 2
app/services/package_function.py

@@ -123,7 +123,8 @@ def create_person_workplace(code_list, flag, relatives_type):
             id(wrk_m) as id_wrk_m,
             CASE WHEN exists(wrk_m.organization_no) THEN 1 ELSE 0 END as relatives_status
     """
-    result = connect_graph.run(query, codes=code_list).data()
+    with connect_graph().session() as session:
+        result = session.run(query, codes=code_list).data()
     handle_function = relation_dict.get(condition, [])
 
     for row in result:
@@ -249,7 +250,8 @@ def person_relative(links, code_list, status):
     """.format("WITH CASE WHEN exists(m.code) THEN 1 ELSE 0 END AS status,r "
                "WHERE status = $relatives_status" if isinstance(status, int) else "")
 
-    result = connect_graph.run(query, codes=code_list, relatives_status=status).data()
+    with connect_graph().session() as session:
+        result = session.run(query, codes=code_list, relatives_status=status).data()
     for row in result:
         startnode = row['startnode']
         endnode = row['endnode']

+ 1 - 1
docs/data_flow_apis.md

@@ -5,7 +5,7 @@
 DataFlow API 提供了完整的数据流管理功能,包括数据流的创建、查询、更新、删除、执行等操作。系统基于Neo4j图数据库和PostgreSQL关系数据库的混合架构,支持复杂的数据流编排和管理。
 
 **版本**: v1.0  
-**基础URL**: `/api/data-flow`  
+**基础URL**: `/api/dataflow`  
 **内容类型**: `application/json`  
 **字符编码**: UTF-8