#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sync hotel position data and hotel group/brand data into the Neo4j graph database.

The script reads the PostgreSQL tables ``hotel_positions`` and
``hotel_group_brands`` (database ``dataops``). It iterates over every distinct
active record and adds the relevant field values to Neo4j as ``DataLabel``
nodes, then links those nodes with relationships.

The PostgreSQL connection URL is a constant on the processor class and can be
overridden through the constructor. Neo4j connection settings are resolved by
``Neo4jDriver`` itself, which reads the Flask configuration when called
without arguments.

DataLabel node properties:
- name_zh: the Chinese field value (department_zh / position_zh / level_zh /
  group_name_zh / brand_name_zh / positioning_level_zh)
- name_en: the matching English value (department_en / position_en / level_en /
  group_name_en / brand_name_en / positioning_level_en)
- describe: empty string
- time: current time string from ``get_east_asia_time_str()``
- category: "人才地图" (talent map)
- status: "active"
- node_type: "department" / "position" / "position_level" / "group" /
  "brand" / "brand_level"

Relationships:
- position_zh node -[BELONGS_TO]-> department_zh node
- position_zh node -[HAS_LEVEL]-> level_zh node
- brand_name_zh node -[BELONGS_TO]-> group_name_zh node
- brand_name_zh node -[HAS_LEVEL]-> positioning_level_zh node

A node is only created when no DataLabel node with the same ``name_zh``
exists yet. Relationships are created with MERGE, so re-running the script
does not duplicate them.

Usage:
    python parse_neo4j_process.py
