||
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 酒店职位数据和酒店集团品牌数据同步到Neo4j图数据库程序
- 该程序通过读取config配置信息,访问PostgreSQL数据库表dataops/public/hotel_positions和hotel_group_brands,
- 依次读取数据表中的每一条记录,将其中相关字段内容添加到neo4j图数据库中的DataLabel节点,并创建节点之间的关系。
- DataLabel节点的属性定义:
- - name_zh: 对应为字段值(department_zh/position_zh/level_zh/group_name_zh/brand_name_zh/positioning_level_zh)
- - name_en: 对应为英文名称(department_en/position_en/level_en/group_name_en/brand_name_en/positioning_level_en)
- - describe: 空字符串
- - time: 当前系统时间
- - category: "人才地图"
- - status: "active"
- - node_type: "department"/"position"/"position_level"/"group"/"brand"/"brand_level"
- 节点关系:
- - position_zh节点与department_zh节点:BELONGS_TO关系
- - position_zh节点与level_zh节点:HAS_LEVEL关系
- - brand_name_zh节点与group_name_zh节点:BELONGS_TO关系
- - brand_name_zh节点与positioning_level_zh节点:HAS_LEVEL关系
- 添加时进行判断,若已经有name相同的节点,则不重复添加。
- 使用方法:
- python parse_neo4j_process.py
- """
- import os
- import sys
- import logging
- from datetime import datetime
- from typing import Dict, Any, List, Tuple
- from app.core.data_parse.time_utils import get_east_asia_time_str, get_east_asia_timestamp, get_east_asia_isoformat, get_east_asia_date_str
- # 添加项目根目录到Python路径
- current_dir = os.path.dirname(os.path.abspath(__file__))
- project_root = os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))
- sys.path.insert(0, project_root)
- try:
- from app.services.neo4j_driver import Neo4jDriver
- from sqlalchemy import create_engine, text
- from sqlalchemy.exc import SQLAlchemyError
- except ImportError as e:
- print(f"导入模块失败: {e}")
- print("请确保在正确的环境中运行此脚本")
- sys.exit(1)
- # 配置日志
- def setup_logging():
- """配置日志"""
- log_format = '%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(lineno)s - %(message)s'
-
- # 创建logs目录(如果不存在)
- log_dir = os.path.join(project_root, 'logs')
- os.makedirs(log_dir, exist_ok=True)
-
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format=log_format,
- handlers=[
- logging.FileHandler(os.path.join(log_dir, 'parse_neo4j_process.log'), encoding='utf-8'),
- logging.StreamHandler(sys.stdout)
- ]
- )
-
- return logging.getLogger(__name__)
- class HotelPositionNeo4jProcessor:
- """酒店职位数据和酒店集团品牌数据Neo4j处理器"""
-
- def __init__(self):
- """初始化处理器"""
- self.logger = logging.getLogger(__name__)
- # 直接使用数据库连接信息,不依赖Flask配置
- self.pg_connection_string = 'postgresql://postgres:postgres@localhost:5432/dataops'
- self.pg_engine = None
- self.neo4j_driver = None
-
- def connect_postgresql(self):
- """连接PostgreSQL数据库"""
- try:
- self.pg_engine = create_engine(self.pg_connection_string)
- # 测试连接
- with self.pg_engine.connect() as conn:
- conn.execute(text("SELECT 1"))
- self.logger.info("PostgreSQL数据库连接成功")
- return True
- except SQLAlchemyError as e:
- self.logger.error(f"PostgreSQL数据库连接失败: {e}")
- return False
- except Exception as e:
- self.logger.error(f"连接PostgreSQL时发生未知错误: {e}")
- return False
-
- def connect_neo4j(self):
- """连接Neo4j数据库,从Flask配置获取连接信息"""
- try:
- # 从Flask配置获取Neo4j连接信息(统一配置源:app/config/config.py)
- # 如果不传参数,Neo4jDriver会自动从Flask配置获取
- self.neo4j_driver = Neo4jDriver()
- if self.neo4j_driver.verify_connectivity():
- self.logger.info("Neo4j数据库连接成功")
- return True
- else:
- self.logger.error("Neo4j数据库连接失败")
- return False
- except ValueError as e:
- self.logger.error(f"Neo4j配置错误: {e}")
- return False
- except Exception as e:
- self.logger.error(f"连接Neo4j时发生未知错误: {e}")
- return False
-
- def get_hotel_positions(self) -> List[Dict[str, Any]]:
- """从PostgreSQL数据库获取酒店职位数据"""
- try:
- if not self.pg_engine:
- self.logger.error("PostgreSQL引擎未初始化")
- return []
-
- query = """
- SELECT DISTINCT
- department_zh, department_en,
- position_zh, position_en,
- level_zh, level_en
- FROM hotel_positions
- WHERE department_zh IS NOT NULL
- AND department_zh != ''
- AND position_zh IS NOT NULL
- AND position_zh != ''
- AND level_zh IS NOT NULL
- AND level_zh != ''
- AND status = 'active'
- ORDER BY department_zh, position_zh, level_zh
- """
-
- with self.pg_engine.connect() as conn:
- result = conn.execute(text(query))
- positions = []
- for row in result:
- positions.append({
- 'department_zh': row[0],
- 'department_en': row[1] or '',
- 'position_zh': row[2],
- 'position_en': row[3] or '',
- 'level_zh': row[4],
- 'level_en': row[5] or ''
- })
-
- self.logger.info(f"成功获取 {len(positions)} 条酒店职位数据")
- return positions
-
- except SQLAlchemyError as e:
- self.logger.error(f"查询PostgreSQL数据库失败: {e}")
- return []
- except Exception as e:
- self.logger.error(f"获取酒店职位数据时发生未知错误: {e}")
- return []
-
- def get_hotel_group_brands(self) -> List[Dict[str, Any]]:
- """从PostgreSQL数据库获取酒店集团品牌数据"""
- try:
- if not self.pg_engine:
- self.logger.error("PostgreSQL引擎未初始化")
- return []
-
- query = """
- SELECT DISTINCT
- group_name_zh, group_name_en,
- brand_name_zh, brand_name_en,
- positioning_level_zh, positioning_level_en
- FROM hotel_group_brands
- WHERE group_name_zh IS NOT NULL
- AND group_name_zh != ''
- AND brand_name_zh IS NOT NULL
- AND brand_name_zh != ''
- AND positioning_level_zh IS NOT NULL
- AND positioning_level_zh != ''
- AND status = 'active'
- ORDER BY group_name_zh, brand_name_zh, positioning_level_zh
- """
-
- with self.pg_engine.connect() as conn:
- result = conn.execute(text(query))
- brands = []
- for row in result:
- brands.append({
- 'group_name_zh': row[0],
- 'group_name_en': row[1] or '',
- 'brand_name_zh': row[2],
- 'brand_name_en': row[3] or '',
- 'positioning_level_zh': row[4],
- 'positioning_level_en': row[5] or ''
- })
-
- self.logger.info(f"成功获取 {len(brands)} 条酒店集团品牌数据")
- return brands
-
- except SQLAlchemyError as e:
- self.logger.error(f"查询PostgreSQL数据库失败: {e}")
- return []
- except Exception as e:
- self.logger.error(f"获取酒店集团品牌数据时发生未知错误: {e}")
- return []
-
- def check_neo4j_node_exists(self, session, name: str) -> bool:
- """检查Neo4j中是否已存在相同name_zh的DataLabel节点"""
- try:
- query = "MATCH (n:DataLabel {name_zh: $name}) RETURN n LIMIT 1"
- result = session.run(query, name=name)
- return result.single() is not None
- except Exception as e:
- self.logger.error(f"检查Neo4j节点存在性时发生错误: {e}")
- return False
-
- def create_neo4j_node(self, session, node_data: Dict[str, str], node_type: str) -> bool:
- """在Neo4j中创建DataLabel节点"""
- try:
- current_time = get_east_asia_time_str()
-
- query = """
- CREATE (n:DataLabel {
- name_zh: $name_zh,
- name_en: $name_en,
- describe: $describe,
- time: $time,
- category: $category,
- status: $status,
- node_type: $node_type
- })
- """
-
- parameters = {
- 'name_zh': node_data['name_zh'],
- 'name_en': node_data['name_en'],
- 'describe': '',
- 'time': current_time,
- 'category': '人才地图',
- 'status': 'active',
- 'node_type': node_type
- }
-
- session.run(query, **parameters)
- return True
-
- except Exception as e:
- self.logger.error(f"创建Neo4j节点时发生错误: {e}")
- return False
-
- def create_relationship(self, session, from_name: str, to_name: str, relationship_type: str) -> bool:
- """创建两个DataLabel节点之间的关系"""
- try:
- query = """
- MATCH (from:DataLabel {name_zh: $from_name})
- MATCH (to:DataLabel {name_zh: $to_name})
- MERGE (from)-[r:$relationship_type]->(to)
- RETURN r
- """
-
- # 使用参数化查询避免Cypher注入
- if relationship_type == "BELONGS_TO":
- query = """
- MATCH (from:DataLabel {name_zh: $from_name})
- MATCH (to:DataLabel {name_zh: $to_name})
- MERGE (from)-[r:BELONGS_TO]->(to)
- RETURN r
- """
- elif relationship_type == "HAS_LEVEL":
- query = """
- MATCH (from:DataLabel {name_zh: $from_name})
- MATCH (to:DataLabel {name_zh: $to_name})
- MERGE (from)-[r:HAS_LEVEL]->(to)
- RETURN r
- """
-
- result = session.run(query, from_name=from_name, to_name=to_name)
- return result.single() is not None
-
- except Exception as e:
- self.logger.error(f"创建关系时发生错误: {e}")
- return False
-
- def process_hotel_positions(self) -> Dict[str, Any]:
- """处理酒店职位数据同步到Neo4j"""
- try:
- # 获取酒店职位数据
- positions = self.get_hotel_positions()
- if not positions:
- return {
- 'success': False,
- 'message': '没有获取到酒店职位数据',
- 'total': 0,
- 'departments_created': 0,
- 'departments_skipped': 0,
- 'positions_created': 0,
- 'positions_skipped': 0,
- 'levels_created': 0,
- 'levels_skipped': 0,
- 'relationships_created': 0
- }
-
- total_count = len(positions)
- departments_created = 0
- departments_skipped = 0
- positions_created = 0
- positions_skipped = 0
- levels_created = 0
- levels_skipped = 0
- relationships_created = 0
-
- # 获取Neo4j会话
- if not self.neo4j_driver:
- self.logger.error("Neo4j驱动器未初始化")
- return {
- 'success': False,
- 'message': 'Neo4j驱动器未初始化',
- 'total': 0,
- 'departments_created': 0,
- 'departments_skipped': 0,
- 'positions_created': 0,
- 'positions_skipped': 0,
- 'levels_created': 0,
- 'levels_skipped': 0,
- 'relationships_created': 0
- }
-
- with self.neo4j_driver.get_session() as session:
- for position in positions:
- department_zh = position['department_zh']
- position_zh = position['position_zh']
- level_zh = position['level_zh']
-
- # 处理部门节点
- if not self.check_neo4j_node_exists(session, department_zh):
- dept_data = {
- 'name_zh': department_zh,
- 'name_en': position['department_en']
- }
- if self.create_neo4j_node(session, dept_data, 'department'):
- self.logger.info(f"成功创建部门节点: {department_zh}")
- departments_created += 1
- else:
- self.logger.error(f"创建部门节点失败: {department_zh}")
- else:
- self.logger.info(f"部门节点已存在,跳过: {department_zh}")
- departments_skipped += 1
-
- # 处理职位节点
- if not self.check_neo4j_node_exists(session, position_zh):
- pos_data = {
- 'name_zh': position_zh,
- 'name_en': position['position_en']
- }
- if self.create_neo4j_node(session, pos_data, 'position'):
- self.logger.info(f"成功创建职位节点: {position_zh}")
- positions_created += 1
- else:
- self.logger.error(f"创建职位节点失败: {position_zh}")
- else:
- self.logger.info(f"职位节点已存在,跳过: {position_zh}")
- positions_skipped += 1
-
- # 处理级别节点
- if not self.check_neo4j_node_exists(session, level_zh):
- level_data = {
- 'name_zh': level_zh,
- 'name_en': position['level_en']
- }
- if self.create_neo4j_node(session, level_data, 'position_level'):
- self.logger.info(f"成功创建级别节点: {level_zh}")
- levels_created += 1
- else:
- self.logger.error(f"创建级别节点失败: {level_zh}")
- else:
- self.logger.info(f"级别节点已存在,跳过: {level_zh}")
- levels_skipped += 1
-
- # 创建关系
- # 职位属于部门的关系
- if self.create_relationship(session, position_zh, department_zh, "BELONGS_TO"):
- self.logger.info(f"成功创建关系: {position_zh} BELONGS_TO {department_zh}")
- relationships_created += 1
- else:
- self.logger.error(f"创建关系失败: {position_zh} BELONGS_TO {department_zh}")
-
- # 职位具有级别的关系
- if self.create_relationship(session, position_zh, level_zh, "HAS_LEVEL"):
- self.logger.info(f"成功创建关系: {position_zh} HAS_LEVEL {level_zh}")
- relationships_created += 1
- else:
- self.logger.error(f"创建关系失败: {position_zh} HAS_LEVEL {level_zh}")
-
- return {
- 'success': True,
- 'message': '酒店职位数据同步完成',
- 'total': total_count,
- 'departments_created': departments_created,
- 'departments_skipped': departments_skipped,
- 'positions_created': positions_created,
- 'positions_skipped': positions_skipped,
- 'levels_created': levels_created,
- 'levels_skipped': levels_skipped,
- 'relationships_created': relationships_created
- }
-
- except Exception as e:
- self.logger.error(f"处理酒店职位数据时发生错误: {e}")
- return {
- 'success': False,
- 'message': f'处理失败: {str(e)}',
- 'total': 0,
- 'departments_created': 0,
- 'departments_skipped': 0,
- 'positions_created': 0,
- 'positions_skipped': 0,
- 'levels_created': 0,
- 'levels_skipped': 0,
- 'relationships_created': 0
- }
-
- def process_hotel_group_brands(self) -> Dict[str, Any]:
- """处理酒店集团品牌数据同步到Neo4j"""
- try:
- # 获取酒店集团品牌数据
- brands = self.get_hotel_group_brands()
- if not brands:
- return {
- 'success': False,
- 'message': '没有获取到酒店集团品牌数据',
- 'total': 0,
- 'groups_created': 0,
- 'groups_skipped': 0,
- 'brands_created': 0,
- 'brands_skipped': 0,
- 'brand_levels_created': 0,
- 'brand_levels_skipped': 0,
- 'relationships_created': 0
- }
-
- total_count = len(brands)
- groups_created = 0
- groups_skipped = 0
- brands_created = 0
- brands_skipped = 0
- brand_levels_created = 0
- brand_levels_skipped = 0
- relationships_created = 0
-
- # 获取Neo4j会话
- if not self.neo4j_driver:
- self.logger.error("Neo4j驱动器未初始化")
- return {
- 'success': False,
- 'message': 'Neo4j驱动器未初始化',
- 'total': 0,
- 'groups_created': 0,
- 'groups_skipped': 0,
- 'brands_created': 0,
- 'brands_skipped': 0,
- 'brand_levels_created': 0,
- 'brand_levels_skipped': 0,
- 'relationships_created': 0
- }
-
- with self.neo4j_driver.get_session() as session:
- for brand in brands:
- group_name_zh = brand['group_name_zh']
- brand_name_zh = brand['brand_name_zh']
- positioning_level_zh = brand['positioning_level_zh']
-
- # 处理集团节点
- if not self.check_neo4j_node_exists(session, group_name_zh):
- group_data = {
- 'name_zh': group_name_zh,
- 'name_en': brand['group_name_en']
- }
- if self.create_neo4j_node(session, group_data, 'group'):
- self.logger.info(f"成功创建集团节点: {group_name_zh}")
- groups_created += 1
- else:
- self.logger.error(f"创建集团节点失败: {group_name_zh}")
- else:
- self.logger.info(f"集团节点已存在,跳过: {group_name_zh}")
- groups_skipped += 1
-
- # 处理品牌节点
- if not self.check_neo4j_node_exists(session, brand_name_zh):
- brand_data = {
- 'name_zh': brand_name_zh,
- 'name_en': brand['brand_name_en']
- }
- if self.create_neo4j_node(session, brand_data, 'brand'):
- self.logger.info(f"成功创建品牌节点: {brand_name_zh}")
- brands_created += 1
- else:
- self.logger.error(f"创建品牌节点失败: {brand_name_zh}")
- else:
- self.logger.info(f"品牌节点已存在,跳过: {brand_name_zh}")
- brands_skipped += 1
-
- # 处理品牌级别节点
- if not self.check_neo4j_node_exists(session, positioning_level_zh):
- level_data = {
- 'name_zh': positioning_level_zh,
- 'name_en': brand['positioning_level_en']
- }
- if self.create_neo4j_node(session, level_data, 'brand_level'):
- self.logger.info(f"成功创建品牌级别节点: {positioning_level_zh}")
- brand_levels_created += 1
- else:
- self.logger.error(f"创建品牌级别节点失败: {positioning_level_zh}")
- else:
- self.logger.info(f"品牌级别节点已存在,跳过: {positioning_level_zh}")
- brand_levels_skipped += 1
-
- # 创建关系
- # 品牌属于集团的关系
- if self.create_relationship(session, brand_name_zh, group_name_zh, "BELONGS_TO"):
- self.logger.info(f"成功创建关系: {brand_name_zh} BELONGS_TO {group_name_zh}")
- relationships_created += 1
- else:
- self.logger.error(f"创建关系失败: {brand_name_zh} BELONGS_TO {group_name_zh}")
-
- # 品牌具有级别的关系
- if self.create_relationship(session, brand_name_zh, positioning_level_zh, "HAS_LEVEL"):
- self.logger.info(f"成功创建关系: {brand_name_zh} HAS_LEVEL {positioning_level_zh}")
- relationships_created += 1
- else:
- self.logger.error(f"创建关系失败: {brand_name_zh} HAS_LEVEL {positioning_level_zh}")
-
- return {
- 'success': True,
- 'message': '酒店集团品牌数据同步完成',
- 'total': total_count,
- 'groups_created': groups_created,
- 'groups_skipped': groups_skipped,
- 'brands_created': brands_created,
- 'brands_skipped': brands_skipped,
- 'brand_levels_created': brand_levels_created,
- 'brand_levels_skipped': brand_levels_skipped,
- 'relationships_created': relationships_created
- }
-
- except Exception as e:
- self.logger.error(f"处理酒店集团品牌数据时发生错误: {e}")
- return {
- 'success': False,
- 'message': f'处理失败: {str(e)}',
- 'total': 0,
- 'groups_created': 0,
- 'groups_skipped': 0,
- 'brands_created': 0,
- 'brands_skipped': 0,
- 'brand_levels_created': 0,
- 'brand_levels_skipped': 0,
- 'relationships_created': 0
- }
-
- def run(self) -> bool:
- """运行主程序"""
- self.logger.info("开始执行酒店职位数据和酒店集团品牌数据Neo4j同步程序")
-
- try:
- # 连接数据库
- if not self.connect_postgresql():
- self.logger.error("无法连接PostgreSQL数据库,程序退出")
- return False
-
- if not self.connect_neo4j():
- self.logger.error("无法连接Neo4j数据库,程序退出")
- return False
-
- # 处理酒店职位数据同步
- self.logger.info("开始处理酒店职位数据...")
- positions_result = self.process_hotel_positions()
-
- if positions_result['success']:
- self.logger.info(f"酒店职位数据同步完成: {positions_result['message']}")
- self.logger.info(f"总计记录: {positions_result['total']}")
- self.logger.info(f"部门节点 - 新建: {positions_result['departments_created']}, 跳过: {positions_result['departments_skipped']}")
- self.logger.info(f"职位节点 - 新建: {positions_result['positions_created']}, 跳过: {positions_result['positions_skipped']}")
- self.logger.info(f"级别节点 - 新建: {positions_result['levels_created']}, 跳过: {positions_result['levels_skipped']}")
- self.logger.info(f"关系创建: {positions_result['relationships_created']}")
- else:
- self.logger.error(f"酒店职位数据同步失败: {positions_result['message']}")
-
- # 处理酒店集团品牌数据同步
- self.logger.info("开始处理酒店集团品牌数据...")
- brands_result = self.process_hotel_group_brands()
-
- if brands_result['success']:
- self.logger.info(f"酒店集团品牌数据同步完成: {brands_result['message']}")
- self.logger.info(f"总计记录: {brands_result['total']}")
- self.logger.info(f"集团节点 - 新建: {brands_result['groups_created']}, 跳过: {brands_result['groups_skipped']}")
- self.logger.info(f"品牌节点 - 新建: {brands_result['brands_created']}, 跳过: {brands_result['brands_skipped']}")
- self.logger.info(f"品牌级别节点 - 新建: {brands_result['brand_levels_created']}, 跳过: {brands_result['brand_levels_skipped']}")
- self.logger.info(f"关系创建: {brands_result['relationships_created']}")
- else:
- self.logger.error(f"酒店集团品牌数据同步失败: {brands_result['message']}")
-
- # 判断整体执行结果
- overall_success = positions_result['success'] and brands_result['success']
-
- if overall_success:
- self.logger.info("所有数据同步任务完成")
- else:
- self.logger.warning("部分数据同步任务失败")
-
- return overall_success
-
- except Exception as e:
- self.logger.error(f"程序执行过程中发生未知错误: {e}")
- return False
-
- finally:
- # 清理资源
- if self.pg_engine:
- self.pg_engine.dispose()
- if self.neo4j_driver:
- self.neo4j_driver.close()
- self.logger.info("程序执行完成,资源已清理")
- def main():
- """主函数"""
- # 设置日志
- logger = setup_logging()
-
- try:
- # 创建处理器并运行
- processor = HotelPositionNeo4jProcessor()
- success = processor.run()
-
- if success:
- logger.info("程序执行成功")
- sys.exit(0)
- else:
- logger.error("程序执行失败")
- sys.exit(1)
-
- except KeyboardInterrupt:
- logger.info("程序被用户中断")
- sys.exit(0)
- except Exception as e:
- logger.error(f"程序执行时发生未处理的错误: {e}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|