| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- from neo4j import GraphDatabase
- from neo4j.exceptions import ServiceUnavailable
- import os
- class Neo4jDriver:
- def __init__(self, uri=None, user=None, password=None, encrypted=None):
- """
- 初始化Neo4j驱动
-
- Args:
- uri: Neo4j URI(可选,如果不提供则从Flask配置获取)
- user: Neo4j用户名(可选,如果不提供则从Flask配置获取)
- password: Neo4j密码(可选,如果不提供则从Flask配置获取)
- encrypted: 是否加密连接(可选,如果不提供则从Flask配置获取)
-
- Raises:
- ValueError: 如果配置参数缺失
- """
- self._driver = None
-
- # 优先使用传入的参数,否则从Flask配置获取
- if uri is not None:
- self.uri = uri
- else:
- self.uri = self._get_config_value('NEO4J_URI')
- if not self.uri:
- raise ValueError("Neo4j URI配置缺失,请检查app/config/config.py中的NEO4J_URI配置")
-
- if user is not None:
- self.user = user
- else:
- self.user = self._get_config_value('NEO4J_USER')
- if not self.user:
- raise ValueError("Neo4j用户配置缺失,请检查app/config/config.py中的NEO4J_USER配置")
-
- if password is not None:
- self.password = password
- else:
- self.password = self._get_config_value('NEO4J_PASSWORD')
- if self.password is None:
- raise ValueError("Neo4j密码配置缺失,请检查app/config/config.py中的NEO4J_PASSWORD配置")
-
- if encrypted is not None:
- self.encrypted = encrypted
- else:
- encrypted_value = self._get_config_value('NEO4J_ENCRYPTED')
- if encrypted_value is None:
- # 如果配置中没有,默认为False
- self.encrypted = False
- elif isinstance(encrypted_value, bool):
- self.encrypted = encrypted_value
- elif isinstance(encrypted_value, str):
- self.encrypted = encrypted_value.lower() == 'true'
- else:
- self.encrypted = False
-
- def _get_config_value(self, key):
- """
- 获取配置值,优先从Flask配置获取,否则从环境变量获取
-
- Args:
- key: 配置键名
-
- Returns:
- 配置值,如果不存在则返回None
-
- Raises:
- RuntimeError: 如果不在Flask环境中且环境变量也不存在
- """
- try:
- # 优先从Flask配置获取(这是统一配置源)
- from flask import current_app
- if current_app and hasattr(current_app, 'config'):
- value = current_app.config.get(key)
- if value is not None:
- return value
- except (ImportError, RuntimeError):
- # 不在Flask环境中或Flask应用上下文外,尝试从环境变量获取
- pass
-
- # 如果Flask配置中没有,尝试从环境变量获取(用于非Flask环境)
- return os.environ.get(key)
-
- def connect(self):
- if not self._driver:
- self._driver = GraphDatabase.driver(
- self.uri,
- auth=(self.user, self.password),
- encrypted=self.encrypted
- )
- return self._driver
-
- def close(self):
- if self._driver:
- self._driver.close()
- self._driver = None
-
- def verify_connectivity(self):
- try:
- self.connect().verify_connectivity()
- return True
- except ServiceUnavailable:
- return False
-
- def get_session(self):
- """获取 Neo4j 会话"""
- return self.connect().session()
- class Neo4jDriverSingleton:
- """
- Neo4j驱动单例包装类
- 延迟初始化,避免在模块导入时Flask应用上下文未初始化的问题
- """
- def __init__(self):
- self._driver = None
-
- def _get_driver(self):
- """获取或创建Neo4j驱动实例(延迟初始化)"""
- if self._driver is None:
- self._driver = Neo4jDriver()
- return self._driver
-
- def connect(self):
- """连接到Neo4j数据库"""
- return self._get_driver().connect()
-
- def close(self):
- """关闭Neo4j连接"""
- if self._driver:
- self._driver.close()
- self._driver = None
-
- def verify_connectivity(self):
- """验证Neo4j连接"""
- return self._get_driver().verify_connectivity()
-
- def get_session(self):
- """获取 Neo4j 会话"""
- return self._get_driver().get_session()
- # 单例实例(延迟初始化,只在第一次使用时创建)
- neo4j_driver = Neo4jDriverSingleton()
|