Ver Fonte

新增酒店职位数据和酒店集团品牌数据同步到Neo4j图数据库程序

maxiaolong há 1 semana atrás
pai
commit
320cbf9d0d

+ 334 - 0
app/core/data_parse/README_parse_neo4j_process.md

@@ -0,0 +1,334 @@
+# 酒店职位数据和酒店集团品牌数据Neo4j同步程序
+
+## 概述
+
+`parse_neo4j_process.py` 是一个Python程序,用于将PostgreSQL数据库中的酒店职位数据和酒店集团品牌数据同步到Neo4j图数据库中。
+
+## 功能特性
+
+- 自动读取PostgreSQL数据库中的酒店职位数据和酒店集团品牌数据
+- 将部门信息、职位信息、级别信息、集团信息、品牌信息、品牌级别信息同步到Neo4j图数据库的DataLabel节点
+- 创建节点之间的层次关系
+- 避免重复添加相同名称的节点
+- 完整的日志记录和错误处理
+- 支持开发和生产环境配置
+
+## 数据同步规则
+
+### 源数据
+
+#### 酒店职位数据 (hotel_positions)
+- 数据源:PostgreSQL数据库表 `dataops/public/hotel_positions`
+- 查询条件:`status = 'active'` 且 `department_zh`、`position_zh`、`level_zh` 都不为空
+- 获取字段:
+  - `department_zh`(部门中文名称)、`department_en`(部门英文名称)
+  - `position_zh`(职位中文名称)、`position_en`(职位英文名称)
+  - `level_zh`(级别中文名称)、`level_en`(级别英文名称)
+
+#### 酒店集团品牌数据 (hotel_group_brands)
+- 数据源:PostgreSQL数据库表 `dataops/public/hotel_group_brands`
+- 查询条件:`status = 'active'` 且 `group_name_zh`、`brand_name_zh`、`positioning_level_zh` 都不为空
+- 获取字段:
+  - `group_name_zh`(集团中文名称)、`group_name_en`(集团英文名称)
+  - `brand_name_zh`(品牌中文名称)、`brand_name_en`(品牌英文名称)
+  - `positioning_level_zh`(定位级别中文名称)、`positioning_level_en`(定位级别英文名称)
+
+### 目标节点
+- 节点标签:`DataLabel`
+- 节点属性:
+  - `name`: 对应字段值(department_zh/position_zh/level_zh/group_name_zh/brand_name_zh/positioning_level_zh)
+  - `en_name`: 对应英文名称(department_en/position_en/level_en/group_name_en/brand_name_en/positioning_level_en)
+  - `describe`: 空字符串
+  - `time`: 当前系统时间
+  - `category`: "人才地图"
+  - `status`: "active"
+  - `node_type`: 节点类型(department/position/position_level/group/brand/brand_level)
+
+### 节点关系
+
+#### 酒店职位关系
+- **BELONGS_TO**: 职位节点 → 部门节点(表示职位属于某个部门)
+- **HAS_LEVEL**: 职位节点 → 级别节点(表示职位具有某个级别)
+
+#### 酒店集团品牌关系
+- **BELONGS_TO**: 品牌节点 → 集团节点(表示品牌属于某个集团)
+- **HAS_LEVEL**: 品牌节点 → 品牌级别节点(表示品牌具有某个定位级别)
+
+### 去重逻辑
+- 在创建新节点前,检查Neo4j中是否已存在相同 `name` 的DataLabel节点
+- 如果存在,则跳过该记录,不重复创建
+- 关系使用MERGE操作,避免重复创建
+
+## 使用方法
+
+### 1. 直接运行
+
+```bash
+# 在项目根目录下运行
+python app/core/data_parse/parse_neo4j_process.py
+```
+
+### 2. 作为模块导入
+
+```python
+from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+
+# 创建处理器实例
+processor = HotelPositionNeo4jProcessor()
+
+# 运行同步程序
+success = processor.run()
+```
+
+## 环境要求
+
+### 依赖包
+- `neo4j>=5.0.0` - Neo4j Python驱动
+- `psycopg2-binary>=2.9.10` - PostgreSQL适配器
+- `SQLAlchemy>=2.0.0` - 数据库ORM
+- `Flask>=3.0.2` - Web框架(用于配置管理)
+
+### 数据库连接
+- **PostgreSQL**: 需要访问 `dataops` 数据库的 `hotel_positions` 和 `hotel_group_brands` 表
+- **Neo4j**: 需要访问Neo4j图数据库
+
+### 配置要求
+程序会自动读取 `app/config/config.py` 中的数据库连接配置:
+
+```python
+# PostgreSQL配置
+SQLALCHEMY_DATABASE_URI = 'postgresql://username:password@host:port/database'
+
+# Neo4j配置
+NEO4J_URI = "bolt://host:port"
+NEO4J_USER = "username"
+NEO4J_PASSWORD = "password"
+NEO4J_ENCRYPTED = False
+```
+
+## 输出说明
+
+### 控制台输出
+程序会在控制台显示执行进度和结果:
+```
+2024-01-01 10:00:00 - INFO - 开始执行酒店职位数据和酒店集团品牌数据Neo4j同步程序
+2024-01-01 10:00:01 - INFO - PostgreSQL数据库连接成功
+2024-01-01 10:00:02 - INFO - Neo4j数据库连接成功
+2024-01-01 10:00:03 - INFO - 开始处理酒店职位数据...
+2024-01-01 10:00:04 - INFO - 成功获取 15 条酒店职位数据
+2024-01-01 10:00:05 - INFO - 成功创建部门节点: 前厅部
+2024-01-01 10:00:06 - INFO - 成功创建职位节点: 前台接待
+2024-01-01 10:00:07 - INFO - 成功创建级别节点: 初级
+2024-01-01 10:00:08 - INFO - 成功创建关系: 前台接待 BELONGS_TO 前厅部
+2024-01-01 10:00:09 - INFO - 成功创建关系: 前台接待 HAS_LEVEL 初级
+...
+2024-01-01 10:00:15 - INFO - 酒店职位数据同步完成: 酒店职位数据同步完成
+2024-01-01 10:00:15 - INFO - 总计记录: 15
+2024-01-01 10:00:15 - INFO - 部门节点 - 新建: 8, 跳过: 2
+2024-01-01 10:00:15 - INFO - 职位节点 - 新建: 12, 跳过: 3
+2024-01-01 10:00:15 - INFO - 级别节点 - 新建: 5, 跳过: 1
+2024-01-01 10:00:15 - INFO - 关系创建: 30
+2024-01-01 10:00:16 - INFO - 开始处理酒店集团品牌数据...
+2024-01-01 10:00:17 - INFO - 成功获取 20 条酒店集团品牌数据
+2024-01-01 10:00:18 - INFO - 成功创建集团节点: 万豪国际
+2024-01-01 10:00:19 - INFO - 成功创建品牌节点: 丽思卡尔顿
+2024-01-01 10:00:20 - INFO - 成功创建品牌级别节点: 奢华
+2024-01-01 10:00:21 - INFO - 成功创建关系: 丽思卡尔顿 BELONGS_TO 万豪国际
+2024-01-01 10:00:22 - INFO - 成功创建关系: 丽思卡尔顿 HAS_LEVEL 奢华
+...
+2024-01-01 10:00:30 - INFO - 酒店集团品牌数据同步完成: 酒店集团品牌数据同步完成
+2024-01-01 10:00:30 - INFO - 总计记录: 20
+2024-01-01 10:00:30 - INFO - 集团节点 - 新建: 12, 跳过: 3
+2024-01-01 10:00:30 - INFO - 品牌节点 - 新建: 18, 跳过: 2
+2024-01-01 10:00:30 - INFO - 品牌级别节点 - 新建: 8, 跳过: 1
+2024-01-01 10:00:30 - INFO - 关系创建: 40
+2024-01-01 10:00:30 - INFO - 所有数据同步任务完成
+```
+
+### 日志文件
+程序会在 `logs/parse_neo4j_process.log` 文件中记录详细的执行日志。
+
+### 返回结果
+程序返回执行结果统计,包括两个数据表的处理结果:
+
+#### 酒店职位数据结果
+```python
+{
+    'success': True,
+    'message': '酒店职位数据同步完成',
+    'total': 15,                    # 总记录数
+    'departments_created': 8,       # 新建部门节点数
+    'departments_skipped': 2,       # 跳过部门节点数
+    'positions_created': 12,        # 新建职位节点数
+    'positions_skipped': 3,         # 跳过职位节点数
+    'levels_created': 5,            # 新建级别节点数
+    'levels_skipped': 1,            # 跳过级别节点数
+    'relationships_created': 30     # 创建关系数
+}
+```
+
+#### 酒店集团品牌数据结果
+```python
+{
+    'success': True,
+    'message': '酒店集团品牌数据同步完成',
+    'total': 20,                    # 总记录数
+    'groups_created': 12,           # 新建集团节点数
+    'groups_skipped': 3,            # 跳过集团节点数
+    'brands_created': 18,           # 新建品牌节点数
+    'brands_skipped': 2,            # 跳过品牌节点数
+    'brand_levels_created': 8,      # 新建品牌级别节点数
+    'brand_levels_skipped': 1,      # 跳过品牌级别节点数
+    'relationships_created': 40     # 创建关系数
+}
+```
+
+## 图数据库结构
+
+### 节点类型
+程序会创建六种类型的DataLabel节点:
+
+#### 酒店职位相关节点
+1. **部门节点** (`node_type: 'department'`)
+2. **职位节点** (`node_type: 'position'`)
+3. **职位级别节点** (`node_type: 'position_level'`)
+
+#### 酒店集团品牌相关节点
+4. **集团节点** (`node_type: 'group'`)
+5. **品牌节点** (`node_type: 'brand'`)
+6. **品牌级别节点** (`node_type: 'brand_level'`)
+
+### 关系类型
+
+#### 酒店职位关系
+1. **BELONGS_TO**: 职位 → 部门
+   - 表示职位属于某个部门
+   - 例如:前台接待 BELONGS_TO 前厅部
+
+2. **HAS_LEVEL**: 职位 → 级别
+   - 表示职位具有某个级别
+   - 例如:前台接待 HAS_LEVEL 初级
+
+#### 酒店集团品牌关系
+3. **BELONGS_TO**: 品牌 → 集团
+   - 表示品牌属于某个集团
+   - 例如:丽思卡尔顿 BELONGS_TO 万豪国际
+
+4. **HAS_LEVEL**: 品牌 → 品牌级别
+   - 表示品牌具有某个定位级别
+   - 例如:丽思卡尔顿 HAS_LEVEL 奢华
+
+### 图结构示例
+
+#### 酒店职位结构
+```
+(前厅部:DataLabel {name: '前厅部', node_type: 'department'})
+    ↑ BELONGS_TO
+(前台接待:DataLabel {name: '前台接待', node_type: 'position'})
+    ↓ HAS_LEVEL
+(初级:DataLabel {name: '初级', node_type: 'position_level'})
+```
+
+#### 酒店集团品牌结构
+```
+(万豪国际:DataLabel {name: '万豪国际', node_type: 'group'})
+    ↑ BELONGS_TO
+(丽思卡尔顿:DataLabel {name: '丽思卡尔顿', node_type: 'brand'})
+    ↓ HAS_LEVEL
+(奢华:DataLabel {name: '奢华', node_type: 'brand_level'})
+```
+
+## 错误处理
+
+### 常见错误及解决方案
+
+1. **数据库连接失败**
+   - 检查数据库服务是否启动
+   - 验证连接字符串和认证信息
+   - 确认网络连接和防火墙设置
+
+2. **表不存在或权限不足**
+   - 确认 `hotel_positions` 和 `hotel_group_brands` 表存在
+   - 检查数据库用户权限
+   - 验证表结构是否符合预期
+
+3. **Neo4j节点创建失败**
+   - 检查Neo4j服务状态
+   - 验证用户权限
+   - 确认Cypher查询语法正确
+
+4. **关系创建失败**
+   - 确认相关节点已存在
+   - 检查Neo4j权限设置
+   - 验证关系类型是否支持
+
+### 错误日志
+所有错误都会记录在日志文件中,包括:
+- 错误类型和描述
+- 错误发生的具体位置
+- 完整的错误堆栈信息
+
+## 性能优化
+
+### 批量处理
+- 程序分别批量处理两个数据表,避免混合查询
+- 使用事务确保数据一致性
+- 关系创建使用MERGE操作,避免重复
+
+### 资源管理
+- 自动管理数据库连接池
+- 及时释放Neo4j会话资源
+- 程序结束时自动清理所有资源
+
+## 扩展功能
+
+### 自定义配置
+可以通过修改配置类来调整程序行为:
+- 数据库连接参数
+- 日志级别和格式
+- 批处理大小
+
+### 数据过滤
+可以修改SQL查询来添加更多过滤条件:
+- 按时间范围过滤
+- 按状态过滤
+- 按类型过滤
+
+### 节点属性扩展
+可以轻松添加更多节点属性:
+- 描述信息
+- 创建者信息
+- 标签分类
+- 自定义元数据
+
+### 关系扩展
+可以添加更多关系类型:
+- 职位之间的上下级关系
+- 部门之间的从属关系
+- 级别之间的层次关系
+- 集团之间的合作关系
+
+## 注意事项
+
+1. **数据一致性**: 程序使用事务确保数据一致性,如果中途失败会自动回滚
+2. **重复执行**: 程序可以安全地重复执行,不会产生重复数据
+3. **资源占用**: 程序会占用数据库连接,建议在低峰期执行
+4. **备份建议**: 执行前建议备份Neo4j数据库
+5. **关系创建**: 关系创建依赖于节点存在,确保节点创建成功后再创建关系
+6. **数据完整性**: 确保两个数据表的数据完整性和一致性
+
+## 技术支持
+
+如果遇到问题,请检查:
+1. 日志文件中的错误信息
+2. 数据库连接配置
+3. 网络连接状态
+4. 数据库服务状态
+5. Neo4j图数据库权限设置
+6. 数据表结构和数据质量
+
+## 版本历史
+
+- v1.0.0: 初始版本,支持基本的酒店职位数据同步功能
+- v1.1.0: 增强版本,支持职位名称、级别名称同步和节点关系创建
+- v1.2.0: 扩展版本,新增酒店集团品牌数据同步功能,支持完整的酒店数据生态 

