parse_neo4j_process.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 酒店职位数据和酒店集团品牌数据同步到Neo4j图数据库程序
  5. 该程序通过读取config配置信息,访问PostgreSQL数据库表dataops/public/hotel_positions和hotel_group_brands,
  6. 依次读取数据表中的每一条记录,将其中相关字段内容添加到neo4j图数据库中的DataLabel节点,并创建节点之间的关系。
  7. DataLabel节点的属性定义:
  8. - name_zh: 对应为字段值(department_zh/position_zh/level_zh/group_name_zh/brand_name_zh/positioning_level_zh)
  9. - name_en: 对应为英文名称(department_en/position_en/level_en/group_name_en/brand_name_en/positioning_level_en)
  10. - describe: 空字符串
  11. - time: 当前系统时间
  12. - category: "人才地图"
  13. - status: "active"
  14. - node_type: "department"/"position"/"position_level"/"group"/"brand"/"brand_level"
  15. 节点关系:
  16. - position_zh节点与department_zh节点:BELONGS_TO关系
  17. - position_zh节点与level_zh节点:HAS_LEVEL关系
  18. - brand_name_zh节点与group_name_zh节点:BELONGS_TO关系
  19. - brand_name_zh节点与positioning_level_zh节点:HAS_LEVEL关系
  20. 添加时进行判断,若已经有name相同的节点,则不重复添加。
  21. 使用方法:
  22. python parse_neo4j_process.py
  23. """
  24. import os
  25. import sys
  26. import logging
  27. from datetime import datetime
  28. from typing import Dict, Any, List, Tuple
  29. from app.core.data_parse.time_utils import get_east_asia_time_str, get_east_asia_timestamp, get_east_asia_isoformat, get_east_asia_date_str
  30. # 添加项目根目录到Python路径
  31. current_dir = os.path.dirname(os.path.abspath(__file__))
  32. project_root = os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))
  33. sys.path.insert(0, project_root)
  34. try:
  35. from app.services.neo4j_driver import Neo4jDriver
  36. from sqlalchemy import create_engine, text
  37. from sqlalchemy.exc import SQLAlchemyError
  38. except ImportError as e:
  39. print(f"导入模块失败: {e}")
  40. print("请确保在正确的环境中运行此脚本")
  41. sys.exit(1)
  42. # 配置日志
  43. def setup_logging():
  44. """配置日志"""
  45. log_format = '%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(lineno)s - %(message)s'
  46. # 创建logs目录(如果不存在)
  47. log_dir = os.path.join(project_root, 'logs')
  48. os.makedirs(log_dir, exist_ok=True)
  49. # 配置日志
  50. logging.basicConfig(
  51. level=logging.INFO,
  52. format=log_format,
  53. handlers=[
  54. logging.FileHandler(os.path.join(log_dir, 'parse_neo4j_process.log'), encoding='utf-8'),
  55. logging.StreamHandler(sys.stdout)
  56. ]
  57. )
  58. return logging.getLogger(__name__)
  59. class HotelPositionNeo4jProcessor:
  60. """酒店职位数据和酒店集团品牌数据Neo4j处理器"""
  61. def __init__(self):
  62. """初始化处理器"""
  63. self.logger = logging.getLogger(__name__)
  64. # 直接使用数据库连接信息,不依赖Flask配置
  65. self.pg_connection_string = 'postgresql://postgres:postgres@localhost:5432/dataops'
  66. self.pg_engine = None
  67. self.neo4j_driver = None
  68. def connect_postgresql(self):
  69. """连接PostgreSQL数据库"""
  70. try:
  71. self.pg_engine = create_engine(self.pg_connection_string)
  72. # 测试连接
  73. with self.pg_engine.connect() as conn:
  74. conn.execute(text("SELECT 1"))
  75. self.logger.info("PostgreSQL数据库连接成功")
  76. return True
  77. except SQLAlchemyError as e:
  78. self.logger.error(f"PostgreSQL数据库连接失败: {e}")
  79. return False
  80. except Exception as e:
  81. self.logger.error(f"连接PostgreSQL时发生未知错误: {e}")
  82. return False
  83. def connect_neo4j(self):
  84. """连接Neo4j数据库,从Flask配置获取连接信息"""
  85. try:
  86. # 从Flask配置获取Neo4j连接信息(统一配置源:app/config/config.py)
  87. # 如果不传参数,Neo4jDriver会自动从Flask配置获取
  88. self.neo4j_driver = Neo4jDriver()
  89. if self.neo4j_driver.verify_connectivity():
  90. self.logger.info("Neo4j数据库连接成功")
  91. return True
  92. else:
  93. self.logger.error("Neo4j数据库连接失败")
  94. return False
  95. except ValueError as e:
  96. self.logger.error(f"Neo4j配置错误: {e}")
  97. return False
  98. except Exception as e:
  99. self.logger.error(f"连接Neo4j时发生未知错误: {e}")
  100. return False
  101. def get_hotel_positions(self) -> List[Dict[str, Any]]:
  102. """从PostgreSQL数据库获取酒店职位数据"""
  103. try:
  104. if not self.pg_engine:
  105. self.logger.error("PostgreSQL引擎未初始化")
  106. return []
  107. query = """
  108. SELECT DISTINCT
  109. department_zh, department_en,
  110. position_zh, position_en,
  111. level_zh, level_en
  112. FROM hotel_positions
  113. WHERE department_zh IS NOT NULL
  114. AND department_zh != ''
  115. AND position_zh IS NOT NULL
  116. AND position_zh != ''
  117. AND level_zh IS NOT NULL
  118. AND level_zh != ''
  119. AND status = 'active'
  120. ORDER BY department_zh, position_zh, level_zh
  121. """
  122. with self.pg_engine.connect() as conn:
  123. result = conn.execute(text(query))
  124. positions = []
  125. for row in result:
  126. positions.append({
  127. 'department_zh': row[0],
  128. 'department_en': row[1] or '',
  129. 'position_zh': row[2],
  130. 'position_en': row[3] or '',
  131. 'level_zh': row[4],
  132. 'level_en': row[5] or ''
  133. })
  134. self.logger.info(f"成功获取 {len(positions)} 条酒店职位数据")
  135. return positions
  136. except SQLAlchemyError as e:
  137. self.logger.error(f"查询PostgreSQL数据库失败: {e}")
  138. return []
  139. except Exception as e:
  140. self.logger.error(f"获取酒店职位数据时发生未知错误: {e}")
  141. return []
  142. def get_hotel_group_brands(self) -> List[Dict[str, Any]]:
  143. """从PostgreSQL数据库获取酒店集团品牌数据"""
  144. try:
  145. if not self.pg_engine:
  146. self.logger.error("PostgreSQL引擎未初始化")
  147. return []
  148. query = """
  149. SELECT DISTINCT
  150. group_name_zh, group_name_en,
  151. brand_name_zh, brand_name_en,
  152. positioning_level_zh, positioning_level_en
  153. FROM hotel_group_brands
  154. WHERE group_name_zh IS NOT NULL
  155. AND group_name_zh != ''
  156. AND brand_name_zh IS NOT NULL
  157. AND brand_name_zh != ''
  158. AND positioning_level_zh IS NOT NULL
  159. AND positioning_level_zh != ''
  160. AND status = 'active'
  161. ORDER BY group_name_zh, brand_name_zh, positioning_level_zh
  162. """
  163. with self.pg_engine.connect() as conn:
  164. result = conn.execute(text(query))
  165. brands = []
  166. for row in result:
  167. brands.append({
  168. 'group_name_zh': row[0],
  169. 'group_name_en': row[1] or '',
  170. 'brand_name_zh': row[2],
  171. 'brand_name_en': row[3] or '',
  172. 'positioning_level_zh': row[4],
  173. 'positioning_level_en': row[5] or ''
  174. })
  175. self.logger.info(f"成功获取 {len(brands)} 条酒店集团品牌数据")
  176. return brands
  177. except SQLAlchemyError as e:
  178. self.logger.error(f"查询PostgreSQL数据库失败: {e}")
  179. return []
  180. except Exception as e:
  181. self.logger.error(f"获取酒店集团品牌数据时发生未知错误: {e}")
  182. return []
  183. def check_neo4j_node_exists(self, session, name: str) -> bool:
  184. """检查Neo4j中是否已存在相同name_zh的DataLabel节点"""
  185. try:
  186. query = "MATCH (n:DataLabel {name_zh: $name}) RETURN n LIMIT 1"
  187. result = session.run(query, name=name)
  188. return result.single() is not None
  189. except Exception as e:
  190. self.logger.error(f"检查Neo4j节点存在性时发生错误: {e}")
  191. return False
  192. def create_neo4j_node(self, session, node_data: Dict[str, str], node_type: str) -> bool:
  193. """在Neo4j中创建DataLabel节点"""
  194. try:
  195. current_time = get_east_asia_time_str()
  196. query = """
  197. CREATE (n:DataLabel {
  198. name_zh: $name_zh,
  199. name_en: $name_en,
  200. describe: $describe,
  201. time: $time,
  202. category: $category,
  203. status: $status,
  204. node_type: $node_type
  205. })
  206. """
  207. parameters = {
  208. 'name_zh': node_data['name_zh'],
  209. 'name_en': node_data['name_en'],
  210. 'describe': '',
  211. 'time': current_time,
  212. 'category': '人才地图',
  213. 'status': 'active',
  214. 'node_type': node_type
  215. }
  216. session.run(query, **parameters)
  217. return True
  218. except Exception as e:
  219. self.logger.error(f"创建Neo4j节点时发生错误: {e}")
  220. return False
  221. def create_relationship(self, session, from_name: str, to_name: str, relationship_type: str) -> bool:
  222. """创建两个DataLabel节点之间的关系"""
  223. try:
  224. query = """
  225. MATCH (from:DataLabel {name_zh: $from_name})
  226. MATCH (to:DataLabel {name_zh: $to_name})
  227. MERGE (from)-[r:$relationship_type]->(to)
  228. RETURN r
  229. """
  230. # 使用参数化查询避免Cypher注入
  231. if relationship_type == "BELONGS_TO":
  232. query = """
  233. MATCH (from:DataLabel {name_zh: $from_name})
  234. MATCH (to:DataLabel {name_zh: $to_name})
  235. MERGE (from)-[r:BELONGS_TO]->(to)
  236. RETURN r
  237. """
  238. elif relationship_type == "HAS_LEVEL":
  239. query = """
  240. MATCH (from:DataLabel {name_zh: $from_name})
  241. MATCH (to:DataLabel {name_zh: $to_name})
  242. MERGE (from)-[r:HAS_LEVEL]->(to)
  243. RETURN r
  244. """
  245. result = session.run(query, from_name=from_name, to_name=to_name)
  246. return result.single() is not None
  247. except Exception as e:
  248. self.logger.error(f"创建关系时发生错误: {e}")
  249. return False
  250. def process_hotel_positions(self) -> Dict[str, Any]:
  251. """处理酒店职位数据同步到Neo4j"""
  252. try:
  253. # 获取酒店职位数据
  254. positions = self.get_hotel_positions()
  255. if not positions:
  256. return {
  257. 'success': False,
  258. 'message': '没有获取到酒店职位数据',
  259. 'total': 0,
  260. 'departments_created': 0,
  261. 'departments_skipped': 0,
  262. 'positions_created': 0,
  263. 'positions_skipped': 0,
  264. 'levels_created': 0,
  265. 'levels_skipped': 0,
  266. 'relationships_created': 0
  267. }
  268. total_count = len(positions)
  269. departments_created = 0
  270. departments_skipped = 0
  271. positions_created = 0
  272. positions_skipped = 0
  273. levels_created = 0
  274. levels_skipped = 0
  275. relationships_created = 0
  276. # 获取Neo4j会话
  277. if not self.neo4j_driver:
  278. self.logger.error("Neo4j驱动器未初始化")
  279. return {
  280. 'success': False,
  281. 'message': 'Neo4j驱动器未初始化',
  282. 'total': 0,
  283. 'departments_created': 0,
  284. 'departments_skipped': 0,
  285. 'positions_created': 0,
  286. 'positions_skipped': 0,
  287. 'levels_created': 0,
  288. 'levels_skipped': 0,
  289. 'relationships_created': 0
  290. }
  291. with self.neo4j_driver.get_session() as session:
  292. for position in positions:
  293. department_zh = position['department_zh']
  294. position_zh = position['position_zh']
  295. level_zh = position['level_zh']
  296. # 处理部门节点
  297. if not self.check_neo4j_node_exists(session, department_zh):
  298. dept_data = {
  299. 'name_zh': department_zh,
  300. 'name_en': position['department_en']
  301. }
  302. if self.create_neo4j_node(session, dept_data, 'department'):
  303. self.logger.info(f"成功创建部门节点: {department_zh}")
  304. departments_created += 1
  305. else:
  306. self.logger.error(f"创建部门节点失败: {department_zh}")
  307. else:
  308. self.logger.info(f"部门节点已存在,跳过: {department_zh}")
  309. departments_skipped += 1
  310. # 处理职位节点
  311. if not self.check_neo4j_node_exists(session, position_zh):
  312. pos_data = {
  313. 'name_zh': position_zh,
  314. 'name_en': position['position_en']
  315. }
  316. if self.create_neo4j_node(session, pos_data, 'position'):
  317. self.logger.info(f"成功创建职位节点: {position_zh}")
  318. positions_created += 1
  319. else:
  320. self.logger.error(f"创建职位节点失败: {position_zh}")
  321. else:
  322. self.logger.info(f"职位节点已存在,跳过: {position_zh}")
  323. positions_skipped += 1
  324. # 处理级别节点
  325. if not self.check_neo4j_node_exists(session, level_zh):
  326. level_data = {
  327. 'name_zh': level_zh,
  328. 'name_en': position['level_en']
  329. }
  330. if self.create_neo4j_node(session, level_data, 'position_level'):
  331. self.logger.info(f"成功创建级别节点: {level_zh}")
  332. levels_created += 1
  333. else:
  334. self.logger.error(f"创建级别节点失败: {level_zh}")
  335. else:
  336. self.logger.info(f"级别节点已存在,跳过: {level_zh}")
  337. levels_skipped += 1
  338. # 创建关系
  339. # 职位属于部门的关系
  340. if self.create_relationship(session, position_zh, department_zh, "BELONGS_TO"):
  341. self.logger.info(f"成功创建关系: {position_zh} BELONGS_TO {department_zh}")
  342. relationships_created += 1
  343. else:
  344. self.logger.error(f"创建关系失败: {position_zh} BELONGS_TO {department_zh}")
  345. # 职位具有级别的关系
  346. if self.create_relationship(session, position_zh, level_zh, "HAS_LEVEL"):
  347. self.logger.info(f"成功创建关系: {position_zh} HAS_LEVEL {level_zh}")
  348. relationships_created += 1
  349. else:
  350. self.logger.error(f"创建关系失败: {position_zh} HAS_LEVEL {level_zh}")
  351. return {
  352. 'success': True,
  353. 'message': '酒店职位数据同步完成',
  354. 'total': total_count,
  355. 'departments_created': departments_created,
  356. 'departments_skipped': departments_skipped,
  357. 'positions_created': positions_created,
  358. 'positions_skipped': positions_skipped,
  359. 'levels_created': levels_created,
  360. 'levels_skipped': levels_skipped,
  361. 'relationships_created': relationships_created
  362. }
  363. except Exception as e:
  364. self.logger.error(f"处理酒店职位数据时发生错误: {e}")
  365. return {
  366. 'success': False,
  367. 'message': f'处理失败: {str(e)}',
  368. 'total': 0,
  369. 'departments_created': 0,
  370. 'departments_skipped': 0,
  371. 'positions_created': 0,
  372. 'positions_skipped': 0,
  373. 'levels_created': 0,
  374. 'levels_skipped': 0,
  375. 'relationships_created': 0
  376. }
  377. def process_hotel_group_brands(self) -> Dict[str, Any]:
  378. """处理酒店集团品牌数据同步到Neo4j"""
  379. try:
  380. # 获取酒店集团品牌数据
  381. brands = self.get_hotel_group_brands()
  382. if not brands:
  383. return {
  384. 'success': False,
  385. 'message': '没有获取到酒店集团品牌数据',
  386. 'total': 0,
  387. 'groups_created': 0,
  388. 'groups_skipped': 0,
  389. 'brands_created': 0,
  390. 'brands_skipped': 0,
  391. 'brand_levels_created': 0,
  392. 'brand_levels_skipped': 0,
  393. 'relationships_created': 0
  394. }
  395. total_count = len(brands)
  396. groups_created = 0
  397. groups_skipped = 0
  398. brands_created = 0
  399. brands_skipped = 0
  400. brand_levels_created = 0
  401. brand_levels_skipped = 0
  402. relationships_created = 0
  403. # 获取Neo4j会话
  404. if not self.neo4j_driver:
  405. self.logger.error("Neo4j驱动器未初始化")
  406. return {
  407. 'success': False,
  408. 'message': 'Neo4j驱动器未初始化',
  409. 'total': 0,
  410. 'groups_created': 0,
  411. 'groups_skipped': 0,
  412. 'brands_created': 0,
  413. 'brands_skipped': 0,
  414. 'brand_levels_created': 0,
  415. 'brand_levels_skipped': 0,
  416. 'relationships_created': 0
  417. }
  418. with self.neo4j_driver.get_session() as session:
  419. for brand in brands:
  420. group_name_zh = brand['group_name_zh']
  421. brand_name_zh = brand['brand_name_zh']
  422. positioning_level_zh = brand['positioning_level_zh']
  423. # 处理集团节点
  424. if not self.check_neo4j_node_exists(session, group_name_zh):
  425. group_data = {
  426. 'name_zh': group_name_zh,
  427. 'name_en': brand['group_name_en']
  428. }
  429. if self.create_neo4j_node(session, group_data, 'group'):
  430. self.logger.info(f"成功创建集团节点: {group_name_zh}")
  431. groups_created += 1
  432. else:
  433. self.logger.error(f"创建集团节点失败: {group_name_zh}")
  434. else:
  435. self.logger.info(f"集团节点已存在,跳过: {group_name_zh}")
  436. groups_skipped += 1
  437. # 处理品牌节点
  438. if not self.check_neo4j_node_exists(session, brand_name_zh):
  439. brand_data = {
  440. 'name_zh': brand_name_zh,
  441. 'name_en': brand['brand_name_en']
  442. }
  443. if self.create_neo4j_node(session, brand_data, 'brand'):
  444. self.logger.info(f"成功创建品牌节点: {brand_name_zh}")
  445. brands_created += 1
  446. else:
  447. self.logger.error(f"创建品牌节点失败: {brand_name_zh}")
  448. else:
  449. self.logger.info(f"品牌节点已存在,跳过: {brand_name_zh}")
  450. brands_skipped += 1
  451. # 处理品牌级别节点
  452. if not self.check_neo4j_node_exists(session, positioning_level_zh):
  453. level_data = {
  454. 'name_zh': positioning_level_zh,
  455. 'name_en': brand['positioning_level_en']
  456. }
  457. if self.create_neo4j_node(session, level_data, 'brand_level'):
  458. self.logger.info(f"成功创建品牌级别节点: {positioning_level_zh}")
  459. brand_levels_created += 1
  460. else:
  461. self.logger.error(f"创建品牌级别节点失败: {positioning_level_zh}")
  462. else:
  463. self.logger.info(f"品牌级别节点已存在,跳过: {positioning_level_zh}")
  464. brand_levels_skipped += 1
  465. # 创建关系
  466. # 品牌属于集团的关系
  467. if self.create_relationship(session, brand_name_zh, group_name_zh, "BELONGS_TO"):
  468. self.logger.info(f"成功创建关系: {brand_name_zh} BELONGS_TO {group_name_zh}")
  469. relationships_created += 1
  470. else:
  471. self.logger.error(f"创建关系失败: {brand_name_zh} BELONGS_TO {group_name_zh}")
  472. # 品牌具有级别的关系
  473. if self.create_relationship(session, brand_name_zh, positioning_level_zh, "HAS_LEVEL"):
  474. self.logger.info(f"成功创建关系: {brand_name_zh} HAS_LEVEL {positioning_level_zh}")
  475. relationships_created += 1
  476. else:
  477. self.logger.error(f"创建关系失败: {brand_name_zh} HAS_LEVEL {positioning_level_zh}")
  478. return {
  479. 'success': True,
  480. 'message': '酒店集团品牌数据同步完成',
  481. 'total': total_count,
  482. 'groups_created': groups_created,
  483. 'groups_skipped': groups_skipped,
  484. 'brands_created': brands_created,
  485. 'brands_skipped': brands_skipped,
  486. 'brand_levels_created': brand_levels_created,
  487. 'brand_levels_skipped': brand_levels_skipped,
  488. 'relationships_created': relationships_created
  489. }
  490. except Exception as e:
  491. self.logger.error(f"处理酒店集团品牌数据时发生错误: {e}")
  492. return {
  493. 'success': False,
  494. 'message': f'处理失败: {str(e)}',
  495. 'total': 0,
  496. 'groups_created': 0,
  497. 'groups_skipped': 0,
  498. 'brands_created': 0,
  499. 'brands_skipped': 0,
  500. 'brand_levels_created': 0,
  501. 'brand_levels_skipped': 0,
  502. 'relationships_created': 0
  503. }
  504. def run(self) -> bool:
  505. """运行主程序"""
  506. self.logger.info("开始执行酒店职位数据和酒店集团品牌数据Neo4j同步程序")
  507. try:
  508. # 连接数据库
  509. if not self.connect_postgresql():
  510. self.logger.error("无法连接PostgreSQL数据库,程序退出")
  511. return False
  512. if not self.connect_neo4j():
  513. self.logger.error("无法连接Neo4j数据库,程序退出")
  514. return False
  515. # 处理酒店职位数据同步
  516. self.logger.info("开始处理酒店职位数据...")
  517. positions_result = self.process_hotel_positions()
  518. if positions_result['success']:
  519. self.logger.info(f"酒店职位数据同步完成: {positions_result['message']}")
  520. self.logger.info(f"总计记录: {positions_result['total']}")
  521. self.logger.info(f"部门节点 - 新建: {positions_result['departments_created']}, 跳过: {positions_result['departments_skipped']}")
  522. self.logger.info(f"职位节点 - 新建: {positions_result['positions_created']}, 跳过: {positions_result['positions_skipped']}")
  523. self.logger.info(f"级别节点 - 新建: {positions_result['levels_created']}, 跳过: {positions_result['levels_skipped']}")
  524. self.logger.info(f"关系创建: {positions_result['relationships_created']}")
  525. else:
  526. self.logger.error(f"酒店职位数据同步失败: {positions_result['message']}")
  527. # 处理酒店集团品牌数据同步
  528. self.logger.info("开始处理酒店集团品牌数据...")
  529. brands_result = self.process_hotel_group_brands()
  530. if brands_result['success']:
  531. self.logger.info(f"酒店集团品牌数据同步完成: {brands_result['message']}")
  532. self.logger.info(f"总计记录: {brands_result['total']}")
  533. self.logger.info(f"集团节点 - 新建: {brands_result['groups_created']}, 跳过: {brands_result['groups_skipped']}")
  534. self.logger.info(f"品牌节点 - 新建: {brands_result['brands_created']}, 跳过: {brands_result['brands_skipped']}")
  535. self.logger.info(f"品牌级别节点 - 新建: {brands_result['brand_levels_created']}, 跳过: {brands_result['brand_levels_skipped']}")
  536. self.logger.info(f"关系创建: {brands_result['relationships_created']}")
  537. else:
  538. self.logger.error(f"酒店集团品牌数据同步失败: {brands_result['message']}")
  539. # 判断整体执行结果
  540. overall_success = positions_result['success'] and brands_result['success']
  541. if overall_success:
  542. self.logger.info("所有数据同步任务完成")
  543. else:
  544. self.logger.warning("部分数据同步任务失败")
  545. return overall_success
  546. except Exception as e:
  547. self.logger.error(f"程序执行过程中发生未知错误: {e}")
  548. return False
  549. finally:
  550. # 清理资源
  551. if self.pg_engine:
  552. self.pg_engine.dispose()
  553. if self.neo4j_driver:
  554. self.neo4j_driver.close()
  555. self.logger.info("程序执行完成,资源已清理")
  556. def main():
  557. """主函数"""
  558. # 设置日志
  559. logger = setup_logging()
  560. try:
  561. # 创建处理器并运行
  562. processor = HotelPositionNeo4jProcessor()
  563. success = processor.run()
  564. if success:
  565. logger.info("程序执行成功")
  566. sys.exit(0)
  567. else:
  568. logger.error("程序执行失败")
  569. sys.exit(1)
  570. except KeyboardInterrupt:
  571. logger.info("程序被用户中断")
  572. sys.exit(0)
  573. except Exception as e:
  574. logger.error(f"程序执行时发生未处理的错误: {e}")
  575. sys.exit(1)
  576. if __name__ == "__main__":
  577. main()