"""

import os
import sys
import logging
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple

# Put the project root on sys.path so the ``app`` package is importable when
# this file is executed directly as a script.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))
sys.path.insert(0, project_root)

try:
    # Project imports must come AFTER the sys.path tweak above; importing
    # ``app.*`` earlier fails whenever the project root is not already on the path.
    from app.core.data_parse.time_utils import (  # noqa: F401
        get_east_asia_time_str,
        get_east_asia_timestamp,
        get_east_asia_isoformat,
        get_east_asia_date_str,
    )
    from app.services.neo4j_driver import Neo4jDriver
    from sqlalchemy import create_engine, text
    from sqlalchemy.exc import SQLAlchemyError
except ImportError as e:
    print(f"导入模块失败: {e}")
    print("请确保在正确的环境中运行此脚本")
    sys.exit(1)


def setup_logging():
    """Configure root logging to a log file under ``<project_root>/logs`` and to stdout.

    The ``logs`` directory is created if missing. Records go to
    ``parse_neo4j_process.log`` (UTF-8) and to ``sys.stdout`` at INFO level.

    Returns:
        logging.Logger: the logger for this module.
    """
    log_format = '%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(lineno)s - %(message)s'

    log_dir = os.path.join(project_root, 'logs')
    os.makedirs(log_dir, exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[
            logging.FileHandler(os.path.join(log_dir, 'parse_neo4j_process.log'), encoding='utf-8'),
            logging.StreamHandler(sys.stdout),
        ],
    )
    return logging.getLogger(__name__)


class HotelPositionNeo4jProcessor:
    """Sync hotel position and hotel group/brand data from PostgreSQL into Neo4j."""

    # Connection URL used when none is passed to __init__.
    # NOTE(review): credentials are hard-coded here; consider injecting them instead.
    DEFAULT_PG_CONNECTION_STRING = 'postgresql://postgres:postgres@localhost:5432/dataops'

    # Cypher cannot take a relationship type as a query parameter, so each
    # supported type has its own pre-built query. Only these types are accepted.
    _RELATIONSHIP_QUERIES = {
        "BELONGS_TO": """
            MATCH (from:DataLabel {name_zh: $from_name})
            MATCH (to:DataLabel {name_zh: $to_name})
            MERGE (from)-[r:BELONGS_TO]->(to)
            RETURN r
        """,
        "HAS_LEVEL": """
            MATCH (from:DataLabel {name_zh: $from_name})
            MATCH (to:DataLabel {name_zh: $to_name})
            MERGE (from)-[r:HAS_LEVEL]->(to)
            RETURN r
        """,
    }

    # Counter keys of the result dict returned by process_hotel_positions (in order).
    _POSITION_COUNTERS = (
        'departments_created', 'departments_skipped',
        'positions_created', 'positions_skipped',
        'levels_created', 'levels_skipped',
        'relationships_created',
    )
    # (zh field, en field, node_type, log label, counter prefix), handled in this
    # order for every hotel_positions record.
    _POSITION_NODE_SPECS = (
        ('department_zh', 'department_en', 'department', '部门', 'departments'),
        ('position_zh', 'position_en', 'position', '职位', 'positions'),
        ('level_zh', 'level_en', 'position_level', '级别', 'levels'),
    )
    # (from field, to field, relationship type) for every hotel_positions record.
    _POSITION_RELATIONSHIP_SPECS = (
        ('position_zh', 'department_zh', 'BELONGS_TO'),
        ('position_zh', 'level_zh', 'HAS_LEVEL'),
    )

    # Counter keys of the result dict returned by process_hotel_group_brands (in order).
    _BRAND_COUNTERS = (
        'groups_created', 'groups_skipped',
        'brands_created', 'brands_skipped',
        'brand_levels_created', 'brand_levels_skipped',
        'relationships_created',
    )
    _BRAND_NODE_SPECS = (
        ('group_name_zh', 'group_name_en', 'group', '集团', 'groups'),
        ('brand_name_zh', 'brand_name_en', 'brand', '品牌', 'brands'),
        ('positioning_level_zh', 'positioning_level_en', 'brand_level', '品牌级别', 'brand_levels'),
    )
    _BRAND_RELATIONSHIP_SPECS = (
        ('brand_name_zh', 'group_name_zh', 'BELONGS_TO'),
        ('brand_name_zh', 'positioning_level_zh', 'HAS_LEVEL'),
    )

    def __init__(self, pg_connection_string: Optional[str] = None):
        """Create the processor. No connections are opened here.

        Args:
            pg_connection_string: SQLAlchemy URL for PostgreSQL. Defaults to
                ``DEFAULT_PG_CONNECTION_STRING`` when omitted or empty.
        """
        self.logger = logging.getLogger(__name__)
        # Connection details are given directly; they do not come from the Flask config.
        self.pg_connection_string = pg_connection_string or self.DEFAULT_PG_CONNECTION_STRING
        self.pg_engine = None
        self.neo4j_driver = None

    def connect_postgresql(self) -> bool:
        """Create the SQLAlchemy engine and verify it with ``SELECT 1``.

        Returns:
            True on success, False on any error (the error is logged).
        """
        try:
            self.pg_engine = create_engine(self.pg_connection_string)
            # Open a connection once to prove the engine actually works.
            with self.pg_engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            self.logger.info("PostgreSQL数据库连接成功")
            return True
        except SQLAlchemyError as e:
            self.logger.error(f"PostgreSQL数据库连接失败: {e}")
            return False
        except Exception as e:
            self.logger.error(f"连接PostgreSQL时发生未知错误: {e}")
            return False

    def connect_neo4j(self) -> bool:
        """Create the Neo4j driver and verify connectivity.

        ``Neo4jDriver()`` is called without arguments, so it resolves its
        connection settings itself (from the Flask config, per the original comment).

        Returns:
            True if ``verify_connectivity()`` succeeds, otherwise False.
        """
        try:
            self.neo4j_driver = Neo4jDriver()
            if self.neo4j_driver.verify_connectivity():
                self.logger.info("Neo4j数据库连接成功")
                return True
            self.logger.error("Neo4j数据库连接失败")
            return False
        except ValueError as e:
            self.logger.error(f"Neo4j配置错误: {e}")
            return False
        except Exception as e:
            self.logger.error(f"连接Neo4j时发生未知错误: {e}")
            return False

    def get_hotel_positions(self) -> List[Dict[str, Any]]:
        """Fetch distinct active (department, position, level) rows from ``hotel_positions``.

        The query excludes rows whose department_zh, position_zh or level_zh is
        NULL or empty. NULL English names are normalized to ''.

        Returns:
            A list of dicts keyed by the selected column names, or [] if the
            engine is not initialised or the query fails.
        """
        try:
            if not self.pg_engine:
                self.logger.error("PostgreSQL引擎未初始化")
                return []

            query = """
                SELECT DISTINCT department_zh, department_en, position_zh, position_en, level_zh, level_en
                FROM hotel_positions
                WHERE department_zh IS NOT NULL AND department_zh != ''
                  AND position_zh IS NOT NULL AND position_zh != ''
                  AND level_zh IS NOT NULL AND level_zh != ''
                  AND status = 'active'
                ORDER BY department_zh, position_zh, level_zh
            """

            with self.pg_engine.connect() as conn:
                result = conn.execute(text(query))
                positions = [
                    {
                        'department_zh': row[0],
                        'department_en': row[1] or '',
                        'position_zh': row[2],
                        'position_en': row[3] or '',
                        'level_zh': row[4],
                        'level_en': row[5] or '',
                    }
                    for row in result
                ]

            self.logger.info(f"成功获取 {len(positions)} 条酒店职位数据")
            return positions
        except SQLAlchemyError as e:
            self.logger.error(f"查询PostgreSQL数据库失败: {e}")
            return []
        except Exception as e:
            self.logger.error(f"获取酒店职位数据时发生未知错误: {e}")
            return []

    def get_hotel_group_brands(self) -> List[Dict[str, Any]]:
        """Fetch distinct active (group, brand, positioning level) rows from ``hotel_group_brands``.

        The query excludes rows whose group_name_zh, brand_name_zh or
        positioning_level_zh is NULL or empty. NULL English names are normalized to ''.

        Returns:
            A list of dicts keyed by the selected column names, or [] if the
            engine is not initialised or the query fails.
        """
        try:
            if not self.pg_engine:
                self.logger.error("PostgreSQL引擎未初始化")
                return []

            query = """
                SELECT DISTINCT group_name_zh, group_name_en, brand_name_zh, brand_name_en,
                       positioning_level_zh, positioning_level_en
                FROM hotel_group_brands
                WHERE group_name_zh IS NOT NULL AND group_name_zh != ''
                  AND brand_name_zh IS NOT NULL AND brand_name_zh != ''
                  AND positioning_level_zh IS NOT NULL AND positioning_level_zh != ''
                  AND status = 'active'
                ORDER BY group_name_zh, brand_name_zh, positioning_level_zh
            """

            with self.pg_engine.connect() as conn:
                result = conn.execute(text(query))
                brands = [
                    {
                        'group_name_zh': row[0],
                        'group_name_en': row[1] or '',
                        'brand_name_zh': row[2],
                        'brand_name_en': row[3] or '',
                        'positioning_level_zh': row[4],
                        'positioning_level_en': row[5] or '',
                    }
                    for row in result
                ]

            self.logger.info(f"成功获取 {len(brands)} 条酒店集团品牌数据")
            return brands
        except SQLAlchemyError as e:
            self.logger.error(f"查询PostgreSQL数据库失败: {e}")
            return []
        except Exception as e:
            self.logger.error(f"获取酒店集团品牌数据时发生未知错误: {e}")
            return []

    def check_neo4j_node_exists(self, session, name: str) -> bool:
        """Return True if a DataLabel node with ``name_zh == name`` exists.

        Any exception is logged and reported as False ("not found"). The caller
        will then attempt to create the node.
        """
        try:
            query = "MATCH (n:DataLabel {name_zh: $name}) RETURN n LIMIT 1"
            result = session.run(query, name=name)
            return result.single() is not None
        except Exception as e:
            self.logger.error(f"检查Neo4j节点存在性时发生错误: {e}")
            return False

    def create_neo4j_node(self, session, node_data: Dict[str, str], node_type: str) -> bool:
        """Create one DataLabel node.

        Args:
            session: an open Neo4j session.
            node_data: must contain ``name_zh`` and ``name_en``.
            node_type: value stored in the node's ``node_type`` property.

        Returns:
            True if the CREATE completed, False if it raised (the error is logged).
        """
        try:
            current_time = get_east_asia_time_str()
            query = """
                CREATE (n:DataLabel {
                    name_zh: $name_zh,
                    name_en: $name_en,
                    describe: $describe,
                    time: $time,
                    category: $category,
                    status: $status,
                    node_type: $node_type
                })
            """
            parameters = {
                'name_zh': node_data['name_zh'],
                'name_en': node_data['name_en'],
                'describe': '',
                'time': current_time,
                'category': '人才地图',
                'status': 'active',
                'node_type': node_type,
            }
            # consume() waits for the query to finish, so execution errors are raised
            # here rather than on some later call in the same session.
            session.run(query, **parameters).consume()
            return True
        except Exception as e:
            self.logger.error(f"创建Neo4j节点时发生错误: {e}")
            return False

    def create_relationship(self, session, from_name: str, to_name: str, relationship_type: str) -> bool:
        """MERGE a relationship between two DataLabel nodes matched by ``name_zh``.

        Only the types in ``_RELATIONSHIP_QUERIES`` (BELONGS_TO, HAS_LEVEL) are
        supported; any other type is rejected without touching the database.

        Returns:
            True if the MERGE returned a relationship. Returns False when either
            endpoint node is missing, the type is unsupported, or an error occurs.
        """
        query = self._RELATIONSHIP_QUERIES.get(relationship_type)
        if query is None:
            self.logger.error(f"不支持的关系类型: {relationship_type}")
            return False
        try:
            result = session.run(query, from_name=from_name, to_name=to_name)
            return result.single() is not None
        except Exception as e:
            self.logger.error(f"创建关系时发生错误: {e}")
            return False

    @staticmethod
    def _failure_result(message: str, counter_keys: Tuple[str, ...]) -> Dict[str, Any]:
        """Build a failed result dict with ``total`` and every counter set to 0."""
        result: Dict[str, Any] = {'success': False, 'message': message, 'total': 0}
        result.update(dict.fromkeys(counter_keys, 0))
        return result

    def _ensure_node(self, session, name_zh: str, name_en: str, node_type: str, label: str) -> str:
        """Create a DataLabel node unless one with the same name_zh exists.

        ``label`` is the Chinese node-kind name used in log messages.

        Returns:
            'skipped' if the node already existed, 'created' on success,
            'failed' if creation failed.
        """
        if self.check_neo4j_node_exists(session, name_zh):
            self.logger.info(f"{label}节点已存在,跳过: {name_zh}")
            return 'skipped'
        if self.create_neo4j_node(session, {'name_zh': name_zh, 'name_en': name_en}, node_type):
            self.logger.info(f"成功创建{label}节点: {name_zh}")
            return 'created'
        self.logger.error(f"创建{label}节点失败: {name_zh}")
        return 'failed'

    def _sync_records(self, records: List[Dict[str, Any]], node_specs, relationship_specs,
                      counter_keys: Tuple[str, ...]) -> Dict[str, int]:
        """Create the nodes and relationships for every record in one Neo4j session.

        For each record, all nodes in ``node_specs`` are ensured first (in order),
        then every relationship in ``relationship_specs`` is merged.

        Returns:
            A dict with the keys of ``counter_keys`` (in that order) and their counts.
        """
        counts = dict.fromkeys(counter_keys, 0)
        with self.neo4j_driver.get_session() as session:
            for record in records:
                for zh_key, en_key, node_type, label, counter_prefix in node_specs:
                    outcome = self._ensure_node(session, record[zh_key], record[en_key], node_type, label)
                    # Failed creations are logged inside _ensure_node and not counted.
                    if outcome != 'failed':
                        counts[f'{counter_prefix}_{outcome}'] += 1

                for from_key, to_key, relationship_type in relationship_specs:
                    from_name, to_name = record[from_key], record[to_key]
                    if self.create_relationship(session, from_name, to_name, relationship_type):
                        self.logger.info(f"成功创建关系: {from_name} {relationship_type} {to_name}")
                        counts['relationships_created'] += 1
                    else:
                        self.logger.error(f"创建关系失败: {from_name} {relationship_type} {to_name}")
        return counts

    def process_hotel_positions(self) -> Dict[str, Any]:
        """Sync hotel position data into Neo4j.

        Returns:
            A dict with ``success``, ``message``, ``total`` and the
            ``_POSITION_COUNTERS`` keys. On any failure, every count is 0.
        """
        try:
            positions = self.get_hotel_positions()
            if not positions:
                return self._failure_result('没有获取到酒店职位数据', self._POSITION_COUNTERS)

            if not self.neo4j_driver:
                self.logger.error("Neo4j驱动器未初始化")
                return self._failure_result('Neo4j驱动器未初始化', self._POSITION_COUNTERS)

            counts = self._sync_records(
                positions,
                self._POSITION_NODE_SPECS,
                self._POSITION_RELATIONSHIP_SPECS,
                self._POSITION_COUNTERS,
            )
            return {
                'success': True,
                'message': '酒店职位数据同步完成',
                'total': len(positions),
                **counts,
            }
        except Exception as e:
            self.logger.error(f"处理酒店职位数据时发生错误: {e}")
            return self._failure_result(f'处理失败: {str(e)}', self._POSITION_COUNTERS)

    def process_hotel_group_brands(self) -> Dict[str, Any]:
        """Sync hotel group/brand data into Neo4j.

        Returns:
            A dict with ``success``, ``message``, ``total`` and the
            ``_BRAND_COUNTERS`` keys. On any failure, every count is 0.
        """
        try:
            brands = self.get_hotel_group_brands()
            if not brands:
                return self._failure_result('没有获取到酒店集团品牌数据', self._BRAND_COUNTERS)

            if not self.neo4j_driver:
                self.logger.error("Neo4j驱动器未初始化")
                return self._failure_result('Neo4j驱动器未初始化', self._BRAND_COUNTERS)

            counts = self._sync_records(
                brands,
                self._BRAND_NODE_SPECS,
                self._BRAND_RELATIONSHIP_SPECS,
                self._BRAND_COUNTERS,
            )
            return {
                'success': True,
                'message': '酒店集团品牌数据同步完成',
                'total': len(brands),
                **counts,
            }
        except Exception as e:
            self.logger.error(f"处理酒店集团品牌数据时发生错误: {e}")
            return self._failure_result(f'处理失败: {str(e)}', self._BRAND_COUNTERS)

    def run(self) -> bool:
        """Connect to both databases, run both sync jobs, log summaries, then release resources.

        Returns:
            True only if both the positions sync and the brands sync succeeded.
        """
        self.logger.info("开始执行酒店职位数据和酒店集团品牌数据Neo4j同步程序")

        try:
            if not self.connect_postgresql():
                self.logger.error("无法连接PostgreSQL数据库,程序退出")
                return False

            if not self.connect_neo4j():
                self.logger.error("无法连接Neo4j数据库,程序退出")
                return False

            self.logger.info("开始处理酒店职位数据...")
            positions_result = self.process_hotel_positions()
            if positions_result['success']:
                self.logger.info(f"酒店职位数据同步完成: {positions_result['message']}")
                self.logger.info(f"总计记录: {positions_result['total']}")
                self.logger.info(f"部门节点 - 新建: {positions_result['departments_created']}, 跳过: {positions_result['departments_skipped']}")
                self.logger.info(f"职位节点 - 新建: {positions_result['positions_created']}, 跳过: {positions_result['positions_skipped']}")
                self.logger.info(f"级别节点 - 新建: {positions_result['levels_created']}, 跳过: {positions_result['levels_skipped']}")
                self.logger.info(f"关系创建: {positions_result['relationships_created']}")
            else:
                self.logger.error(f"酒店职位数据同步失败: {positions_result['message']}")

            self.logger.info("开始处理酒店集团品牌数据...")
            brands_result = self.process_hotel_group_brands()
            if brands_result['success']:
                self.logger.info(f"酒店集团品牌数据同步完成: {brands_result['message']}")
                self.logger.info(f"总计记录: {brands_result['total']}")
                self.logger.info(f"集团节点 - 新建: {brands_result['groups_created']}, 跳过: {brands_result['groups_skipped']}")
                self.logger.info(f"品牌节点 - 新建: {brands_result['brands_created']}, 跳过: {brands_result['brands_skipped']}")
                self.logger.info(f"品牌级别节点 - 新建: {brands_result['brand_levels_created']}, 跳过: {brands_result['brand_levels_skipped']}")
                self.logger.info(f"关系创建: {brands_result['relationships_created']}")
            else:
                self.logger.error(f"酒店集团品牌数据同步失败: {brands_result['message']}")

            overall_success = positions_result['success'] and brands_result['success']
            if overall_success:
                self.logger.info("所有数据同步任务完成")
            else:
                self.logger.warning("部分数据同步任务失败")
            return overall_success

        except Exception as e:
            self.logger.error(f"程序执行过程中发生未知错误: {e}")
            return False
        finally:
            # Always release the engine pool and the Neo4j driver, even on early return.
            if self.pg_engine:
                self.pg_engine.dispose()
            if self.neo4j_driver:
                self.neo4j_driver.close()
            self.logger.info("程序执行完成,资源已清理")


def main():
    """Script entry point.

    Sets up logging, runs the processor, and exits with status 0 on success
    (or on Ctrl-C) and 1 on failure.
    """
    logger = setup_logging()

    try:
        processor = HotelPositionNeo4jProcessor()
        success = processor.run()

        if success:
            logger.info("程序执行成功")
            sys.exit(0)
        else:
            logger.error("程序执行失败")
            sys.exit(1)

    except KeyboardInterrupt:
        logger.info("程序被用户中断")
        sys.exit(0)
    except Exception as e:
        logger.error(f"程序执行时发生未处理的错误: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()