+ 320 - 0
app/core/data_parse/USAGE_EXAMPLE.md

@@ -0,0 +1,320 @@
+# 酒店职位数据和酒店集团品牌数据Neo4j同步程序使用示例
+
+## 快速开始
+
+### 1. 环境准备
+
+确保您的环境满足以下要求:
+- Python 3.7+
+- PostgreSQL数据库(包含hotel_positions和hotel_group_brands表)
+- Neo4j图数据库
+- 必要的Python依赖包
+
+### 2. 配置检查
+
+确认 `app/config/config.py` 中的数据库配置正确:
+
+```python
+# PostgreSQL配置
+SQLALCHEMY_DATABASE_URI = 'postgresql://username:password@host:port/dataops'
+
+# Neo4j配置
+NEO4J_URI = "bolt://host:port"
+NEO4J_USER = "username"
+NEO4J_PASSWORD = "password"
+NEO4J_ENCRYPTED = False
+```
+
+### 3. 运行程序
+
+#### 方法1:直接运行Python脚本
+```bash
+# 在项目根目录下运行
+python app/core/data_parse/parse_neo4j_process.py
+```
+
+#### 方法2:使用批处理文件(Windows)
+```bash
+# 双击运行
+run_parse_neo4j.bat
+
+# 或在命令行中运行
+.\run_parse_neo4j.bat
+```
+
+#### 方法3:使用Shell脚本(Linux/Mac)
+```bash
+# 添加执行权限(首次运行)
+chmod +x run_parse_neo4j.sh
+
+# 运行脚本
+./run_parse_neo4j.sh
+```
+
+## 使用示例
+
+### 基本使用
+
+```python
+from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+
+# 创建处理器实例
+processor = HotelPositionNeo4jProcessor()
+
+# 运行同步程序
+success = processor.run()
+
+if success:
+    print("所有数据同步成功!")
+else:
+    print("部分或全部数据同步失败!")
+```
+
+### 自定义处理
+
+```python
+from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+
+# 创建处理器实例
+processor = HotelPositionNeo4jProcessor()
+
+# 连接数据库
+if processor.connect_postgresql() and processor.connect_neo4j():
+    
+    # 处理酒店职位数据
+    print("开始处理酒店职位数据...")
+    positions_result = processor.process_hotel_positions()
+    
+    if positions_result['success']:
+        print(f"酒店职位数据同步完成!")
+        print(f"部门节点: 新建 {positions_result['departments_created']}, 跳过 {positions_result['departments_skipped']}")
+        print(f"职位节点: 新建 {positions_result['positions_created']}, 跳过 {positions_result['positions_skipped']}")
+        print(f"级别节点: 新建 {positions_result['levels_created']}, 跳过 {positions_result['levels_skipped']}")
+        print(f"关系创建: {positions_result['relationships_created']}")
+    else:
+        print(f"酒店职位数据同步失败: {positions_result['message']}")
+    
+    # 处理酒店集团品牌数据
+    print("\n开始处理酒店集团品牌数据...")
+    brands_result = processor.process_hotel_group_brands()
+    
+    if brands_result['success']:
+        print(f"酒店集团品牌数据同步完成!")
+        print(f"集团节点: 新建 {brands_result['groups_created']}, 跳过 {brands_result['groups_skipped']}")
+        print(f"品牌节点: 新建 {brands_result['brands_created']}, 跳过 {brands_result['brands_skipped']}")
+        print(f"品牌级别节点: 新建 {brands_result['brand_levels_created']}, 跳过 {brands_result['brand_levels_skipped']}")
+        print(f"关系创建: {brands_result['relationships_created']}")
+    else:
+        print(f"酒店集团品牌数据同步失败: {brands_result['message']}")
+```
+
+### 分别获取数据
+
+```python
+from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+
+processor = HotelPositionNeo4jProcessor()
+
+if processor.connect_postgresql():
+    # 获取酒店职位数据
+    positions = processor.get_hotel_positions()
+    print(f"获取到 {len(positions)} 条酒店职位数据")
+    
+    # 获取酒店集团品牌数据
+    brands = processor.get_hotel_group_brands()
+    print(f"获取到 {len(brands)} 条酒店集团品牌数据")
+    
+    # 显示数据示例
+    if positions:
+        print("\n酒店职位数据示例:")
+        for i, pos in enumerate(positions[:3]):
+            print(f"  {i+1}. {pos['department_zh']} - {pos['position_zh']} - {pos['level_zh']}")
+    
+    if brands:
+        print("\n酒店集团品牌数据示例:")
+        for i, brand in enumerate(brands[:3]):
+            print(f"  {i+1}. {brand['group_name_zh']} - {brand['brand_name_zh']} - {brand['positioning_level_zh']}")
+```
+
+## 输出示例
+
+### 成功执行示例
+
+```
+2024-01-01 10:00:00 - INFO - 开始执行酒店职位数据和酒店集团品牌数据Neo4j同步程序
+2024-01-01 10:00:01 - INFO - PostgreSQL数据库连接成功
+2024-01-01 10:00:02 - INFO - Neo4j数据库连接成功
+2024-01-01 10:00:03 - INFO - 开始处理酒店职位数据...
+2024-01-01 10:00:04 - INFO - 成功获取 15 条酒店职位数据
+2024-01-01 10:00:05 - INFO - 成功创建部门节点: 前厅部
+2024-01-01 10:00:06 - INFO - 成功创建职位节点: 前台接待
+2024-01-01 10:00:07 - INFO - 成功创建级别节点: 初级
+2024-01-01 10:00:08 - INFO - 成功创建关系: 前台接待 BELONGS_TO 前厅部
+2024-01-01 10:00:09 - INFO - 成功创建关系: 前台接待 HAS_LEVEL 初级
+2024-01-01 10:00:10 - INFO - 部门节点已存在,跳过: 客房部
+2024-01-01 10:00:11 - INFO - 成功创建职位节点: 客房服务员
+2024-01-01 10:00:12 - INFO - 级别节点已存在,跳过: 初级
+2024-01-01 10:00:13 - INFO - 成功创建关系: 客房服务员 BELONGS_TO 客房部
+2024-01-01 10:00:14 - INFO - 成功创建关系: 客房服务员 HAS_LEVEL 初级
+...
+2024-01-01 10:00:20 - INFO - 酒店职位数据同步完成: 酒店职位数据同步完成
+2024-01-01 10:00:20 - INFO - 总计记录: 15
+2024-01-01 10:00:20 - INFO - 部门节点 - 新建: 8, 跳过: 2
+2024-01-01 10:00:20 - INFO - 职位节点 - 新建: 12, 跳过: 3
+2024-01-01 10:00:20 - INFO - 级别节点 - 新建: 5, 跳过: 1
+2024-01-01 10:00:20 - INFO - 关系创建: 30
+2024-01-01 10:00:21 - INFO - 开始处理酒店集团品牌数据...
+2024-01-01 10:00:22 - INFO - 成功获取 20 条酒店集团品牌数据
+2024-01-01 10:00:23 - INFO - 成功创建集团节点: 万豪国际
+2024-01-01 10:00:24 - INFO - 成功创建品牌节点: 丽思卡尔顿
+2024-01-01 10:00:25 - INFO - 成功创建品牌级别节点: 奢华
+2024-01-01 10:00:26 - INFO - 成功创建关系: 丽思卡尔顿 BELONGS_TO 万豪国际
+2024-01-01 10:00:27 - INFO - 成功创建关系: 丽思卡尔顿 HAS_LEVEL 奢华
+2024-01-01 10:00:28 - INFO - 集团节点已存在,跳过: 希尔顿
+2024-01-01 10:00:29 - INFO - 成功创建品牌节点: 华尔道夫
+2024-01-01 10:00:30 - INFO - 品牌级别节点已存在,跳过: 奢华
+2024-01-01 10:00:31 - INFO - 成功创建关系: 华尔道夫 BELONGS_TO 希尔顿
+2024-01-01 10:00:32 - INFO - 成功创建关系: 华尔道夫 HAS_LEVEL 奢华
+...
+2024-01-01 10:00:40 - INFO - 酒店集团品牌数据同步完成: 酒店集团品牌数据同步完成
+2024-01-01 10:00:40 - INFO - 总计记录: 20
+2024-01-01 10:00:40 - INFO - 集团节点 - 新建: 12, 跳过: 3
+2024-01-01 10:00:40 - INFO - 品牌节点 - 新建: 18, 跳过: 2
+2024-01-01 10:00:40 - INFO - 品牌级别节点 - 新建: 8, 跳过: 1
+2024-01-01 10:00:40 - INFO - 关系创建: 40
+2024-01-01 10:00:40 - INFO - 所有数据同步任务完成
+2024-01-01 10:00:40 - INFO - 程序执行完成,资源已清理
+```
+
+### 错误处理示例
+
+```
+2024-01-01 10:00:00 - INFO - 开始执行酒店职位数据和酒店集团品牌数据Neo4j同步程序
+2024-01-01 10:00:01 - ERROR - PostgreSQL数据库连接失败: (psycopg2.OperationalError) connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
+2024-01-01 10:00:01 - ERROR - 无法连接PostgreSQL数据库,程序退出
+2024-01-01 10:00:01 - INFO - 程序执行完成,资源已清理
+```
+
+## 数据验证
+
+### 在Neo4j中查看结果
+
+运行程序后,您可以在Neo4j浏览器中执行以下查询来验证结果:
+
+#### 查看所有节点
+```cypher
+MATCH (n:DataLabel) RETURN n LIMIT 20
+```
+
+#### 查看酒店职位相关节点
+```cypher
+// 查看部门节点
+MATCH (n:DataLabel {node_type: 'department'}) RETURN n
+
+// 查看职位节点
+MATCH (n:DataLabel {node_type: 'position'}) RETURN n
+
+// 查看职位级别节点
+MATCH (n:DataLabel {node_type: 'position_level'}) RETURN n
+```
+
+#### 查看酒店集团品牌相关节点
+```cypher
+// 查看集团节点
+MATCH (n:DataLabel {node_type: 'group'}) RETURN n
+
+// 查看品牌节点
+MATCH (n:DataLabel {node_type: 'brand'}) RETURN n
+
+// 查看品牌级别节点
+MATCH (n:DataLabel {node_type: 'brand_level'}) RETURN n
+```
+
+#### 查看关系
+```cypher
+MATCH (from:DataLabel)-[r]->(to:DataLabel) 
+RETURN from.name, type(r), to.name
+```
+
+#### 查看完整的酒店职位结构
+```cypher
+MATCH (dept:DataLabel {node_type: 'department'})
+MATCH (pos:DataLabel {node_type: 'position'})
+MATCH (level:DataLabel {node_type: 'position_level'})
+MATCH (pos)-[:BELONGS_TO]->(dept)
+MATCH (pos)-[:HAS_LEVEL]->(level)
+RETURN dept.name as 部门, pos.name as 职位, level.name as 级别
+ORDER BY dept.name, pos.name
+```
+
+#### 查看完整的酒店集团品牌结构
+```cypher
+MATCH (group:DataLabel {node_type: 'group'})
+MATCH (brand:DataLabel {node_type: 'brand'})
+MATCH (level:DataLabel {node_type: 'brand_level'})
+MATCH (brand)-[:BELONGS_TO]->(group)
+MATCH (brand)-[:HAS_LEVEL]->(level)
+RETURN group.name as 集团, brand.name as 品牌, level.name as 级别
+ORDER BY group.name, brand.name
+```
+
+#### 查看所有节点类型统计
+```cypher
+MATCH (n:DataLabel)
+RETURN n.node_type as 节点类型, count(n) as 数量
+ORDER BY 数量 DESC
+```
+
+#### 查看关系类型统计
+```cypher
+MATCH ()-[r]->()
+RETURN type(r) as 关系类型, count(r) as 数量
+ORDER BY 数量 DESC
+```
+
+## 常见问题
+
+### Q: 程序运行时提示"模块导入失败"
+A: 确保您在项目根目录下运行程序,或者将项目根目录添加到Python路径中。
+
+### Q: 数据库连接失败
+A: 检查数据库服务是否启动,连接字符串是否正确,网络连接是否正常。
+
+### Q: 表不存在或权限不足
+A: 确认 `hotel_positions` 和 `hotel_group_brands` 表存在,检查数据库用户权限。
+
+### Q: Neo4j节点创建失败
+A: 检查Neo4j服务状态,验证用户权限,确认Cypher查询语法正确。
+
+### Q: 关系创建失败
+A: 确保相关节点已存在,检查Neo4j权限设置。
+
+### Q: 如何重复运行程序
+A: 程序可以安全地重复运行,会自动跳过已存在的节点,只创建新的节点和关系。
+
+### Q: 如何处理部分数据同步失败
+A: 程序会分别处理两个数据表,即使其中一个失败,另一个仍会继续执行。检查日志了解具体失败原因。
+
+## 性能优化建议
+
+1. **批量处理**: 程序已优化为批量处理,避免多次数据库查询
+2. **事务管理**: 使用事务确保数据一致性
+3. **资源管理**: 自动管理数据库连接和会话资源
+4. **去重逻辑**: 避免重复创建节点和关系
+5. **并行处理**: 可以考虑将两个数据表的处理改为并行执行(需要进一步优化)
+
+## 监控和日志
+
+- 程序会在控制台显示实时进度
+- 详细日志保存在 `logs/parse_neo4j_process.log` 文件中
+- 可以通过日志文件排查问题和监控执行情况
+- 支持分别监控两个数据表的处理进度
+
+## 扩展建议
+
+1. **定时任务**: 可以设置为定时任务,定期同步数据
+2. **增量同步**: 可以扩展为只同步新增或修改的数据
+3. **数据验证**: 可以添加数据质量检查和验证功能
+4. **性能监控**: 可以添加执行时间统计和性能指标
+5. **并行处理**: 可以优化为并行处理两个数据表,提高执行效率
+6. **数据一致性检查**: 可以添加数据一致性验证功能 

