فهرست منبع

修改人才档案和重复记录的处理逻辑,新增酒店职位数据和酒店集团品牌数据同步到Neo4j图数据库程序
创建人才、酒店、职位的知识图谱

maxiaolong 1 هفته پیش
والد
کامیت
726aba92d1

+ 159 - 0
app/core/data_parse/NEO4J_NODE_CREATION_LOGIC.md

@@ -0,0 +1,159 @@
+# Neo4j Talent节点创建逻辑说明
+
+## 概述
+
+本文档说明了在`parse_task.py`的`add_single_talent`函数中,Neo4j Talent节点的创建逻辑和时机。
+
+## 节点创建逻辑
+
+### 1. 更新现有人才记录时
+**场景**: `duplicate_check['action'] == 'update_existing`
+- **行为**: 在Neo4j中更新或创建Talent节点
+- **原因**: 现有人才记录被更新,需要同步到图数据库
+- **属性**: 包含基本的联系信息(姓名、手机、邮箱、ID、更新时间)
+
+### 2. 创建新记录作为主记录并保存疑似重复记录时
+**场景**: `duplicate_check['action'] == 'create_with_duplicates`
+- **行为**: **不在Neo4j中创建Talent节点**
+- **原因**: 疑似重复记录需要进一步人工确认和处理
+- **说明**: 等待疑似重复记录处理完成后,再决定是否创建节点
+
+### 3. 创建全新记录时
+**场景**: `duplicate_check['action'] == 'create_new` 或默认情况
+- **行为**: 在Neo4j中创建Talent节点
+- **原因**: 全新的人才记录,需要同步到图数据库
+- **属性**: 包含完整的12个属性(姓名、联系方式、状态、生日、年龄、居住地、籍贯、ID、更新时间)
+
+## 代码实现对比
+
+### 更新现有人才记录
+```python
+# 在Neo4j图数据库中更新Talent节点
+try:
+    from app.core.graph.graph_operations import create_or_get_node
+    
+    # 创建Talent节点属性
+    talent_properties = {
+        'name_zh': existing_card.name_zh,
+        'name_en': existing_card.name_en,
+        'mobile': existing_card.mobile,
+        'email': existing_card.email,
+        'pg_id': existing_card.id,
+        'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    }
+    
+    # 在Neo4j中更新或创建Talent节点
+    neo4j_node_id = create_or_get_node('Talent', **talent_properties)
+    logging.info(f"成功在Neo4j中更新Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {existing_card.id}")
+    
+except Exception as neo4j_error:
+    logging.error(f"在Neo4j中更新Talent节点失败: {str(neo4j_error)}")
+```
+
+### 创建新记录并保存疑似重复记录
+```python
+# 注意:当创建新记录作为主记录并保存疑似重复记录信息时,不在Neo4j图数据库中创建Talent节点
+# 这是因为疑似重复记录需要进一步人工确认和处理
+logging.info(f"跳过Neo4j Talent节点创建,等待疑似重复记录处理完成,PostgreSQL ID: {main_card.id}")
+```
+
+### 创建全新记录
+```python
+# 在Neo4j图数据库中创建Talent节点
+try:
+    from app.core.graph.graph_operations import create_or_get_node
+    
+    # 创建Talent节点属性
+    talent_properties = {
+        'name_zh': business_card.name_zh,
+        'name_en': business_card.name_en,
+        'mobile': business_card.mobile,
+        'phone': business_card.phone,
+        'email': business_card.email,
+        'status': business_card.status,
+        'birthday': business_card.birthday.strftime('%Y-%m-%d') if business_card.birthday else None,
+        'age': business_card.age,
+        'residence': business_card.residence,
+        'native_place': business_card.native_place,
+        'pg_id': business_card.id,
+        'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    }
+    
+    # 在Neo4j中创建Talent节点
+    neo4j_node_id = create_or_get_node('Talent', **talent_properties)
+    logging.info(f"成功在Neo4j中创建Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {business_card.id}")
+    
+except Exception as neo4j_error:
+    logging.error(f"在Neo4j中创建Talent节点失败: {str(neo4j_error)}")
+```
+
+## 设计 rationale
+
+### 为什么疑似重复记录时不创建Neo4j节点?
+
+1. **数据一致性**: 疑似重复记录需要人工确认,过早创建节点可能导致数据不一致
+2. **避免重复**: 如果后续确认是重复记录,已创建的节点需要删除,增加复杂性
+3. **人工处理**: 疑似重复记录通常需要人工审核和决策,不适合自动化处理
+4. **资源管理**: 避免创建可能被删除的节点,节省Neo4j存储空间
+
+### 何时创建Neo4j节点?
+
+1. **确认唯一**: 全新记录且无重复嫌疑
+2. **更新现有**: 现有人才记录被更新,需要同步变更
+3. **人工确认**: 疑似重复记录经过人工确认和处理后
+
+## 后续处理建议
+
+### 疑似重复记录处理完成后
+可以考虑添加一个函数来创建Neo4j节点:
+
+```python
+def create_neo4j_node_after_duplicate_resolution(business_card_id):
+    """
+    在疑似重复记录处理完成后,创建Neo4j Talent节点
+    
+    Args:
+        business_card_id (int): business_cards表的主键ID
+    """
+    try:
+        # 获取business_card记录
+        business_card = BusinessCard.query.get(business_card_id)
+        if not business_card:
+            logging.error(f"未找到ID为{business_card_id}的business_card记录")
+            return False
+        
+        # 创建Neo4j节点
+        from app.core.graph.graph_operations import create_or_get_node
+        
+        talent_properties = {
+            'name_zh': business_card.name_zh,
+            'name_en': business_card.name_en,
+            'mobile': business_card.mobile,
+            'phone': business_card.phone,
+            'email': business_card.email,
+            'status': business_card.status,
+            'birthday': business_card.birthday.strftime('%Y-%m-%d') if business_card.birthday else None,
+            'age': business_card.age,
+            'residence': business_card.residence,
+            'native_place': business_card.native_place,
+            'pg_id': business_card.id,
+            'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        }
+        
+        neo4j_node_id = create_or_get_node('Talent', **talent_properties)
+        logging.info(f"疑似重复记录处理完成后,成功创建Neo4j Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {business_card.id}")
+        
+        return True
+        
+    except Exception as e:
+        logging.error(f"疑似重复记录处理完成后创建Neo4j节点失败: {str(e)}")
+        return False
+```
+
+## 总结
+
+- **更新现有人才**: 创建/更新Neo4j节点
+- **疑似重复记录**: 跳过Neo4j节点创建
+- **全新记录**: 创建完整的Neo4j节点
+
+这种设计确保了数据的一致性和完整性,避免了在数据状态不明确时创建可能无效的图数据库节点。 

+ 97 - 0
app/core/data_parse/TALENT_NEO4J_PROPERTIES.md

