neo4j_driver.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. from neo4j import GraphDatabase
  2. from neo4j.exceptions import ServiceUnavailable
  3. import os
  4. class Neo4jDriver:
  5. def __init__(self, uri=None, user=None, password=None, encrypted=None):
  6. """
  7. 初始化Neo4j驱动
  8. Args:
  9. uri: Neo4j URI(可选,如果不提供则从Flask配置获取)
  10. user: Neo4j用户名(可选,如果不提供则从Flask配置获取)
  11. password: Neo4j密码(可选,如果不提供则从Flask配置获取)
  12. encrypted: 是否加密连接(可选,如果不提供则从Flask配置获取)
  13. Raises:
  14. ValueError: 如果配置参数缺失
  15. """
  16. self._driver = None
  17. # 优先使用传入的参数,否则从Flask配置获取
  18. if uri is not None:
  19. self.uri = uri
  20. else:
  21. self.uri = self._get_config_value('NEO4J_URI')
  22. if not self.uri:
  23. raise ValueError("Neo4j URI配置缺失,请检查app/config/config.py中的NEO4J_URI配置")
  24. if user is not None:
  25. self.user = user
  26. else:
  27. self.user = self._get_config_value('NEO4J_USER')
  28. if not self.user:
  29. raise ValueError("Neo4j用户配置缺失,请检查app/config/config.py中的NEO4J_USER配置")
  30. if password is not None:
  31. self.password = password
  32. else:
  33. self.password = self._get_config_value('NEO4J_PASSWORD')
  34. if self.password is None:
  35. raise ValueError("Neo4j密码配置缺失,请检查app/config/config.py中的NEO4J_PASSWORD配置")
  36. if encrypted is not None:
  37. self.encrypted = encrypted
  38. else:
  39. encrypted_value = self._get_config_value('NEO4J_ENCRYPTED')
  40. if encrypted_value is None:
  41. # 如果配置中没有,默认为False
  42. self.encrypted = False
  43. elif isinstance(encrypted_value, bool):
  44. self.encrypted = encrypted_value
  45. elif isinstance(encrypted_value, str):
  46. self.encrypted = encrypted_value.lower() == 'true'
  47. else:
  48. self.encrypted = False
  49. def _get_config_value(self, key):
  50. """
  51. 获取配置值,优先从Flask配置获取,否则从环境变量获取
  52. Args:
  53. key: 配置键名
  54. Returns:
  55. 配置值,如果不存在则返回None
  56. Raises:
  57. RuntimeError: 如果不在Flask环境中且环境变量也不存在
  58. """
  59. try:
  60. # 优先从Flask配置获取(这是统一配置源)
  61. from flask import current_app
  62. if current_app and hasattr(current_app, 'config'):
  63. value = current_app.config.get(key)
  64. if value is not None:
  65. return value
  66. except (ImportError, RuntimeError):
  67. # 不在Flask环境中或Flask应用上下文外,尝试从环境变量获取
  68. pass
  69. # 如果Flask配置中没有,尝试从环境变量获取(用于非Flask环境)
  70. return os.environ.get(key)
  71. def connect(self):
  72. if not self._driver:
  73. self._driver = GraphDatabase.driver(
  74. self.uri,
  75. auth=(self.user, self.password),
  76. encrypted=self.encrypted
  77. )
  78. return self._driver
  79. def close(self):
  80. if self._driver:
  81. self._driver.close()
  82. self._driver = None
  83. def verify_connectivity(self):
  84. try:
  85. self.connect().verify_connectivity()
  86. return True
  87. except ServiceUnavailable:
  88. return False
  89. def get_session(self):
  90. """获取 Neo4j 会话"""
  91. return self.connect().session()
  92. class Neo4jDriverSingleton:
  93. """
  94. Neo4j驱动单例包装类
  95. 延迟初始化,避免在模块导入时Flask应用上下文未初始化的问题
  96. """
  97. def __init__(self):
  98. self._driver = None
  99. def _get_driver(self):
  100. """获取或创建Neo4j驱动实例(延迟初始化)"""
  101. if self._driver is None:
  102. self._driver = Neo4jDriver()
  103. return self._driver
  104. def connect(self):
  105. """连接到Neo4j数据库"""
  106. return self._get_driver().connect()
  107. def close(self):
  108. """关闭Neo4j连接"""
  109. if self._driver:
  110. self._driver.close()
  111. self._driver = None
  112. def verify_connectivity(self):
  113. """验证Neo4j连接"""
  114. return self._get_driver().verify_connectivity()
  115. def get_session(self):
  116. """获取 Neo4j 会话"""
  117. return self._get_driver().get_session()
  118. # 单例实例(延迟初始化,只在第一次使用时创建)
  119. neo4j_driver = Neo4jDriverSingleton()