+ 608 - 0
app/core/data_parse/parse_neo4j_process.py

@@ -0,0 +1,608 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+酒店职位数据和酒店集团品牌数据同步到Neo4j图数据库程序
+
+该程序通过读取config配置信息,访问PostgreSQL数据库表dataops/public/hotel_positions和hotel_group_brands,
+依次读取数据表中的每一条记录,将其中相关字段内容添加到neo4j图数据库中的DataLabel节点,并创建节点之间的关系。
+
+DataLabel节点的属性定义:
+- name: 对应为字段值(department_zh/position_zh/level_zh/group_name_zh/brand_name_zh/positioning_level_zh)
+- en_name: 对应为英文名称(department_en/position_en/level_en/group_name_en/brand_name_en/positioning_level_en)
+- describe: 空字符串
+- time: 当前系统时间
+- category: "人才地图"
+- status: "active"
+- node_type: "department"/"position"/"position_level"/"group"/"brand"/"brand_level"
+
+节点关系:
+- position_zh节点与department_zh节点:BELONGS_TO关系
+- position_zh节点与level_zh节点:HAS_LEVEL关系
+- brand_name_zh节点与group_name_zh节点:BELONGS_TO关系
+- brand_name_zh节点与positioning_level_zh节点:HAS_LEVEL关系
+
+添加时进行判断,若已经有name相同的节点,则不重复添加。
+
+使用方法:
+python parse_neo4j_process.py
+"""
+
+import os
+import sys
+import logging
+from datetime import datetime
+from typing import Dict, Any, List, Tuple
+
+# 添加项目根目录到Python路径
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))
+sys.path.insert(0, project_root)
+
+try:
+    from app.config.config import config, current_env
+    from app.services.neo4j_driver import Neo4jDriver
+    from sqlalchemy import create_engine, text
+    from sqlalchemy.exc import SQLAlchemyError
+except ImportError as e:
+    print(f"导入模块失败: {e}")
+    print("请确保在正确的环境中运行此脚本")
+    sys.exit(1)
+
+# 配置日志
+def setup_logging():
+    """配置日志"""
+    log_format = '%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(lineno)s - %(message)s'
+    
+    # 创建logs目录(如果不存在)
+    log_dir = os.path.join(project_root, 'logs')
+    os.makedirs(log_dir, exist_ok=True)
+    
+    # 配置日志
+    logging.basicConfig(
+        level=logging.INFO,
+        format=log_format,
+        handlers=[
+            logging.FileHandler(os.path.join(log_dir, 'parse_neo4j_process.log'), encoding='utf-8'),
+            logging.StreamHandler(sys.stdout)
+        ]
+    )
+    
+    return logging.getLogger(__name__)
+
+class HotelPositionNeo4jProcessor:
+    """酒店职位数据和酒店集团品牌数据Neo4j处理器"""
+    
+    def __init__(self):
+        """初始化处理器"""
+        self.logger = logging.getLogger(__name__)
+        self.config = config[current_env]()
+        self.pg_engine = None
+        self.neo4j_driver = None
+        
+    def connect_postgresql(self):
+        """连接PostgreSQL数据库"""
+        try:
+            self.pg_engine = create_engine(self.config.SQLALCHEMY_DATABASE_URI)
+            # 测试连接
+            with self.pg_engine.connect() as conn:
+                conn.execute(text("SELECT 1"))
+            self.logger.info("PostgreSQL数据库连接成功")
+            return True
+        except SQLAlchemyError as e:
+            self.logger.error(f"PostgreSQL数据库连接失败: {e}")
+            return False
+        except Exception as e:
+            self.logger.error(f"连接PostgreSQL时发生未知错误: {e}")
+            return False
+    
+    def connect_neo4j(self):
+        """连接Neo4j数据库"""
+        try:
+            self.neo4j_driver = Neo4jDriver()
+            if self.neo4j_driver.verify_connectivity():
+                self.logger.info("Neo4j数据库连接成功")
+                return True
+            else:
+                self.logger.error("Neo4j数据库连接失败")
+                return False
+        except Exception as e:
+            self.logger.error(f"连接Neo4j时发生未知错误: {e}")
+            return False
+    
+    def get_hotel_positions(self) -> List[Dict[str, Any]]:
+        """从PostgreSQL数据库获取酒店职位数据"""
+        try:
+            query = """
+                SELECT DISTINCT 
+                    department_zh, department_en,
+                    position_zh, position_en,
+                    level_zh, level_en
+                FROM hotel_positions 
+                WHERE department_zh IS NOT NULL 
+                AND department_zh != ''
+                AND position_zh IS NOT NULL
+                AND position_zh != ''
+                AND level_zh IS NOT NULL
+                AND level_zh != ''
+                AND status = 'active'
+                ORDER BY department_zh, position_zh, level_zh
+            """
+            
+            with self.pg_engine.connect() as conn:
+                result = conn.execute(text(query))
+                positions = []
+                for row in result:
+                    positions.append({
+                        'department_zh': row[0],
+                        'department_en': row[1] or '',
+                        'position_zh': row[2],
+                        'position_en': row[3] or '',
+                        'level_zh': row[4],
+                        'level_en': row[5] or ''
+                    })
+                
+            self.logger.info(f"成功获取 {len(positions)} 条酒店职位数据")
+            return positions
+            
+        except SQLAlchemyError as e:
+            self.logger.error(f"查询PostgreSQL数据库失败: {e}")
+            return []
+        except Exception as e:
+            self.logger.error(f"获取酒店职位数据时发生未知错误: {e}")
+            return []
+    
+    def get_hotel_group_brands(self) -> List[Dict[str, Any]]:
+        """从PostgreSQL数据库获取酒店集团品牌数据"""
+        try:
+            query = """
+                SELECT DISTINCT 
+                    group_name_zh, group_name_en,
+                    brand_name_zh, brand_name_en,
+                    positioning_level_zh, positioning_level_en
+                FROM hotel_group_brands 
+                WHERE group_name_zh IS NOT NULL 
+                AND group_name_zh != ''
+                AND brand_name_zh IS NOT NULL
+                AND brand_name_zh != ''
+                AND positioning_level_zh IS NOT NULL
+                AND positioning_level_zh != ''
+                AND status = 'active'
+                ORDER BY group_name_zh, brand_name_zh, positioning_level_zh
+            """
+            
+            with self.pg_engine.connect() as conn:
+                result = conn.execute(text(query))
+                brands = []
+                for row in result:
+                    brands.append({
+                        'group_name_zh': row[0],
+                        'group_name_en': row[1] or '',
+                        'brand_name_zh': row[2],
+                        'brand_name_en': row[3] or '',
+                        'positioning_level_zh': row[4],
+                        'positioning_level_en': row[5] or ''
+                    })
+                
+            self.logger.info(f"成功获取 {len(brands)} 条酒店集团品牌数据")
+            return brands
+            
+        except SQLAlchemyError as e:
+            self.logger.error(f"查询PostgreSQL数据库失败: {e}")
+            return []
+        except Exception as e:
+            self.logger.error(f"获取酒店集团品牌数据时发生未知错误: {e}")
+            return []
+    
+    def check_neo4j_node_exists(self, session, name: str) -> bool:
+        """检查Neo4j中是否已存在相同name的DataLabel节点"""
+        try:
+            query = "MATCH (n:DataLabel {name: $name}) RETURN n LIMIT 1"
+            result = session.run(query, name=name)
+            return result.single() is not None
+        except Exception as e:
+            self.logger.error(f"检查Neo4j节点存在性时发生错误: {e}")
+            return False
+    
+    def create_neo4j_node(self, session, node_data: Dict[str, str], node_type: str) -> bool:
+        """在Neo4j中创建DataLabel节点"""
+        try:
+            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+            
+            query = """
+                CREATE (n:DataLabel {
+                    name: $name,
+                    en_name: $en_name,
+                    describe: $describe,
+                    time: $time,
+                    category: $category,
+                    status: $status,
+                    node_type: $node_type
+                })
+            """
+            
+            parameters = {
+                'name': node_data['name'],
+                'en_name': node_data['en_name'],
+                'describe': '',
+                'time': current_time,
+                'category': '人才地图',
+                'status': 'active',
+                'node_type': node_type
+            }
+            
+            session.run(query, **parameters)
+            return True
+            
+        except Exception as e:
+            self.logger.error(f"创建Neo4j节点时发生错误: {e}")
+            return False
+    
+    def create_relationship(self, session, from_name: str, to_name: str, relationship_type: str) -> bool:
+        """创建两个DataLabel节点之间的关系"""
+        try:
+            query = """
+                MATCH (from:DataLabel {name: $from_name})
+                MATCH (to:DataLabel {name: $to_name})
+                MERGE (from)-[r:$relationship_type]->(to)
+                RETURN r
+            """
+            
+            # 使用参数化查询避免Cypher注入
+            if relationship_type == "BELONGS_TO":
+                query = """
+                    MATCH (from:DataLabel {name: $from_name})
+                    MATCH (to:DataLabel {name: $to_name})
+                    MERGE (from)-[r:BELONGS_TO]->(to)
+                    RETURN r
+                """
+            elif relationship_type == "HAS_LEVEL":
+                query = """
+                    MATCH (from:DataLabel {name: $from_name})
+                    MATCH (to:DataLabel {name: $to_name})
+                    MERGE (from)-[r:HAS_LEVEL]->(to)
+                    RETURN r
+                """
+            
+            result = session.run(query, from_name=from_name, to_name=to_name)
+            return result.single() is not None
+            
+        except Exception as e:
+            self.logger.error(f"创建关系时发生错误: {e}")
+            return False
+    
+    def process_hotel_positions(self) -> Dict[str, Any]:
+        """处理酒店职位数据同步到Neo4j"""
+        try:
+            # 获取酒店职位数据
+            positions = self.get_hotel_positions()
+            if not positions:
+                return {
+                    'success': False,
+                    'message': '没有获取到酒店职位数据',
+                    'total': 0,
+                    'departments_created': 0,
+                    'departments_skipped': 0,
+                    'positions_created': 0,
+                    'positions_skipped': 0,
+                    'levels_created': 0,
+                    'levels_skipped': 0,
+                    'relationships_created': 0
+                }
+            
+            total_count = len(positions)
+            departments_created = 0
+            departments_skipped = 0
+            positions_created = 0
+            positions_skipped = 0
+            levels_created = 0
+            levels_skipped = 0
+            relationships_created = 0
+            
+            # 获取Neo4j会话
+            with self.neo4j_driver.get_session() as session:
+                for position in positions:
+                    department_zh = position['department_zh']
+                    position_zh = position['position_zh']
+                    level_zh = position['level_zh']
+                    
+                    # 处理部门节点
+                    if not self.check_neo4j_node_exists(session, department_zh):
+                        dept_data = {
+                            'name': department_zh,
+                            'en_name': position['department_en']
+                        }
+                        if self.create_neo4j_node(session, dept_data, 'department'):
+                            self.logger.info(f"成功创建部门节点: {department_zh}")
+                            departments_created += 1
+                        else:
+                            self.logger.error(f"创建部门节点失败: {department_zh}")
+                    else:
+                        self.logger.info(f"部门节点已存在,跳过: {department_zh}")
+                        departments_skipped += 1
+                    
+                    # 处理职位节点
+                    if not self.check_neo4j_node_exists(session, position_zh):
+                        pos_data = {
+                            'name': position_zh,
+                            'en_name': position['position_en']
+                        }
+                        if self.create_neo4j_node(session, pos_data, 'position'):
+                            self.logger.info(f"成功创建职位节点: {position_zh}")
+                            positions_created += 1
+                        else:
+                            self.logger.error(f"创建职位节点失败: {position_zh}")
+                    else:
+                        self.logger.info(f"职位节点已存在,跳过: {position_zh}")
+                        positions_skipped += 1
+                    
+                    # 处理级别节点
+                    if not self.check_neo4j_node_exists(session, level_zh):
+                        level_data = {
+                            'name': level_zh,
+                            'en_name': position['level_en']
+                        }
+                        if self.create_neo4j_node(session, level_data, 'position_level'):
+                            self.logger.info(f"成功创建级别节点: {level_zh}")
+                            levels_created += 1
+                        else:
+                            self.logger.error(f"创建级别节点失败: {level_zh}")
+                    else:
+                        self.logger.info(f"级别节点已存在,跳过: {level_zh}")
+                        levels_skipped += 1
+                    
+                    # 创建关系
+                    # 职位属于部门的关系
+                    if self.create_relationship(session, position_zh, department_zh, "BELONGS_TO"):
+                        self.logger.info(f"成功创建关系: {position_zh} BELONGS_TO {department_zh}")
+                        relationships_created += 1
+                    else:
+                        self.logger.error(f"创建关系失败: {position_zh} BELONGS_TO {department_zh}")
+                    
+                    # 职位具有级别的关系
+                    if self.create_relationship(session, position_zh, level_zh, "HAS_LEVEL"):
+                        self.logger.info(f"成功创建关系: {position_zh} HAS_LEVEL {level_zh}")
+                        relationships_created += 1
+                    else:
+                        self.logger.error(f"创建关系失败: {position_zh} HAS_LEVEL {level_zh}")
+            
+            return {
+                'success': True,
+                'message': '酒店职位数据同步完成',
+                'total': total_count,
+                'departments_created': departments_created,
+                'departments_skipped': departments_skipped,
+                'positions_created': positions_created,
+                'positions_skipped': positions_skipped,
+                'levels_created': levels_created,
+                'levels_skipped': levels_skipped,
+                'relationships_created': relationships_created
+            }
+            
+        except Exception as e:
+            self.logger.error(f"处理酒店职位数据时发生错误: {e}")
+            return {
+                'success': False,
+                'message': f'处理失败: {str(e)}',
+                'total': 0,
+                'departments_created': 0,
+                'departments_skipped': 0,
+                'positions_created': 0,
+                'positions_skipped': 0,
+                'levels_created': 0,
+                'levels_skipped': 0,
+                'relationships_created': 0
+            }
+    
+    def process_hotel_group_brands(self) -> Dict[str, Any]:
+        """处理酒店集团品牌数据同步到Neo4j"""
+        try:
+            # 获取酒店集团品牌数据
+            brands = self.get_hotel_group_brands()
+            if not brands:
+                return {
+                    'success': False,
+                    'message': '没有获取到酒店集团品牌数据',
+                    'total': 0,
+                    'groups_created': 0,
+                    'groups_skipped': 0,
+                    'brands_created': 0,
+                    'brands_skipped': 0,
+                    'brand_levels_created': 0,
+                    'brand_levels_skipped': 0,
+                    'relationships_created': 0
+                }
+            
+            total_count = len(brands)
+            groups_created = 0
+            groups_skipped = 0
+            brands_created = 0
+            brands_skipped = 0
+            brand_levels_created = 0
+            brand_levels_skipped = 0
+            relationships_created = 0
+            
+            # 获取Neo4j会话
+            with self.neo4j_driver.get_session() as session:
+                for brand in brands:
+                    group_name_zh = brand['group_name_zh']
+                    brand_name_zh = brand['brand_name_zh']
+                    positioning_level_zh = brand['positioning_level_zh']
+                    
+                    # 处理集团节点
+                    if not self.check_neo4j_node_exists(session, group_name_zh):
+                        group_data = {
+                            'name': group_name_zh,
+                            'en_name': brand['group_name_en']
+                        }
+                        if self.create_neo4j_node(session, group_data, 'group'):
+                            self.logger.info(f"成功创建集团节点: {group_name_zh}")
+                            groups_created += 1
+                        else:
+                            self.logger.error(f"创建集团节点失败: {group_name_zh}")
+                    else:
+                        self.logger.info(f"集团节点已存在,跳过: {group_name_zh}")
+                        groups_skipped += 1
+                    
+                    # 处理品牌节点
+                    if not self.check_neo4j_node_exists(session, brand_name_zh):
+                        brand_data = {
+                            'name': brand_name_zh,
+                            'en_name': brand['brand_name_en']
+                        }
+                        if self.create_neo4j_node(session, brand_data, 'brand'):
+                            self.logger.info(f"成功创建品牌节点: {brand_name_zh}")
+                            brands_created += 1
+                        else:
+                            self.logger.error(f"创建品牌节点失败: {brand_name_zh}")
+                    else:
+                        self.logger.info(f"品牌节点已存在,跳过: {brand_name_zh}")
+                        brands_skipped += 1
+                    
+                    # 处理品牌级别节点
+                    if not self.check_neo4j_node_exists(session, positioning_level_zh):
+                        level_data = {
+                            'name': positioning_level_zh,
+                            'en_name': brand['positioning_level_en']
+                        }
+                        if self.create_neo4j_node(session, level_data, 'brand_level'):
+                            self.logger.info(f"成功创建品牌级别节点: {positioning_level_zh}")
+                            brand_levels_created += 1
+                        else:
+                            self.logger.error(f"创建品牌级别节点失败: {positioning_level_zh}")
+                    else:
+                        self.logger.info(f"品牌级别节点已存在,跳过: {positioning_level_zh}")
+                        brand_levels_skipped += 1
+                    
+                    # 创建关系
+                    # 品牌属于集团的关系
+                    if self.create_relationship(session, brand_name_zh, group_name_zh, "BELONGS_TO"):
+                        self.logger.info(f"成功创建关系: {brand_name_zh} BELONGS_TO {group_name_zh}")
+                        relationships_created += 1
+                    else:
+                        self.logger.error(f"创建关系失败: {brand_name_zh} BELONGS_TO {group_name_zh}")
+                    
+                    # 品牌具有级别的关系
+                    if self.create_relationship(session, brand_name_zh, positioning_level_zh, "HAS_LEVEL"):
+                        self.logger.info(f"成功创建关系: {brand_name_zh} HAS_LEVEL {positioning_level_zh}")
+                        relationships_created += 1
+                    else:
+                        self.logger.error(f"创建关系失败: {brand_name_zh} HAS_LEVEL {positioning_level_zh}")
+            
+            return {
+                'success': True,
+                'message': '酒店集团品牌数据同步完成',
+                'total': total_count,
+                'groups_created': groups_created,
+                'groups_skipped': groups_skipped,
+                'brands_created': brands_created,
+                'brands_skipped': brands_skipped,
+                'brand_levels_created': brand_levels_created,
+                'brand_levels_skipped': brand_levels_skipped,
+                'relationships_created': relationships_created
+            }
+            
+        except Exception as e:
+            self.logger.error(f"处理酒店集团品牌数据时发生错误: {e}")
+            return {
+                'success': False,
+                'message': f'处理失败: {str(e)}',
+                'total': 0,
+                'groups_created': 0,
+                'groups_skipped': 0,
+                'brands_created': 0,
+                'brands_skipped': 0,
+                'brand_levels_created': 0,
+                'brand_levels_skipped': 0,
+                'relationships_created': 0
+            }
+    
+    def run(self) -> bool:
+        """运行主程序"""
+        self.logger.info("开始执行酒店职位数据和酒店集团品牌数据Neo4j同步程序")
+        
+        try:
+            # 连接数据库
+            if not self.connect_postgresql():
+                self.logger.error("无法连接PostgreSQL数据库,程序退出")
+                return False
+            
+            if not self.connect_neo4j():
+                self.logger.error("无法连接Neo4j数据库,程序退出")
+                return False
+            
+            # 处理酒店职位数据同步
+            self.logger.info("开始处理酒店职位数据...")
+            positions_result = self.process_hotel_positions()
+            
+            if positions_result['success']:
+                self.logger.info(f"酒店职位数据同步完成: {positions_result['message']}")
+                self.logger.info(f"总计记录: {positions_result['total']}")
+                self.logger.info(f"部门节点 - 新建: {positions_result['departments_created']}, 跳过: {positions_result['departments_skipped']}")
+                self.logger.info(f"职位节点 - 新建: {positions_result['positions_created']}, 跳过: {positions_result['positions_skipped']}")
+                self.logger.info(f"级别节点 - 新建: {positions_result['levels_created']}, 跳过: {positions_result['levels_skipped']}")
+                self.logger.info(f"关系创建: {positions_result['relationships_created']}")
+            else:
+                self.logger.error(f"酒店职位数据同步失败: {positions_result['message']}")
+            
+            # 处理酒店集团品牌数据同步
+            self.logger.info("开始处理酒店集团品牌数据...")
+            brands_result = self.process_hotel_group_brands()
+            
+            if brands_result['success']:
+                self.logger.info(f"酒店集团品牌数据同步完成: {brands_result['message']}")
+                self.logger.info(f"总计记录: {brands_result['total']}")
+                self.logger.info(f"集团节点 - 新建: {brands_result['groups_created']}, 跳过: {brands_result['groups_skipped']}")
+                self.logger.info(f"品牌节点 - 新建: {brands_result['brands_created']}, 跳过: {brands_result['brands_skipped']}")
+                self.logger.info(f"品牌级别节点 - 新建: {brands_result['brand_levels_created']}, 跳过: {brands_result['brand_levels_skipped']}")
+                self.logger.info(f"关系创建: {brands_result['relationships_created']}")
+            else:
+                self.logger.error(f"酒店集团品牌数据同步失败: {brands_result['message']}")
+            
+            # 判断整体执行结果
+            overall_success = positions_result['success'] and brands_result['success']
+            
+            if overall_success:
+                self.logger.info("所有数据同步任务完成")
+            else:
+                self.logger.warning("部分数据同步任务失败")
+            
+            return overall_success
+            
+        except Exception as e:
+            self.logger.error(f"程序执行过程中发生未知错误: {e}")
+            return False
+        
+        finally:
+            # 清理资源
+            if self.pg_engine:
+                self.pg_engine.dispose()
+            if self.neo4j_driver:
+                self.neo4j_driver.close()
+            self.logger.info("程序执行完成,资源已清理")
+
+def main():
+    """主函数"""
+    # 设置日志
+    logger = setup_logging()
+    
+    try:
+        # 创建处理器并运行
+        processor = HotelPositionNeo4jProcessor()
+        success = processor.run()
+        
+        if success:
+            logger.info("程序执行成功")
+            sys.exit(0)
+        else:
+            logger.error("程序执行失败")
+            sys.exit(1)
+            
+    except KeyboardInterrupt:
+        logger.info("程序被用户中断")
+        sys.exit(0)
+    except Exception as e:
+        logger.error(f"程序执行时发生未处理的错误: {e}")
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main() 

+ 48 - 0
run_parse_neo4j.bat

@@ -0,0 +1,48 @@
+@echo off
+chcp 65001 >nul
+echo ========================================
+echo 酒店职位数据Neo4j同步程序
+echo ========================================
+echo.
+
+REM 检查Python是否安装
+python --version >nul 2>&1
+if errorlevel 1 (
+    echo 错误: 未找到Python,请先安装Python 3.7+
+    pause
+    exit /b 1
+)
+
+echo Python环境检查通过
+echo.
+
+REM 检查程序文件是否存在
+if not exist "app\core\data_parse\parse_neo4j_process.py" (
+    echo 错误: 找不到程序文件 app\core\data_parse\parse_neo4j_process.py
+    pause
+    exit /b 1
+)
+
+echo 程序文件检查通过
+echo.
+
+echo 开始执行数据同步程序...
+echo 时间: %date% %time%
+echo.
+
+REM 运行Python程序
+python app\core\data_parse\parse_neo4j_process.py
+
+REM 检查执行结果
+if errorlevel 1 (
+    echo.
+    echo ❌ 程序执行失败
+    echo 请检查日志文件 logs\parse_neo4j_process.log 获取详细错误信息
+) else (
+    echo.
+    echo ✅ 程序执行成功
+)
+
+echo.
+echo 按任意键退出...
+pause >nul 

+ 53 - 0
run_parse_neo4j.sh

@@ -0,0 +1,53 @@
+#!/bin/bash
+
+# 酒店职位数据Neo4j同步程序运行脚本
+# 适用于Linux和Mac环境
+
+echo "========================================"
+echo "酒店职位数据Neo4j同步程序"
+echo "========================================"
+echo
+
+# 检查Python是否安装
+if ! command -v python3 &> /dev/null; then
+    if ! command -v python &> /dev/null; then
+        echo "错误: 未找到Python,请先安装Python 3.7+"
+        exit 1
+    else
+        PYTHON_CMD="python"
+    fi
+else
+    PYTHON_CMD="python3"
+fi
+
+echo "Python环境检查通过: $($PYTHON_CMD --version)"
+echo
+
+# 检查程序文件是否存在
+if [ ! -f "app/core/data_parse/parse_neo4j_process.py" ]; then
+    echo "错误: 找不到程序文件 app/core/data_parse/parse_neo4j_process.py"
+    exit 1
+fi
+
+echo "程序文件检查通过"
+echo
+
+echo "开始执行数据同步程序..."
+echo "时间: $(date)"
+echo
+
+# 运行Python程序
+$PYTHON_CMD app/core/data_parse/parse_neo4j_process.py
+
+# 检查执行结果
+if [ $? -eq 0 ]; then
+    echo
+    echo "✅ 程序执行成功"
+else
+    echo
+    echo "❌ 程序执行失败"
+    echo "请检查日志文件 logs/parse_neo4j_process.log 获取详细错误信息"
+fi
+
+echo
+echo "程序执行完成" 

+ 157 - 0
test_enhanced_parse_neo4j.py

@@ -0,0 +1,157 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试增强后的 parse_neo4j_process.py 程序
+
+该脚本用于测试酒店职位数据Neo4j同步程序的增强功能,包括:
+- 职位名称和级别名称的同步
+- 节点关系的创建
+"""
+
+import os
+import sys
+import subprocess
+import time
+
+def test_enhanced_parse_neo4j_process():
+    """测试增强后的parse_neo4j_process.py程序"""
+    
+    # 获取脚本路径
+    script_path = os.path.join('app', 'core', 'data_parse', 'parse_neo4j_process.py')
+    
+    if not os.path.exists(script_path):
+        print(f"错误: 找不到脚本文件 {script_path}")
+        return False
+    
+    print(f"开始测试增强版脚本: {script_path}")
+    print("=" * 60)
+    
+    try:
+        # 运行脚本
+        start_time = time.time()
+        result = subprocess.run(
+            [sys.executable, script_path],
+            capture_output=True,
+            text=True,
+            encoding='utf-8',
+            cwd=os.getcwd()
+        )
+        end_time = time.time()
+        
+        # 输出结果
+        print(f"脚本执行时间: {end_time - start_time:.2f} 秒")
+        print(f"返回码: {result.returncode}")
+        print("\n标准输出:")
+        print(result.stdout)
+        
+        if result.stderr:
+            print("\n标准错误:")
+            print(result.stderr)
+        
+        # 判断执行结果
+        if result.returncode == 0:
+            print("\n✅ 增强版脚本执行成功!")
+            
+            # 检查输出中是否包含新的功能信息
+            if "职位节点" in result.stdout and "级别节点" in result.stdout:
+                print("✅ 检测到职位和级别节点创建功能")
+            else:
+                print("⚠️  未检测到职位和级别节点创建功能")
+            
+            if "BELONGS_TO" in result.stdout and "HAS_LEVEL" in result.stdout:
+                print("✅ 检测到节点关系创建功能")
+            else:
+                print("⚠️  未检测到节点关系创建功能")
+                
+        else:
+            print(f"\n❌ 增强版脚本执行失败,返回码: {result.returncode}")
+            return False
+            
+        return True
+        
+    except Exception as e:
+        print(f"执行增强版脚本时发生错误: {e}")
+        return False
+
+def test_import_enhanced_modules():
+    """测试增强后的模块导入"""
+    print("测试增强后的模块导入...")
+    
+    try:
+        # 测试配置模块
+        from app.config.config import config, current_env
+        print("✅ 配置模块导入成功")
+        
+        # 测试Neo4j驱动模块
+        from app.services.neo4j_driver import Neo4jDriver
+        print("✅ Neo4j驱动模块导入成功")
+        
+        # 测试SQLAlchemy模块
+        from sqlalchemy import create_engine, text
+        print("✅ SQLAlchemy模块导入成功")
+        
+        # 测试增强后的处理器类
+        from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+        print("✅ 增强后的处理器类导入成功")
+        
+        return True
+        
+    except ImportError as e:
+        print(f"❌ 模块导入失败: {e}")
+        return False
+
+def test_enhanced_functionality():
+    """测试增强功能"""
+    print("\n测试增强功能...")
+    
+    try:
+        from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+        
+        # 创建处理器实例
+        processor = HotelPositionNeo4jProcessor()
+        print("✅ 成功创建处理器实例")
+        
+        # 检查方法是否存在
+        if hasattr(processor, 'create_relationship'):
+            print("✅ 检测到关系创建方法")
+        else:
+            print("❌ 未检测到关系创建方法")
+            
+        if hasattr(processor, 'process_hotel_data'):
+            print("✅ 检测到酒店数据处理方法")
+        else:
+            print("❌ 未检测到酒店数据处理方法")
+            
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试增强功能时发生错误: {e}")
+        return False
+
+def main():
+    """主函数"""
+    print("酒店职位数据Neo4j同步程序增强功能测试")
+    print("=" * 60)
+    
+    # 测试模块导入
+    if not test_import_enhanced_modules():
+        print("\n模块导入测试失败,跳过其他测试")
+        return
+    
+    print("\n" + "=" * 60)
+    
+    # 测试增强功能
+    if not test_enhanced_functionality():
+        print("\n增强功能测试失败,跳过脚本执行测试")
+        return
+    
+    print("\n" + "=" * 60)
+    
+    # 测试脚本执行
+    if test_enhanced_parse_neo4j_process():
+        print("\n🎉 所有增强功能测试通过!")
+    else:
+        print("\n💥 增强功能测试失败!")
+
+if __name__ == "__main__":
+    main() 