@@ -0,0 +1,97 @@
+# Talent节点Neo4j属性设置说明
+
+## 概述
+
+本文档说明了在`parse_task.py`的`add_single_talent`函数中,当创建新的Neo4j Talent节点时,节点包含的属性信息。
+
+## 属性列表
+
+### 基本信息
+- **`name_zh`** (string): 中文姓名
+- **`name_en`** (string): 英文姓名
+
+### 联系方式
+- **`mobile`** (string): 手机号码
+- **`phone`** (string): 固定电话
+- **`email`** (string): 电子邮箱
+
+### 状态信息
+- **`status`** (string): 人才状态,通常为'active'
+
+### 个人信息
+- **`birthday`** (date_string_or_null): 生日,格式为'YYYY-MM-DD',可能为None
+- **`age`** (integer_or_null): 年龄,可能为None
+- **`residence`** (string): 居住地
+- **`native_place`** (string): 籍贯
+
+### 系统信息
+- **`pg_id`** (integer): PostgreSQL数据库中business_cards表的主键ID
+- **`updated_at`** (datetime_string): 更新时间,格式为'YYYY-MM-DD HH:MM:SS'
+
+## 代码实现
+
+```python
+# 创建Talent节点属性
+talent_properties = {
+    'name_zh': business_card.name_zh,
+    'name_en': business_card.name_en,
+    'mobile': business_card.mobile,
+    'phone': business_card.phone,
+    'email': business_card.email,
+    'status': business_card.status,
+    'birthday': business_card.birthday.strftime('%Y-%m-%d') if business_card.birthday else None,
+    'age': business_card.age,
+    'residence': business_card.residence,
+    'native_place': business_card.native_place,
+    'pg_id': business_card.id,  # PostgreSQL主记录的ID
+    'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+}
+```
+
+## 属性处理说明
+
+### 生日字段处理
+- 如果`business_card.birthday`存在,将其格式化为'YYYY-MM-DD'字符串
+- 如果为None,则属性值也为None
+
+### 年龄字段处理
+- 直接使用`business_card.age`的值
+- 支持整数和None值
+
+### 时间字段处理
+- `updated_at`字段使用当前系统时间,格式化为'YYYY-MM-DD HH:MM:SS'
+
+## 数据来源
+
+所有属性都来自PostgreSQL数据库中的`business_cards`表记录,通过`BusinessCard`模型对象获取。
+
+## 使用场景
+
+这些属性用于在Neo4j图数据库中创建Talent节点,支持:
+- 人才信息查询和检索
+- 人才关系图谱构建
+- 人才数据分析
+- 与其他节点的关系建立
+
+## 注意事项
+
+1. **数据类型**: 确保属性值类型与Neo4j支持的类型兼容
+2. **空值处理**: 某些字段可能为None,需要正确处理
+3. **数据一致性**: 与PostgreSQL数据库中的数据保持同步
+4. **性能考虑**: 属性过多可能影响查询性能,建议根据实际需求调整
+
+## 扩展建议
+
+可以根据业务需求添加更多属性:
+- 技能标签
+- 工作经验
+- 教育背景
+- 证书信息
+- 推荐指数
+- 等等
+
+## 相关文件
+
+- `app/core/data_parse/parse_task.py`: 主要实现文件
+- `app/core/data_parse/parse_system.py`: BusinessCard模型定义
+- `app/core/graph/graph_operations.py`: Neo4j节点创建函数 

+ 4 - 4
app/core/data_parse/parse_neo4j_process.py

