import os from neo4j import GraphDatabase from neo4j.exceptions import ServiceUnavailable class Neo4jDriver: def __init__(self, uri=None, user=None, password=None, encrypted=None): """ 初始化Neo4j驱动 Args: uri: Neo4j URI(可选,如果不提供则从Flask配置获取) user: Neo4j用户名(可选,如果不提供则从Flask配置获取) password: Neo4j密码(可选,如果不提供则从Flask配置获取) encrypted: 是否加密连接(可选,如果不提供则从Flask配置获取) Raises: ValueError: 如果配置参数缺失 """ self._driver = None # 优先使用传入的参数,否则从Flask配置获取 if uri is not None: self.uri = uri else: self.uri = self._get_config_value("NEO4J_URI") if not self.uri: raise ValueError( "Neo4j URI配置缺失,请检查app/config/config.py中的NEO4J_URI配置" ) if user is not None: self.user = user else: self.user = self._get_config_value("NEO4J_USER") if not self.user: raise ValueError( "Neo4j用户配置缺失,请检查app/config/config.py中的NEO4J_USER配置" ) if password is not None: self.password = password else: self.password = self._get_config_value("NEO4J_PASSWORD") if self.password is None: raise ValueError( "Neo4j密码配置缺失,请检查app/config/config.py中的NEO4J_PASSWORD配置" ) if encrypted is not None: self.encrypted = encrypted else: encrypted_value = self._get_config_value("NEO4J_ENCRYPTED") if encrypted_value is None: # 如果配置中没有,默认为False self.encrypted = False elif isinstance(encrypted_value, bool): self.encrypted = encrypted_value elif isinstance(encrypted_value, str): self.encrypted = encrypted_value.lower() == "true" else: self.encrypted = False def _get_config_value(self, key): """ 获取配置值,优先从Flask配置获取,否则从环境变量获取 Args: key: 配置键名 Returns: 配置值,如果不存在则返回None Raises: RuntimeError: 如果不在Flask环境中且环境变量也不存在 """ try: # 优先从Flask配置获取(这是统一配置源) from flask import current_app if current_app and hasattr(current_app, "config"): value = current_app.config.get(key) if value is not None: return value except (ImportError, RuntimeError): # 不在Flask环境中或Flask应用上下文外,尝试从环境变量获取 pass # 如果Flask配置中没有,尝试从环境变量获取(用于非Flask环境) return os.environ.get(key) def connect(self): if not self._driver: # user 和 password 在 __init__ 中已验证不为 None self._driver = GraphDatabase.driver( self.uri or "", auth=(str(self.user), str(self.password)), encrypted=self.encrypted, ) return self._driver def close(self): if self._driver: self._driver.close() self._driver = None def verify_connectivity(self): try: self.connect().verify_connectivity() return True except ServiceUnavailable: return False def get_session(self): """获取 Neo4j 会话""" return self.connect().session() class Neo4jDriverSingleton: """ Neo4j驱动单例包装类 延迟初始化,避免在模块导入时Flask应用上下文未初始化的问题 """ def __init__(self): self._driver = None def _get_driver(self): """获取或创建Neo4j驱动实例(延迟初始化)""" if self._driver is None: self._driver = Neo4jDriver() return self._driver def connect(self): """连接到Neo4j数据库""" return self._get_driver().connect() def close(self): """关闭Neo4j连接""" if self._driver: self._driver.close() self._driver = None def verify_connectivity(self): """验证Neo4j连接""" return self._get_driver().verify_connectivity() def get_session(self): """获取 Neo4j 会话""" return self._get_driver().get_session() # 单例实例(延迟初始化,只在第一次使用时创建) neo4j_driver = Neo4jDriverSingleton()