+ 212 - 0
test_hotel_group_brands.py

@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试酒店集团品牌数据处理功能
+
+该脚本用于测试parse_neo4j_process.py程序中新增的酒店集团品牌数据同步功能
+"""
+
+import os
+import sys
+import subprocess
+import time
+
+def test_hotel_group_brands_functionality():
+    """测试酒店集团品牌数据处理功能"""
+    
+    # 获取脚本路径
+    script_path = os.path.join('app', 'core', 'data_parse', 'parse_neo4j_process.py')
+    
+    if not os.path.exists(script_path):
+        print(f"错误: 找不到脚本文件 {script_path}")
+        return False
+    
+    print(f"开始测试酒店集团品牌数据处理功能: {script_path}")
+    print("=" * 70)
+    
+    try:
+        # 运行脚本
+        start_time = time.time()
+        result = subprocess.run(
+            [sys.executable, script_path],
+            capture_output=True,
+            text=True,
+            encoding='utf-8',
+            cwd=os.getcwd()
+        )
+        end_time = time.time()
+        
+        # 输出结果
+        print(f"脚本执行时间: {end_time - start_time:.2f} 秒")
+        print(f"返回码: {result.returncode}")
+        print("\n标准输出:")
+        print(result.stdout)
+        
+        if result.stderr:
+            print("\n标准错误:")
+            print(result.stderr)
+        
+        # 判断执行结果
+        if result.returncode == 0:
+            print("\n✅ 脚本执行成功!")
+            
+            # 检查输出中是否包含酒店集团品牌相关功能信息
+            if "酒店集团品牌数据" in result.stdout:
+                print("✅ 检测到酒店集团品牌数据处理功能")
+            else:
+                print("⚠️  未检测到酒店集团品牌数据处理功能")
+            
+            if "集团节点" in result.stdout and "品牌节点" in result.stdout:
+                print("✅ 检测到集团和品牌节点创建功能")
+            else:
+                print("⚠️  未检测到集团和品牌节点创建功能")
+            
+            if "品牌级别节点" in result.stdout:
+                print("✅ 检测到品牌级别节点创建功能")
+            else:
+                print("⚠️  未检测到品牌级别节点创建功能")
+                
+            if "BELONGS_TO" in result.stdout and "HAS_LEVEL" in result.stdout:
+                print("✅ 检测到节点关系创建功能")
+            else:
+                print("⚠️  未检测到节点关系创建功能")
+                
+        else:
+            print(f"\n❌ 脚本执行失败,返回码: {result.returncode}")
+            return False
+            
+        return True
+        
+    except Exception as e:
+        print(f"执行脚本时发生错误: {e}")
+        return False
+
+def test_import_enhanced_modules():
+    """测试增强后的模块导入"""
+    print("测试增强后的模块导入...")
+    
+    try:
+        # 测试配置模块
+        from app.config.config import config, current_env
+        print("✅ 配置模块导入成功")
+        
+        # 测试Neo4j驱动模块
+        from app.services.neo4j_driver import Neo4jDriver
+        print("✅ Neo4j驱动模块导入成功")
+        
+        # 测试SQLAlchemy模块
+        from sqlalchemy import create_engine, text
+        print("✅ SQLAlchemy模块导入成功")
+        
+        # 测试增强后的处理器类
+        from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+        print("✅ 增强后的处理器类导入成功")
+        
+        return True
+        
+    except ImportError as e:
+        print(f"❌ 模块导入失败: {e}")
+        return False
+
+def test_hotel_group_brands_methods():
+    """测试酒店集团品牌相关方法"""
+    print("\n测试酒店集团品牌相关方法...")
+    
+    try:
+        from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+        
+        # 创建处理器实例
+        processor = HotelPositionNeo4jProcessor()
+        print("✅ 成功创建处理器实例")
+        
+        # 检查酒店集团品牌相关方法是否存在
+        if hasattr(processor, 'get_hotel_group_brands'):
+            print("✅ 检测到获取酒店集团品牌数据方法")
+        else:
+            print("❌ 未检测到获取酒店集团品牌数据方法")
+            
+        if hasattr(processor, 'process_hotel_group_brands'):
+            print("✅ 检测到处理酒店集团品牌数据方法")
+        else:
+            print("❌ 未检测到处理酒店集团品牌数据方法")
+            
+        if hasattr(processor, 'create_relationship'):
+            print("✅ 检测到关系创建方法")
+        else:
+            print("❌ 未检测到关系创建方法")
+            
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试酒店集团品牌方法时发生错误: {e}")
+        return False
+
+def test_data_structure():
+    """测试数据结构"""
+    print("\n测试数据结构...")
+    
+    try:
+        from app.core.data_parse.parse_neo4j_process import HotelPositionNeo4jProcessor
+        
+        processor = HotelPositionNeo4jProcessor()
+        
+        # 测试连接数据库(如果可能)
+        if processor.connect_postgresql():
+            print("✅ PostgreSQL数据库连接成功")
+            
+            # 尝试获取酒店集团品牌数据
+            try:
+                brands = processor.get_hotel_group_brands()
+                print(f"✅ 成功获取酒店集团品牌数据,共 {len(brands)} 条记录")
+                
+                if brands:
+                    print("数据示例:")
+                    for i, brand in enumerate(brands[:3]):
+                        print(f"  {i+1}. 集团: {brand['group_name_zh']}, 品牌: {brand['brand_name_zh']}, 级别: {brand['positioning_level_zh']}")
+                else:
+                    print("⚠️  未获取到酒店集团品牌数据,可能是表为空或查询条件不匹配")
+                    
+            except Exception as e:
+                print(f"⚠️  获取酒店集团品牌数据时发生错误: {e}")
+                print("这可能是正常的,如果数据库中没有相应的表或数据")
+        else:
+            print("⚠️  PostgreSQL数据库连接失败,跳过数据获取测试")
+            
+        return True
+        
+    except Exception as e:
+        print(f"❌ 测试数据结构时发生错误: {e}")
+        return False
+
+def main():
+    """主函数"""
+    print("酒店集团品牌数据处理功能测试")
+    print("=" * 70)
+    
+    # 测试模块导入
+    if not test_import_enhanced_modules():
+        print("\n模块导入测试失败,跳过其他测试")
+        return
+    
+    print("\n" + "=" * 70)
+    
+    # 测试酒店集团品牌相关方法
+    if not test_hotel_group_brands_methods():
+        print("\n酒店集团品牌方法测试失败,跳过其他测试")
+        return
+    
+    print("\n" + "=" * 70)
+    
+    # 测试数据结构
+    test_data_structure()
+    
+    print("\n" + "=" * 70)
+    
+    # 测试脚本执行
+    if test_hotel_group_brands_functionality():
+        print("\n🎉 酒店集团品牌功能测试通过!")
+    else:
+        print("\n💥 酒店集团品牌功能测试失败!")
+
+if __name__ == "__main__":
+    main() 

+ 103 - 0
test_parse_neo4j_process.py

@@ -0,0 +1,103 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试 parse_neo4j_process.py 程序
+
+该脚本用于测试酒店职位数据Neo4j同步程序的功能
+"""
+
+import os
+import sys
+import subprocess
+import time
+
+def test_parse_neo4j_process():
+    """测试parse_neo4j_process.py程序"""
+    
+    # 获取脚本路径
+    script_path = os.path.join('app', 'core', 'data_parse', 'parse_neo4j_process.py')
+    
+    if not os.path.exists(script_path):
+        print(f"错误: 找不到脚本文件 {script_path}")
+        return False
+    
+    print(f"开始测试脚本: {script_path}")
+    print("=" * 50)
+    
+    try:
+        # 运行脚本
+        start_time = time.time()
+        result = subprocess.run(
+            [sys.executable, script_path],
+            capture_output=True,
+            text=True,
+            encoding='utf-8',
+            cwd=os.getcwd()
+        )
+        end_time = time.time()
+        
+        # 输出结果
+        print(f"脚本执行时间: {end_time - start_time:.2f} 秒")
+        print(f"返回码: {result.returncode}")
+        print("\n标准输出:")
+        print(result.stdout)
+        
+        if result.stderr:
+            print("\n标准错误:")
+            print(result.stderr)
+        
+        # 判断执行结果
+        if result.returncode == 0:
+            print("\n✅ 脚本执行成功!")
+            return True
+        else:
+            print(f"\n❌ 脚本执行失败,返回码: {result.returncode}")
+            return False
+            
+    except Exception as e:
+        print(f"执行脚本时发生错误: {e}")
+        return False
+
+def test_import_modules():
+    """测试模块导入"""
+    print("测试模块导入...")
+    
+    try:
+        # 测试配置模块
+        from app.config.config import config, current_env
+        print("✅ 配置模块导入成功")
+        
+        # 测试Neo4j驱动模块
+        from app.services.neo4j_driver import Neo4jDriver
+        print("✅ Neo4j驱动模块导入成功")
+        
+        # 测试SQLAlchemy模块
+        from sqlalchemy import create_engine, text
+        print("✅ SQLAlchemy模块导入成功")
+        
+        return True
+        
+    except ImportError as e:
+        print(f"❌ 模块导入失败: {e}")
+        return False
+
+def main():
+    """主函数"""
+    print("酒店职位数据Neo4j同步程序测试")
+    print("=" * 50)
+    
+    # 测试模块导入
+    if not test_import_modules():
+        print("\n模块导入测试失败,跳过脚本执行测试")
+        return
+    
+    print("\n" + "=" * 50)
+    
+    # 测试脚本执行
+    if test_parse_neo4j_process():
+        print("\n🎉 所有测试通过!")
+    else:
+        print("\n💥 测试失败!")
+
+if __name__ == "__main__":
+    main()