neo4j_driver.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import os
  2. from neo4j import GraphDatabase
  3. from neo4j.exceptions import ServiceUnavailable
  4. class Neo4jDriver:
  5. def __init__(self, uri=None, user=None, password=None, encrypted=None):
  6. """
  7. 初始化Neo4j驱动
  8. Args:
  9. uri: Neo4j URI(可选,如果不提供则从Flask配置获取)
  10. user: Neo4j用户名(可选,如果不提供则从Flask配置获取)
  11. password: Neo4j密码(可选,如果不提供则从Flask配置获取)
  12. encrypted: 是否加密连接(可选,如果不提供则从Flask配置获取)
  13. Raises:
  14. ValueError: 如果配置参数缺失
  15. """
  16. self._driver = None
  17. # 优先使用传入的参数,否则从Flask配置获取
  18. if uri is not None:
  19. self.uri = uri
  20. else:
  21. self.uri = self._get_config_value("NEO4J_URI")
  22. if not self.uri:
  23. raise ValueError(
  24. "Neo4j URI配置缺失,请检查app/config/config.py中的NEO4J_URI配置"
  25. )
  26. if user is not None:
  27. self.user = user
  28. else:
  29. self.user = self._get_config_value("NEO4J_USER")
  30. if not self.user:
  31. raise ValueError(
  32. "Neo4j用户配置缺失,请检查app/config/config.py中的NEO4J_USER配置"
  33. )
  34. if password is not None:
  35. self.password = password
  36. else:
  37. self.password = self._get_config_value("NEO4J_PASSWORD")
  38. if self.password is None:
  39. raise ValueError(
  40. "Neo4j密码配置缺失,请检查app/config/config.py中的NEO4J_PASSWORD配置"
  41. )
  42. if encrypted is not None:
  43. self.encrypted = encrypted
  44. else:
  45. encrypted_value = self._get_config_value("NEO4J_ENCRYPTED")
  46. if encrypted_value is None:
  47. # 如果配置中没有,默认为False
  48. self.encrypted = False
  49. elif isinstance(encrypted_value, bool):
  50. self.encrypted = encrypted_value
  51. elif isinstance(encrypted_value, str):
  52. self.encrypted = encrypted_value.lower() == "true"
  53. else:
  54. self.encrypted = False
  55. def _get_config_value(self, key):
  56. """
  57. 获取配置值,优先从Flask配置获取,否则从环境变量获取
  58. Args:
  59. key: 配置键名
  60. Returns:
  61. 配置值,如果不存在则返回None
  62. Raises:
  63. RuntimeError: 如果不在Flask环境中且环境变量也不存在
  64. """
  65. try:
  66. # 优先从Flask配置获取(这是统一配置源)
  67. from flask import current_app
  68. if current_app and hasattr(current_app, "config"):
  69. value = current_app.config.get(key)
  70. if value is not None:
  71. return value
  72. except (ImportError, RuntimeError):
  73. # 不在Flask环境中或Flask应用上下文外,尝试从环境变量获取
  74. pass
  75. # 如果Flask配置中没有,尝试从环境变量获取(用于非Flask环境)
  76. return os.environ.get(key)
  77. def connect(self):
  78. if not self._driver:
  79. # user 和 password 在 __init__ 中已验证不为 None
  80. self._driver = GraphDatabase.driver(
  81. self.uri or "",
  82. auth=(str(self.user), str(self.password)),
  83. encrypted=self.encrypted,
  84. )
  85. return self._driver
  86. def close(self):
  87. if self._driver:
  88. self._driver.close()
  89. self._driver = None
  90. def verify_connectivity(self):
  91. try:
  92. self.connect().verify_connectivity()
  93. return True
  94. except ServiceUnavailable:
  95. return False
  96. def get_session(self):
  97. """获取 Neo4j 会话"""
  98. return self.connect().session()
  99. class Neo4jDriverSingleton:
  100. """
  101. Neo4j驱动单例包装类
  102. 延迟初始化,避免在模块导入时Flask应用上下文未初始化的问题
  103. """
  104. def __init__(self):
  105. self._driver = None
  106. def _get_driver(self):
  107. """获取或创建Neo4j驱动实例(延迟初始化)"""
  108. if self._driver is None:
  109. self._driver = Neo4jDriver()
  110. return self._driver
  111. def connect(self):
  112. """连接到Neo4j数据库"""
  113. return self._get_driver().connect()
  114. def close(self):
  115. """关闭Neo4j连接"""
  116. if self._driver:
  117. self._driver.close()
  118. self._driver = None
  119. def verify_connectivity(self):
  120. """验证Neo4j连接"""
  121. return self._get_driver().verify_connectivity()
  122. def get_session(self):
  123. """获取 Neo4j 会话"""
  124. return self._get_driver().get_session()
  125. # 单例实例(延迟初始化,只在第一次使用时创建)
  126. neo4j_driver = Neo4jDriverSingleton()