neo4j_driver.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import logging
  2. import os
  3. from neo4j import GraphDatabase
  4. from neo4j.exceptions import AuthError, ServiceUnavailable
  5. logger = logging.getLogger(__name__)
  6. class Neo4jDriver:
  7. def __init__(self, uri=None, user=None, password=None, encrypted=None):
  8. """
  9. 初始化Neo4j驱动
  10. Args:
  11. uri: Neo4j URI(可选,如果不提供则从Flask配置获取)
  12. user: Neo4j用户名(可选,如果不提供则从Flask配置获取)
  13. password: Neo4j密码(可选,如果不提供则从Flask配置获取)
  14. encrypted: 是否加密连接(可选,如果不提供则从Flask配置获取)
  15. Raises:
  16. ValueError: 如果配置参数缺失
  17. """
  18. self._driver = None
  19. # 优先使用传入的参数,否则从Flask配置获取
  20. if uri is not None:
  21. self.uri = uri
  22. else:
  23. self.uri = self._get_config_value("NEO4J_URI")
  24. if not self.uri:
  25. raise ValueError(
  26. "Neo4j URI配置缺失,请检查app/config/config.py中的NEO4J_URI配置"
  27. )
  28. if user is not None:
  29. self.user = user
  30. else:
  31. self.user = self._get_config_value("NEO4J_USER")
  32. if not self.user:
  33. raise ValueError(
  34. "Neo4j用户配置缺失,请检查app/config/config.py中的NEO4J_USER配置"
  35. )
  36. if password is not None:
  37. self.password = password
  38. else:
  39. self.password = self._get_config_value("NEO4J_PASSWORD")
  40. if self.password is None:
  41. raise ValueError(
  42. "Neo4j密码配置缺失,请检查app/config/config.py中的NEO4J_PASSWORD配置"
  43. )
  44. if encrypted is not None:
  45. self.encrypted = encrypted
  46. else:
  47. encrypted_value = self._get_config_value("NEO4J_ENCRYPTED")
  48. if encrypted_value is None:
  49. # 如果配置中没有,默认为False
  50. self.encrypted = False
  51. elif isinstance(encrypted_value, bool):
  52. self.encrypted = encrypted_value
  53. elif isinstance(encrypted_value, str):
  54. self.encrypted = encrypted_value.lower() == "true"
  55. else:
  56. self.encrypted = False
  57. def _get_config_value(self, key):
  58. """
  59. 获取配置值,优先从Flask配置获取,否则从环境变量获取
  60. Args:
  61. key: 配置键名
  62. Returns:
  63. 配置值,如果不存在则返回None
  64. Raises:
  65. RuntimeError: 如果不在Flask环境中且环境变量也不存在
  66. """
  67. try:
  68. # 优先从Flask配置获取(这是统一配置源)
  69. from flask import current_app
  70. if current_app and hasattr(current_app, "config"):
  71. value = current_app.config.get(key)
  72. if value is not None:
  73. return value
  74. except (ImportError, RuntimeError):
  75. # 不在Flask环境中或Flask应用上下文外,尝试从环境变量获取
  76. pass
  77. # 如果Flask配置中没有,尝试从环境变量获取(用于非Flask环境)
  78. return os.environ.get(key)
  79. def connect(self):
  80. if not self._driver:
  81. # user 和 password 在 __init__ 中已验证不为 None
  82. self._driver = GraphDatabase.driver(
  83. self.uri or "",
  84. auth=(str(self.user), str(self.password)),
  85. encrypted=self.encrypted,
  86. )
  87. return self._driver
  88. def close(self):
  89. if self._driver:
  90. self._driver.close()
  91. self._driver = None
  92. def verify_connectivity(self):
  93. try:
  94. self.connect().verify_connectivity()
  95. return True
  96. except (ServiceUnavailable, AuthError) as exc:
  97. logger.error(f"Neo4j连接失败: {exc}")
  98. return False
  99. def get_session(self):
  100. """获取 Neo4j 会话"""
  101. return self.connect().session()
  102. class Neo4jDriverSingleton:
  103. """
  104. Neo4j驱动单例包装类
  105. 延迟初始化,避免在模块导入时Flask应用上下文未初始化的问题
  106. """
  107. def __init__(self):
  108. self._driver = None
  109. def _get_driver(self):
  110. """获取或创建Neo4j驱动实例(延迟初始化)"""
  111. if self._driver is None:
  112. self._driver = Neo4jDriver()
  113. return self._driver
  114. def connect(self):
  115. """连接到Neo4j数据库"""
  116. return self._get_driver().connect()
  117. def close(self):
  118. """关闭Neo4j连接"""
  119. if self._driver:
  120. self._driver.close()
  121. self._driver = None
  122. def verify_connectivity(self):
  123. """验证Neo4j连接"""
  124. return self._get_driver().verify_connectivity()
  125. def get_session(self):
  126. """获取 Neo4j 会话"""
  127. return self._get_driver().get_session()
  128. # 单例实例(延迟初始化,只在第一次使用时创建)
  129. neo4j_driver = Neo4jDriverSingleton()