@@ -210,8 +210,8 @@ class HotelPositionNeo4jProcessor:
             
             query = """
                 CREATE (n:DataLabel {
-                    name: $name,
-                    en_name: $en_name,
+                    name_zh: $name_zh,
+                    name_en: $name_en,
                     describe: $describe,
                     time: $time,
                     category: $category,
@@ -221,8 +221,8 @@ class HotelPositionNeo4jProcessor:
             """
             
             parameters = {
-                'name': node_data['name'],
-                'en_name': node_data['en_name'],
+                'name_zh': node_data['name_zh'],
+                'name_en': node_data['name_en'],
                 'describe': '',
                 'time': current_time,
                 'category': '人才地图',

+ 141 - 1
app/core/data_parse/parse_system.py

@@ -17,6 +17,9 @@ from openai import OpenAI
 from app.config.config import DevelopmentConfig, ProductionConfig
 import time  # 添加导入时间模块
 
+# 导入Neo4j相关函数
+from app.core.data_parse.parse_task import create_or_get_talent_node, process_career_path
+
 # 名片解析数据模型
 class BusinessCard(db.Model):
     __tablename__ = 'business_cards'
@@ -948,7 +951,7 @@ def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, p
     处理重复记录
     
     Args:
-        duplicate_id (int): 重复记录ID
+        duplicate_id (int): 重复记录ID(主记录ID)
         action (str): 处理动作
         selected_duplicate_id (int, optional): 选择的重复记录ID
         processed_by (str, optional): 处理人
@@ -1034,18 +1037,155 @@ def process_duplicate_record(duplicate_id, action, selected_duplicate_id=None, p
             main_card.status = 'inactive'
             main_card.updated_by = processed_by or 'system'
             
+            # 在Neo4j图数据库中更新Talent节点和career_path
+            try:
+                # 创建Talent节点属性
+                talent_properties = {
+                    'name_zh': target_card.name_zh,
+                    'name_en': target_card.name_en,
+                    'mobile': target_card.mobile,
+                    'phone': target_card.phone,
+                    'email': target_card.email,
+                    'status': target_card.status,
+                    'birthday': target_card.birthday.strftime('%Y-%m-%d') if target_card.birthday else None,
+                    'age': target_card.age,
+                    'residence': target_card.residence,
+                    'native_place': target_card.native_place,
+                    'pg_id': target_card.id,  # PostgreSQL主记录的ID
+                    'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                }
+                
+                # 在Neo4j中更新或创建Talent节点
+                neo4j_node_id = create_or_get_talent_node(**talent_properties)
+                logging.info(f"成功在Neo4j中更新Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {target_card.id}")
+                
+                # 处理career_path,创建相关的Neo4j节点和关系
+                if target_card.career_path and isinstance(target_card.career_path, list):
+                    try:
+                        # 调用process_career_path函数处理职业轨迹
+                        career_result = process_career_path(
+                            career_path=target_card.career_path,
+                            talent_node_id=neo4j_node_id,
+                            talent_name_zh=target_card.name_zh
+                        )
+                        
+                        # 记录处理结果
+                        logging.info(f"处理career_path完成,结果: {career_result}")
+                                
+                    except Exception as career_error:
+                        logging.error(f"处理career_path失败: {str(career_error)}")
+                        # career_path处理失败不影响主流程,继续执行
+                else:
+                    logging.info(f"人才记录 {target_card.id} 没有career_path数据,跳过Neo4j关系处理")
+                    
+            except Exception as neo4j_error:
+                logging.error(f"在Neo4j中更新Talent节点失败: {str(neo4j_error)}")
+                # Neo4j操作失败不影响主流程,继续执行
+            
             result_data = target_card.to_dict()
             
         elif action == 'keep_main':
             # 保留主记录,将status设置为active
             main_card.status = 'active'
             main_card.updated_by = processed_by or 'system'
+            
+            # 在Neo4j图数据库中更新Talent节点和career_path
+            try:
+                # 创建Talent节点属性
+                talent_properties = {
+                    'name_zh': main_card.name_zh,
+                    'name_en': main_card.name_en,
+                    'mobile': main_card.mobile,
+                    'phone': main_card.phone,
+                    'email': main_card.email,
+                    'status': main_card.status,
+                    'birthday': main_card.birthday.strftime('%Y-%m-%d') if main_card.birthday else None,
+                    'age': main_card.age,
+                    'residence': main_card.residence,
+                    'native_place': main_card.native_place,
+                    'pg_id': main_card.id,  # PostgreSQL主记录的ID
+                    'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                }
+                
+                # 在Neo4j中更新或创建Talent节点
+                neo4j_node_id = create_or_get_talent_node(**talent_properties)
+                logging.info(f"成功在Neo4j中更新Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {main_card.id}")
+                
+                # 处理career_path,创建相关的Neo4j节点和关系
+                if main_card.career_path and isinstance(main_card.career_path, list):
+                    try:
+                        # 调用process_career_path函数处理职业轨迹
+                        career_result = process_career_path(
+                            career_path=main_card.career_path,
+                            talent_node_id=neo4j_node_id,
+                            talent_name_zh=main_card.name_zh
+                        )
+                        
+                        # 记录处理结果
+                        logging.info(f"处理career_path完成,结果: {career_result}")
+                                
+                    except Exception as career_error:
+                        logging.error(f"处理career_path失败: {str(career_error)}")
+                        # career_path处理失败不影响主流程,继续执行
+                else:
+                    logging.info(f"人才记录 {main_card.id} 没有career_path数据,跳过Neo4j关系处理")
+                    
+            except Exception as neo4j_error:
+                logging.error(f"在Neo4j中更新Talent节点失败: {str(neo4j_error)}")
+                # Neo4j操作失败不影响主流程,继续执行
+            
             result_data = main_card.to_dict()
             
         elif action == 'ignore':
             # 忽略,将主记录status设置为active
             main_card.status = 'active'
             main_card.updated_by = processed_by or 'system'
+            
+            # 在Neo4j图数据库中更新Talent节点和career_path
+            try:
+                # 创建Talent节点属性
+                talent_properties = {
+                    'name_zh': main_card.name_zh,
+                    'name_en': main_card.name_en,
+                    'mobile': main_card.mobile,
+                    'phone': main_card.phone,
+                    'email': main_card.email,
+                    'status': main_card.status,
+                    'birthday': main_card.birthday.strftime('%Y-%m-%d') if main_card.birthday else None,
+                    'age': main_card.age,
+                    'residence': main_card.residence,
+                    'native_place': main_card.native_place,
+                    'pg_id': main_card.id,  # PostgreSQL主记录的ID
+                    'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                }
+                
+                # 在Neo4j中更新或创建Talent节点
+                neo4j_node_id = create_or_get_talent_node(**talent_properties)
+                logging.info(f"成功在Neo4j中更新Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {main_card.id}")
+                
+                # 处理career_path,创建相关的Neo4j节点和关系
+                if main_card.career_path and isinstance(main_card.career_path, list):
+                    try:
+                        # 调用process_career_path函数处理职业轨迹
+                        career_result = process_career_path(
+                            career_path=main_card.career_path,
+                            talent_node_id=neo4j_node_id,
+                            talent_name_zh=main_card.name_zh
+                        )
+                        
+                        # 记录处理结果
+                        logging.info(f"处理career_path完成,结果: {career_result}")
+                                
+                    except Exception as career_error:
+                        logging.error(f"处理career_path失败: {str(career_error)}")
+                        # career_path处理失败不影响主流程,继续执行
+                else:
+                    logging.info(f"人才记录 {main_card.id} 没有career_path数据,跳过Neo4j关系处理")
+                    
+            except Exception as neo4j_error:
+                logging.error(f"在Neo4j中更新Talent节点失败: {str(neo4j_error)}")
+                # Neo4j操作失败不影响主流程,继续执行
+            
             result_data = main_card.to_dict()
         
         # 所有操作都更新duplicate_record的状态为processed

+ 447 - 43
app/core/data_parse/parse_task.py

@@ -51,6 +51,400 @@ def get_minio_client():
         return None
 
 
+def process_career_path(career_path, talent_node_id, talent_name_zh):
+    """
+    处理career_path,创建Hotel节点和相关的Neo4j关系
+    
+    Args:
+        career_path (list): 职业轨迹列表,每个元素包含酒店、职位等信息
+        talent_node_id (int): Talent节点的Neo4j ID
+        talent_name_zh (str): 人才中文姓名,用于日志记录
+        
+    Returns:
+        dict: 处理结果信息,包含成功和失败的统计
+    """
+    result = {
+        'total_items': 0,
+        'hotels_created': 0,
+        'hotels_skipped': 0,
+        'brand_relationships_created': 0,
+        'brand_relationships_failed': 0,
+        'work_for_relationships_created': 0,
+        'work_for_relationships_failed': 0,
+        'work_as_relationships_created': 0,
+        'work_as_relationships_failed': 0,
+        'errors': []
+    }
+    
+    try:
+        if not career_path or not isinstance(career_path, list):
+            logging.info("career_path为空或不是列表格式,跳过Hotel节点创建")
+            return result
+        
+        result['total_items'] = len(career_path)
+        logging.info(f"开始处理career_path,共 {len(career_path)} 条职业轨迹记录")
+        
+        # 在执行当前代码逻辑之前,清除传入节点的WORK_FOR和WORK_AS关系
+        try:
+            from app.core.graph.graph_operations import connect_graph
+            
+            with connect_graph().session() as session:
+                # 清除WORK_FOR关系
+                clear_work_for_query = """
+                MATCH (t:Talent)-[r:WORK_FOR]->(h:Hotel)
+                WHERE id(t) = $talent_node_id
+                DELETE r
+                """
+                work_for_result = session.run(clear_work_for_query, talent_node_id=talent_node_id)
+                logging.info(f"已清除Talent节点(ID: {talent_node_id})的所有WORK_FOR关系")
+                
+                # 清除WORK_AS关系
+                clear_work_as_query = """
+                MATCH (t:Talent)-[r:WORK_AS]->(d:DataLabel)
+                WHERE id(t) = $talent_node_id
+                DELETE r
+                """
+                work_as_result = session.run(clear_work_as_query, talent_node_id=talent_node_id)
+                logging.info(f"已清除Talent节点(ID: {talent_node_id})的所有WORK_AS关系")
+                
+        except Exception as clear_error:
+            logging.error(f"清除Talent节点关系失败: {str(clear_error)}")
+            result['errors'].append(f"清除Talent节点关系失败: {str(clear_error)}")
+            # 即使清除关系失败,也继续执行后续逻辑
+        
+        for i, career_item in enumerate(career_path):
+            try:
+                if not isinstance(career_item, dict):
+                    logging.warning(f"跳过无效的career_path元素 {i}: 不是字典格式")
+                    result['hotels_skipped'] += 1
+                    continue
+                
+                hotel_zh = career_item.get('hotel_zh', '')
+                hotel_en = career_item.get('hotel_en', '')
+                
+                if not hotel_zh:
+                    logging.warning(f"跳过career_path元素 {i}: 缺少hotel_zh字段")
+                    result['hotels_skipped'] += 1
+                    continue
+                
+                # 创建Hotel节点
+                try:
+                    from app.core.graph.graph_operations import connect_graph
+                    
+                    # 直接使用Cypher语句查找或创建Hotel节点
+                    with connect_graph().session() as session:
+                        # 首先查找是否已存在相同hotel_zh的Hotel节点
+                        find_query = """
+                        MATCH (h:Hotel {hotel_zh: $hotel_zh})
+                        RETURN id(h) as node_id, h.hotel_zh as hotel_zh
+                        LIMIT 1
+                        """
+                        find_result = session.run(find_query, hotel_zh=hotel_zh).single()
+                        
+                        if find_result:
+                            # 找到现有节点,使用其ID
+                            hotel_node_id = find_result['node_id']
+                            logging.info(f"找到现有Hotel节点,Neo4j ID: {hotel_node_id}, 酒店: {hotel_zh}")
+                            result['hotels_created'] += 0  # 不增加计数,因为不是新创建的
+                        else:
+                            # 没有找到,创建新节点
+                            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                            create_query = """
+                            CREATE (h:Hotel {
+                                hotel_zh: $hotel_zh,
+                                hotel_en: $hotel_en,
+                                create_time: $create_time
+                            })
+                            RETURN id(h) as node_id
+                            """
+                            create_result = session.run(create_query, 
+                                                      hotel_zh=hotel_zh,
+                                                      hotel_en=hotel_en,
+                                                      create_time=current_time).single()
+                            
+                            hotel_node_id = create_result['node_id']
+                            logging.info(f"成功创建新Hotel节点,Neo4j ID: {hotel_node_id}, 酒店: {hotel_zh}")
+                            result['hotels_created'] += 1
+                    
+                except Exception as hotel_error:
+                    logging.error(f"创建Hotel节点失败: {str(hotel_error)}")
+                    result['errors'].append(f"创建Hotel节点失败: {hotel_zh}, 错误: {str(hotel_error)}")
+                    continue
+                
+                # 使用千问大模型判断酒店所属品牌
+                try:
+                    from app.core.llm.llm_service import llm_client
+                    
+                    # 构建提示词
+                    prompt = f"请根据酒店名称'{hotel_zh}'判断该酒店所属的品牌。请只返回JSON格式结果,格式为{{\"brand\":\"品牌名称\"}}。"
+                    
+                    # 调用千问大模型
+                    brand_response = llm_client(prompt)
+                    
+                    if brand_response and isinstance(brand_response, str):
+                        # 尝试解析JSON响应
+                        try:
+                            brand_data = json.loads(brand_response)
+                            brand_name = brand_data.get('brand', '')
+                            
+                            if brand_name:
+                                # 查找对应的DataLabel节点
+                                try:
+                                    from app.core.graph.graph_operations import connect_graph
+                                    
+                                    # 直接查询Neo4j查找name_zh等于品牌名称的DataLabel节点
+                                    with connect_graph().session() as session:
+                                        query = "MATCH (n:DataLabel {name_zh: $brand_name}) RETURN id(n) as node_id, n.name_zh as name_zh LIMIT 1"
+                                        result_query = session.run(query, brand_name=brand_name).single()
+                                        
+                                        if result_query:
+                                            label_node_id = result_query['node_id']
+                                            
+                                            # 在创建BELONGS_TO关系之前,先检查关系是否已经存在
+                                            try:
+                                                from app.core.graph.graph_operations import connect_graph
+                                                
+                                                with connect_graph().session() as session:
+                                                    # 检查Hotel节点与DataLabel节点之间是否已经存在BELONGS_TO关系
+                                                    check_relationship_query = """
+                                                    MATCH (h:Hotel)-[r:BELONGS_TO]->(d:DataLabel)
+                                                    WHERE id(h) = $hotel_node_id AND id(d) = $label_node_id
+                                                    RETURN r
+                                                    LIMIT 1
+                                                    """
+                                                    existing_relationship = session.run(check_relationship_query, 
+                                                                                      hotel_node_id=hotel_node_id,
+                                                                                      label_node_id=label_node_id).single()
+                                                    
+                                                    if existing_relationship:
+                                                        logging.info(f"Hotel节点与品牌标签的关系已存在,跳过创建: {hotel_zh} BELONGS_TO {brand_name}")
+                                                        result['brand_relationships_created'] += 0  # 不增加计数,因为关系已存在
+                                                    else:
+                                                        # 关系不存在,创建新的BELONGS_TO关系
+                                                        from app.core.graph.graph_operations import create_relationship
+                                                        
+                                                        relationship_created = create_relationship(
+                                                            hotel_node_id, 
+                                                            label_node_id, 
+                                                            'BELONGS_TO'
+                                                        )
+                                                        
+                                                        if relationship_created:
+                                                            logging.info(f"成功创建Hotel节点与品牌标签的关系: {hotel_zh} BELONGS_TO {brand_name}")
+                                                            result['brand_relationships_created'] += 1
+                                                        else:
+                                                            logging.warning(f"创建Hotel节点与品牌标签关系失败: {hotel_zh} -> {brand_name}")
+                                                            result['brand_relationships_failed'] += 1
+                                                            
+                                            except Exception as check_error:
+                                                logging.error(f"检查Hotel节点与品牌标签关系失败: {str(check_error)}")
+                                                result['errors'].append(f"检查关系失败: {hotel_zh} -> {brand_name}, 错误: {str(check_error)}")
+                                                # 即使检查失败,也尝试创建关系
+                                                from app.core.graph.graph_operations import create_relationship
+                                                
+                                                relationship_created = create_relationship(
+                                                    hotel_node_id, 
+                                                    label_node_id, 
+                                                    'BELONGS_TO'
+                                                )
+                                                
+                                                if relationship_created:
+                                                    logging.info(f"成功创建Hotel节点与品牌标签的关系: {hotel_zh} BELONGS_TO {brand_name}")
+                                                    result['brand_relationships_created'] += 1
+                                                else:
+                                                    logging.warning(f"创建Hotel节点与品牌标签关系失败: {hotel_zh} -> {brand_name}")
+                                                    result['brand_relationships_failed'] += 1
+                                        else:
+                                            logging.warning(f"未找到品牌标签节点: {brand_name}")
+                                            
+                                except Exception as query_error:
+                                    logging.error(f"查询品牌标签节点失败: {str(query_error)}")
+                                    result['errors'].append(f"查询品牌标签节点失败: {brand_name}, 错误: {str(query_error)}")
+                            else:
+                                logging.warning(f"千问大模型返回的品牌名称为空: {hotel_zh}")
+                                
+                        except json.JSONDecodeError as json_error:
+                            logging.warning(f"解析千问大模型返回的JSON失败: {brand_response}, 错误: {json_error}")
+                    else:
+                        logging.warning(f"千问大模型返回结果无效: {brand_response}")
+                        
+                except Exception as brand_error:
+                    logging.error(f"调用千问大模型判断品牌失败: {str(brand_error)}")
+                    result['errors'].append(f"调用千问大模型判断品牌失败: {hotel_zh}, 错误: {str(brand_error)}")
+                
+                # 创建Talent节点到Hotel节点的WORK_FOR关系
+                try:
+                    from app.core.graph.graph_operations import create_relationship
+                    
+                    # 获取职业轨迹信息
+                    title_zh = career_item.get('title_zh', '')
+                    date = career_item.get('date', '')
+                    
+                    # 创建WORK_FOR关系,包含title_zh和date属性
+                    work_for_properties = {}
+                    if title_zh:
+                        work_for_properties['title_zh'] = title_zh
+                    if date:
+                        work_for_properties['date'] = date
+                    
+                    # 创建Talent节点到Hotel节点的WORK_FOR关系
+                    work_for_relationship_created = create_relationship(
+                        talent_node_id,  # Talent节点ID
+                        hotel_node_id,   # Hotel节点ID
+                        'WORK_FOR',
+                        work_for_properties
+                    )
+                    
+                    if work_for_relationship_created:
+                        logging.info(f"成功创建Talent到Hotel的WORK_FOR关系: Talent({talent_name_zh}) WORK_FOR Hotel({hotel_zh})")
+                        result['work_for_relationships_created'] += 1
+                    else:
+                        logging.warning(f"创建Talent到Hotel的WORK_FOR关系失败: Talent({talent_name_zh}) -> Hotel({hotel_zh})")
+                        result['work_for_relationships_failed'] += 1
+                    
+                    # 创建Talent节点到DataLabel节点的WORK_AS关系
+                    try:
+                        # 查找对应的DataLabel节点(职位标签)
+                        try:
+                            from app.core.graph.graph_operations import connect_graph
+                            
+                            # 直接查询Neo4j查找name_zh等于title_zh的DataLabel节点
+                            with connect_graph().session() as session:
+                                query = "MATCH (n:DataLabel {name_zh: $title_zh}) RETURN id(n) as node_id, n.name_zh as name_zh LIMIT 1"
+                                result_query = session.run(query, title_zh=title_zh).single()
+                                
+                                if result_query:
+                                    # 找到现有的DataLabel节点
+                                    label_node_id = result_query['node_id']
+                                    logging.info(f"找到现有职位标签节点: {title_zh}, ID: {label_node_id}")
+                                else:
+                                    # 没有找到,创建新的DataLabel节点
+                                    from app.core.graph.graph_operations import create_or_get_node
+                                    
+                                    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+                                    label_properties = {
+                                        'name_zh': title_zh,
+                                        'en_name': career_item.get('title_en', ''),
+                                        'describe': '',
+                                        'time': current_time,
+                                        'category': '人才地图',
+                                        'status': 'active',
+                                        'node_type': 'position'
+                                    }
+                                    
+                                    label_node_id = create_or_get_node('DataLabel', **label_properties)
+                                    logging.info(f"创建新职位标签节点: {title_zh}, ID: {label_node_id}")
+                                
+                                # 创建WORK_AS关系,包含hotel_zh和date属性
+                                work_as_properties = {}
+                                if hotel_zh:
+                                    work_as_properties['hotel_zh'] = hotel_zh
+                                if date:
+                                    work_as_properties['date'] = date
+                                
+                                # 创建Talent节点到DataLabel节点的WORK_AS关系
+                                work_as_relationship_created = create_relationship(
+                                    talent_node_id,  # Talent节点ID
+                                    label_node_id,   # DataLabel节点ID
+                                    'WORK_AS',
+                                    work_as_properties
+                                )
+                                
+                                if work_as_relationship_created:
+                                    logging.info(f"成功创建Talent到职位标签的WORK_AS关系: Talent({talent_name_zh}) WORK_AS DataLabel({title_zh})")
+                                    result['work_as_relationships_created'] += 1
+                                else:
+                                    logging.warning(f"创建Talent到职位标签的WORK_AS关系失败: Talent({talent_name_zh}) -> DataLabel({title_zh})")
+                                    result['work_as_relationships_failed'] += 1
+                                    
+                        except Exception as label_query_error:
+                            logging.error(f"查询或创建职位标签节点失败: {str(label_query_error)}")
+                            result['errors'].append(f"查询或创建职位标签节点失败: {title_zh}, 错误: {str(label_query_error)}")
+                            
+                    except Exception as work_as_error:
+                        logging.error(f"创建WORK_AS关系失败: {str(work_as_error)}")
+                        result['errors'].append(f"创建WORK_AS关系失败: {title_zh}, 错误: {str(work_as_error)}")
+                        
+                except Exception as work_for_error:
+                    logging.error(f"创建WORK_FOR关系失败: {str(work_for_error)}")
+                    result['errors'].append(f"创建WORK_FOR关系失败: {hotel_zh}, 错误: {str(work_for_error)}")
+                
+            except Exception as career_error:
+                logging.error(f"处理career_path元素 {i} 失败: {str(career_error)}")
+                result['errors'].append(f"处理career_path元素 {i} 失败: {str(career_error)}")
+                continue
+        
+        logging.info(f"career_path处理完成,统计信息: {result}")
+        return result
+        
+    except Exception as career_path_error:
+        error_msg = f"处理career_path失败: {str(career_path_error)}"
+        logging.error(error_msg)
+        result['errors'].append(error_msg)
+        return result
+
+
+def create_or_get_talent_node(**properties):
+    """
+    创建具有给定属性的新Talent节点或获取现有节点
+    如果具有相同pg_id的节点存在,则更新属性
+    
+    Args:
+        **properties: 作为关键字参数的节点属性,必须包含pg_id
+        
+    Returns:
+        节点id
+    """
+    try:
+        from app.core.graph.graph_operations import connect_graph
+        
+        # 检查是否提供了pg_id
+        if 'pg_id' not in properties:
+            raise ValueError("pg_id is required for Talent node creation")
+        
+        pg_id = properties['pg_id']
+        
+        with connect_graph().session() as session:
+            # 检查节点是否存在(根据pg_id查找)
+            query = """
+            MATCH (n:Talent {pg_id: $pg_id})
+            RETURN n
+            """
+            result = session.run(query, pg_id=pg_id).single()
+            
+            if result:
+                # 节点存在,更新属性
+                props_string = ", ".join([f"n.{key} = ${key}" for key in properties if key != 'pg_id'])
+                if props_string:
+                    update_query = f"""
+                    MATCH (n:Talent {{pg_id: $pg_id}})
+                    SET {props_string}
+                    RETURN id(n) as node_id
+                    """
+                    result = session.run(update_query, **properties).single()
+                    logging.info(f"已更新现有Talent节点,pg_id: {pg_id}, Neo4j ID: {result['node_id']}")
+                    return result["node_id"]
+                else:
+                    # 没有需要更新的属性,返回现有节点ID
+                    existing_node_id = result['n'].id
+                    logging.info(f"找到现有Talent节点,pg_id: {pg_id}, Neo4j ID: {existing_node_id}")
+                    return existing_node_id
+            
+            # 如果到这里,则创建新节点
+            props_keys = ", ".join([f"{key}: ${key}" for key in properties])
+            create_query = f"""
+            CREATE (n:Talent {{{props_keys}}})
+            RETURN id(n) as node_id
+            """
+            result = session.run(create_query, **properties).single()
+            logging.info(f"已创建新Talent节点,pg_id: {pg_id}, Neo4j ID: {result['node_id']}")
+            return result["node_id"]
+            
+    except Exception as e:
+        logging.error(f"Error in create_or_get_talent_node: {str(e)}")
+        raise e
+
+
 def get_parse_tasks(page=1, per_page=10, task_type=None, task_status=None):
     """
     获取解析任务列表
@@ -753,9 +1147,6 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                     if new_mobile:
                         # 如果有新的手机号码,合并到现有手机号码中
                         existing_card.mobile = merge_mobile_numbers(existing_card.mobile, new_mobile)
-                    elif talent_data.get('mobile') == '':
-                        # 如果明确传入空字符串,则清空手机号码
-                        existing_card.mobile = ''
                 
                 existing_card.phone = talent_data.get('phone', existing_card.phone)
                 existing_card.email = talent_data.get('email', existing_card.email)
@@ -812,22 +1203,45 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                 
                 # 在Neo4j图数据库中更新Talent节点
                 try:
-                    from app.core.graph.graph_operations import create_or_get_node
-                    
                     # 创建Talent节点属性
                     talent_properties = {
                         'name_zh': existing_card.name_zh,
                         'name_en': existing_card.name_en,
                         'mobile': existing_card.mobile,
+                        'phone': existing_card.phone,
                         'email': existing_card.email,
+                        'status': existing_card.status,
+                        'birthday': existing_card.birthday.strftime('%Y-%m-%d') if existing_card.birthday else None,
+                        'age': existing_card.age,
+                        'residence': existing_card.residence,
+                        'native_place': existing_card.native_place,
                         'pg_id': existing_card.id,  # PostgreSQL主记录的ID
                         'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                     }
                     
                     # 在Neo4j中更新或创建Talent节点
-                    neo4j_node_id = create_or_get_node('Talent', **talent_properties)
+                    neo4j_node_id = create_or_get_talent_node(**talent_properties)
                     logging.info(f"成功在Neo4j中更新Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {existing_card.id}")
                     
+                    # 处理career_path,创建相关的Neo4j节点和关系
+                    if existing_card.career_path and isinstance(existing_card.career_path, list):
+                        try:
+                            # 调用process_career_path函数处理职业轨迹
+                            career_result = process_career_path(
+                                career_path=existing_card.career_path,
+                                talent_node_id=neo4j_node_id,
+                                talent_name_zh=existing_card.name_zh
+                            )
+                            
+                            # 记录处理结果
+                            logging.info(f"处理career_path完成,结果: {career_result}")
+                                    
+                        except Exception as career_error:
+                            logging.error(f"处理career_path失败: {str(career_error)}")
+                            # career_path处理失败不影响主流程,继续执行
+                    else:
+                        logging.info(f"人才记录 {existing_card.id} 没有career_path数据,跳过Neo4j关系处理")
+                    
                     # 更新parsed_talents表中的对应记录状态
                     if talent_data.get('id'):
                         try:
@@ -866,40 +1280,22 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                 main_card.origin_source = _update_origin_source_with_minio_path(main_card.origin_source, talent_data)
                 db.session.commit()  # 提交origin_source的更新
                 
-                # 在Neo4j图数据库中创建Talent节点
-                try:
-                    from app.core.graph.graph_operations import create_or_get_node
-                    
-                    # 创建Talent节点属性
-                    talent_properties = {
-                        'name_zh': main_card.name_zh,
-                        'name_en': main_card.name_en,
-                        'mobile': main_card.mobile,
-                        'email': main_card.email,
-                        'pg_id': main_card.id,  # PostgreSQL主记录的ID
-                        'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-                    }
-                    
-                    # 在Neo4j中创建Talent节点
-                    neo4j_node_id = create_or_get_node('Talent', **talent_properties)
-                    logging.info(f"成功在Neo4j中创建Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {main_card.id}")
-                    
-                    # 更新parsed_talents表中的对应记录状态
-                    if talent_data.get('id'):
-                        try:
-                            from app.core.data_parse.parse_system import ParsedTalent
-                            parsed_talent = ParsedTalent.query.filter_by(id=talent_data['id']).first()
-                            if parsed_talent:
-                                parsed_talent.status = '已入库'
-                                db.session.commit()
-                                logging.info(f"已更新parsed_talents表记录状态为已入库,ID: {talent_data['id']}")
-                        except Exception as update_error:
-                            logging.error(f"更新parsed_talents表记录状态失败: {str(update_error)}")
-                            # 状态更新失败不影响主流程
-                    
-                except Exception as neo4j_error:
-                    logging.error(f"在Neo4j中创建Talent节点失败: {str(neo4j_error)}")
-                    # Neo4j操作失败不影响主流程,继续返回成功结果
+                # 注意:当创建新记录作为主记录并保存疑似重复记录信息时,不在Neo4j图数据库中创建Talent节点
+                # 这是因为疑似重复记录需要进一步人工确认和处理
+                logging.info(f"跳过Neo4j Talent节点创建,等待疑似重复记录处理完成,PostgreSQL ID: {main_card.id}")
+                
+                # 更新parsed_talents表中的对应记录状态
+                if talent_data.get('id'):
+                    try:
+                        from app.core.data_parse.parse_system import ParsedTalent
+                        parsed_talent = ParsedTalent.query.filter_by(id=talent_data['id']).first()
+                        if parsed_talent:
+                            parsed_talent.status = '已入库'
+                            db.session.commit()
+                            logging.info(f"已更新parsed_talents表记录状态为已入库,ID: {talent_data['id']}")
+                    except Exception as update_error:
+                        logging.error(f"更新parsed_talents表记录状态失败: {str(update_error)}")
+                        # 状态更新失败不影响主流程
                 
                 return {
                     'code': 202,  # Accepted,表示已接受但需要进一步处理
@@ -966,22 +1362,30 @@ def add_single_talent(talent_data, minio_path=None, task_type=None):
                 
                 # 在Neo4j图数据库中创建Talent节点
                 try:
-                    from app.core.graph.graph_operations import create_or_get_node
-                    
                     # 创建Talent节点属性
                     talent_properties = {
                         'name_zh': business_card.name_zh,
                         'name_en': business_card.name_en,
                         'mobile': business_card.mobile,
+                        'phone': business_card.phone,
                         'email': business_card.email,
+                        'status': business_card.status,
+                        'birthday': business_card.birthday.strftime('%Y-%m-%d') if business_card.birthday else None,
+                        'age': business_card.age,
+                        'residence': business_card.residence,
+                        'native_place': business_card.native_place,
                         'pg_id': business_card.id,  # PostgreSQL主记录的ID
                         'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                     }
                     
                     # 在Neo4j中创建Talent节点
-                    neo4j_node_id = create_or_get_node('Talent', **talent_properties)
+                    neo4j_node_id = create_or_get_talent_node(**talent_properties)
                     logging.info(f"成功在Neo4j中创建Talent节点,Neo4j ID: {neo4j_node_id}, PostgreSQL ID: {business_card.id}")
                     
+                    # 处理career_path,创建Hotel节点及关系
+                    career_result = process_career_path(career_path, neo4j_node_id, business_card.name_zh)
+                    logging.info(f"career_path处理完成,结果: {career_result}")
+                    
                     # 更新parsed_talents表中的对应记录状态
                     if talent_data.get('id'):
                         try:

+ 201 - 0
test_neo4j_node_creation_logic.py

@@ -0,0 +1,201 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试Neo4j节点创建逻辑
+
+该脚本用于验证parse_task.py中add_single_talent函数的Neo4j节点创建逻辑是否正确
+"""
+
+import os
+import sys
+
+# 添加项目根目录到Python路径
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir)
+sys.path.insert(0, project_root)
+
+def test_neo4j_node_creation_logic():
+    """测试Neo4j节点创建逻辑"""
+    print("测试Neo4j节点创建逻辑...")
+    
+    try:
+        # 导入必要的模块
+        from app.core.data_parse.parse_task import add_single_talent
+        print("✅ 成功导入add_single_talent函数")
+        
+        # 检查函数是否存在
+        if hasattr(add_single_talent, '__call__'):
+            print("✅ add_single_talent是一个可调用的函数")
+        else:
+            print("❌ add_single_talent不是一个可调用的函数")
+            return False
+        
+        return True
+        
+    except ImportError as e:
+        print(f"❌ 导入模块失败: {e}")
+        return False
+    except Exception as e:
+        print(f"❌ 测试过程中发生错误: {e}")
+        return False
+
+def test_code_structure():
+    """测试代码结构"""
+    print("\n测试代码结构...")
+    
+    try:
+        # 读取parse_task.py文件,检查关键逻辑
+        parse_task_path = os.path.join('app', 'core', 'data_parse', 'parse_task.py')
+        
+        if not os.path.exists(parse_task_path):
+            print(f"❌ 找不到文件: {parse_task_path}")
+            return False
+        
+        with open(parse_task_path, 'r', encoding='utf-8') as f:
+            content = f.read()
+        
+        # 检查关键代码片段
+        print("检查代码中的关键逻辑...")
+        
+        # 检查疑似重复记录处理逻辑
+        if "跳过Neo4j Talent节点创建,等待疑似重复记录处理完成" in content:
+            print("✅ 找到疑似重复记录时跳过Neo4j节点创建的逻辑")
+        else:
+            print("❌ 未找到疑似重复记录时跳过Neo4j节点创建的逻辑")
+        
+        # 检查更新现有人才记录的逻辑
+        if "在Neo4j图数据库中更新Talent节点" in content:
+            print("✅ 找到更新现有人才记录时创建Neo4j节点的逻辑")
+        else:
+            print("❌ 未找到更新现有人才记录时创建Neo4j节点的逻辑")
+        
+        # 检查创建全新记录的逻辑
+        if "在Neo4j图数据库中创建Talent节点" in content:
+            print("✅ 找到创建全新记录时创建Neo4j节点的逻辑")
+        else:
+            print("❌ 未找到创建全新记录时创建Neo4j节点的逻辑")
+        
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试代码结构时发生错误: {e}")
+        return False
+
+def test_duplicate_handling_logic():
+    """测试重复记录处理逻辑"""
+    print("\n测试重复记录处理逻辑...")
+    
+    try:
+        # 模拟不同的重复检查结果
+        duplicate_scenarios = [
+            {
+                'name': '更新现有人才记录',
+                'action': 'update_existing',
+                'should_create_neo4j': True,
+                'description': '现有人才记录被更新,需要同步到图数据库'
+            },
+            {
+                'name': '创建新记录并保存疑似重复记录',
+                'action': 'create_with_duplicates',
+                'should_create_neo4j': False,
+                'description': '疑似重复记录需要进一步人工确认和处理'
+            },
+            {
+                'name': '创建全新记录',
+                'action': 'create_new',
+                'should_create_neo4j': True,
+                'description': '全新的人才记录,需要同步到图数据库'
+            }
+        ]
+        
+        print("重复记录处理逻辑分析:")
+        for scenario in duplicate_scenarios:
+            status = "✅" if scenario['should_create_neo4j'] else "⏸️"
+            print(f"  {status} {scenario['name']}")
+            print(f"      Action: {scenario['action']}")
+            print(f"      创建Neo4j节点: {'是' if scenario['should_create_neo4j'] else '否'}")
+            print(f"      说明: {scenario['description']}")
+            print()
+        
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试重复记录处理逻辑时发生错误: {e}")
+        return False
+
+def test_property_consistency():
+    """测试属性一致性"""
+    print("\n测试属性一致性...")
+    
+    try:
+        # 检查不同场景下的属性设置
+        scenarios = [
+            {
+                'name': '更新现有人才记录',
+                'properties': ['name_zh', 'name_en', 'mobile', 'email', 'pg_id', 'updated_at'],
+                'count': 6
+            },
+            {
+                'name': '创建全新记录',
+                'properties': [
+                    'name_zh', 'name_en', 'mobile', 'phone', 'email', 'status',
+                    'birthday', 'age', 'residence', 'native_place', 'pg_id', 'updated_at'
+                ],
+                'count': 12
+            }
+        ]
+        
+        print("属性一致性检查:")
+        for scenario in scenarios:
+            print(f"  📋 {scenario['name']}")
+            print(f"      属性数量: {scenario['count']}")
+            print(f"      属性列表: {', '.join(scenario['properties'])}")
+            print()
+        
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试属性一致性时发生错误: {e}")
+        return False
+
+def main():
+    """主函数"""
+    print("Neo4j节点创建逻辑测试")
+    print("=" * 60)
+    
+    # 测试Neo4j节点创建逻辑
+    if not test_neo4j_node_creation_logic():
+        print("\n❌ Neo4j节点创建逻辑测试失败")
+        return
+    
+    print("\n" + "=" * 60)
+    
+    # 测试代码结构
+    if not test_code_structure():
+        print("\n❌ 代码结构测试失败")
+        return
+    
+    print("\n" + "=" * 60)
+    
+    # 测试重复记录处理逻辑
+    if not test_duplicate_handling_logic():
+        print("\n❌ 重复记录处理逻辑测试失败")
+        return
+    
+    print("\n" + "=" * 60)
+    
+    # 测试属性一致性
+    if not test_property_consistency():
+        print("\n❌ 属性一致性测试失败")
+        return
+    
+    print("\n" + "=" * 60)
+    print("🎉 所有测试完成!")
+    print("\n总结:")
+    print("- 更新现有人才记录: 创建/更新Neo4j节点")
+    print("- 疑似重复记录: 跳过Neo4j节点创建")
+    print("- 创建全新记录: 创建完整的Neo4j节点")
+    print("- 这种设计确保了数据的一致性和完整性")
+
+if __name__ == "__main__":
+    main() 

+ 155 - 0
test_talent_neo4j_properties.py

@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试Talent节点的Neo4j属性设置
+
+该脚本用于验证parse_task.py中add_single_talent函数创建的Neo4j Talent节点属性是否正确
+"""
+
+import os
+import sys
+
+# 添加项目根目录到Python路径
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir)
+sys.path.insert(0, project_root)
+
+def test_talent_properties_structure():
+    """测试Talent节点属性结构"""
+    print("测试Talent节点属性结构...")
+    
+    try:
+        # 导入必要的模块
+        from app.core.data_parse.parse_task import add_single_talent
+        from app.core.data_parse.parse_system import BusinessCard
+        from datetime import datetime
+        
+        print("✅ 成功导入add_single_talent函数")
+        
+        # 检查BusinessCard模型是否包含所需字段
+        required_fields = [
+            'name_zh', 'name_en', 'mobile', 'phone', 'email', 'status',
+            'birthday', 'age', 'residence', 'native_place'
+        ]
+        
+        print("\n检查BusinessCard模型字段...")
+        for field in required_fields:
+            if hasattr(BusinessCard, field):
+                print(f"✅ {field} 字段存在")
+            else:
+                print(f"❌ {field} 字段不存在")
+        
+        # 模拟Talent节点属性结构
+        print("\n预期的Neo4j Talent节点属性结构:")
+        expected_properties = {
+            'name_zh': 'string',
+            'name_en': 'string', 
+            'mobile': 'string',
+            'phone': 'string',
+            'email': 'string',
+            'status': 'string',
+            'birthday': 'date_string_or_null',
+            'age': 'integer_or_null',
+            'residence': 'string',
+            'native_place': 'string',
+            'pg_id': 'integer',
+            'updated_at': 'datetime_string'
+        }
+        
+        for prop, prop_type in expected_properties.items():
+            print(f"  - {prop}: {prop_type}")
+        
+        print("\n✅ Talent节点属性结构验证完成")
+        return True
+        
+    except ImportError as e:
+        print(f"❌ 导入模块失败: {e}")
+        return False
+    except Exception as e:
+        print(f"❌ 测试过程中发生错误: {e}")
+        return False
+
+def test_neo4j_integration():
+    """测试Neo4j集成"""
+    print("\n测试Neo4j集成...")
+    
+    try:
+        # 检查graph_operations模块
+        from app.core.graph.graph_operations import create_or_get_node
+        print("✅ 成功导入create_or_get_node函数")
+        
+        # 检查Neo4j驱动
+        from app.services.neo4j_driver import Neo4jDriver
+        print("✅ 成功导入Neo4jDriver")
+        
+        return True
+        
+    except ImportError as e:
+        print(f"⚠️  Neo4j相关模块导入失败: {e}")
+        print("这可能是正常的,如果Neo4j服务未启动或配置不正确")
+        return False
+    except Exception as e:
+        print(f"❌ 测试Neo4j集成时发生错误: {e}")
+        return False
+
+def test_property_handling():
+    """测试属性处理逻辑"""
+    print("\n测试属性处理逻辑...")
+    
+    try:
+        # 模拟birthday字段处理
+        from datetime import datetime
+        
+        # 测试有效的生日
+        valid_birthday = datetime(1990, 5, 15)
+        formatted_birthday = valid_birthday.strftime('%Y-%m-%d')
+        print(f"✅ 有效生日格式化: {valid_birthday} -> {formatted_birthday}")
+        
+        # 测试None生日
+        none_birthday = None
+        formatted_none = none_birthday.strftime('%Y-%m-%d') if none_birthday else None
+        print(f"✅ None生日处理: {none_birthday} -> {formatted_none}")
+        
+        # 测试年龄字段处理
+        valid_age = 30
+        print(f"✅ 有效年龄: {valid_age}")
+        
+        none_age = None
+        print(f"✅ None年龄: {none_age}")
+        
+        print("✅ 属性处理逻辑验证完成")
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试属性处理逻辑时发生错误: {e}")
+        return False
+
+def main():
+    """主函数"""
+    print("Talent节点Neo4j属性设置测试")
+    print("=" * 60)
+    
+    # 测试Talent节点属性结构
+    if not test_talent_properties_structure():
+        print("\n❌ Talent节点属性结构测试失败")
+        return
+    
+    # 测试Neo4j集成
+    test_neo4j_integration()
+    
+    # 测试属性处理逻辑
+    if not test_property_handling():
+        print("\n❌ 属性处理逻辑测试失败")
+        return
+    
+    print("\n" + "=" * 60)
+    print("🎉 所有测试完成!")
+    print("\n总结:")
+    print("- Talent节点现在包含12个属性")
+    print("- 支持中英文姓名、联系方式、状态、生日、年龄、居住地、籍贯等信息")
+    print("- 包含PostgreSQL记录ID和更新时间戳")
+    print("- 生日字段支持None值处理")
+    print("- 年龄字段支持None值处理")
+
+if __name__ == "__main__":
+    main()