12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430 |
- # Copyright (c) "Neo4j"
- # Neo4j Sweden AB [https://neo4j.com]
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # https://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from __future__ import annotations
- import asyncio
- import typing as t
- if t.TYPE_CHECKING:
- import ssl
- import typing_extensions as te
- from .._api import (
- T_NotificationDisabledCategory,
- T_NotificationMinimumSeverity,
- )
- from .._api import (
- NotificationMinimumSeverity,
- RoutingControl,
- TelemetryAPI,
- )
- from .._async_compat.util import Util
- from .._conf import (
- Config,
- ConfigurationError,
- SessionConfig,
- TrustAll,
- TrustStore,
- WorkspaceConfig,
- )
- from .._debug import ENABLED as DEBUG_ENABLED
- from .._meta import (
- deprecation_warn,
- experimental_warn,
- preview_warn,
- unclosed_resource_warn,
- )
- from .._work import (
- EagerResult,
- Query,
- unit_of_work,
- )
- from ..addressing import Address
- from ..api import (
- Auth,
- BookmarkManager,
- Bookmarks,
- DRIVER_BOLT,
- DRIVER_NEO4J,
- parse_neo4j_uri,
- parse_routing_context,
- READ_ACCESS,
- SECURITY_TYPE_SECURE,
- SECURITY_TYPE_SELF_SIGNED_CERTIFICATE,
- ServerInfo,
- TRUST_ALL_CERTIFICATES,
- TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
- URI_SCHEME_BOLT,
- URI_SCHEME_BOLT_SECURE,
- URI_SCHEME_BOLT_SELF_SIGNED_CERTIFICATE,
- URI_SCHEME_NEO4J,
- URI_SCHEME_NEO4J_SECURE,
- URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
- WRITE_ACCESS,
- )
- from ..auth_management import (
- AuthManager,
- AuthManagers,
- ClientCertificate,
- ClientCertificateProvider,
- )
- from ..exceptions import Neo4jError
- from .auth_management import _StaticClientCertificateProvider
- from .bookmark_manager import (
- Neo4jBookmarkManager,
- TBmConsumer as _TBmConsumer,
- TBmSupplier as _TBmSupplier,
- )
- from .config import PoolConfig
- from .work import (
- ManagedTransaction,
- Result,
- Session,
- )
- if t.TYPE_CHECKING:
- import ssl
- from enum import Enum
- import typing_extensions as te
- from .._api import T_RoutingControl
- from ..api import _TAuth
- class _DefaultEnum(Enum):
- default = "default"
- _default = _DefaultEnum.default
- else:
- _default = object()
- _T = t.TypeVar("_T")
class GraphDatabase:
    """Accessor for :class:`neo4j.Driver` construction."""

    if t.TYPE_CHECKING:

        @classmethod
        def driver(
            cls,
            uri: str,
            *,
            auth: _TAuth | AuthManager = ...,
            max_connection_lifetime: float = ...,
            liveness_check_timeout: float | None = ...,
            max_connection_pool_size: int = ...,
            connection_timeout: float = ...,
            trust: (
                te.Literal["TRUST_ALL_CERTIFICATES"]
                | te.Literal["TRUST_SYSTEM_CA_SIGNED_CERTIFICATES"]
            ) = ...,
            resolver: (
                t.Callable[[Address], t.Iterable[Address]]
                | t.Callable[[Address], t.Union[t.Iterable[Address]]]
            ) = ...,
            encrypted: bool = ...,
            trusted_certificates: TrustStore = ...,
            client_certificate: (
                ClientCertificate | ClientCertificateProvider | None
            ) = ...,
            ssl_context: ssl.SSLContext | None = ...,
            user_agent: str = ...,
            keep_alive: bool = ...,
            notifications_min_severity: (
                T_NotificationMinimumSeverity | None
            ) = ...,
            notifications_disabled_categories: (
                t.Iterable[T_NotificationDisabledCategory] | None
            ) = ...,
            notifications_disabled_classifications: (
                t.Iterable[T_NotificationDisabledCategory] | None
            ) = ...,
            warn_notification_severity: (
                T_NotificationMinimumSeverity | None
            ) = ...,
            telemetry_disabled: bool = ...,
            # undocumented/unsupported options
            # they may be changed or removed any time without prior notice
            connection_acquisition_timeout: float = ...,
            max_transaction_retry_time: float = ...,
            initial_retry_delay: float = ...,
            retry_delay_multiplier: float = ...,
            retry_delay_jitter_factor: float = ...,
            database: str | None = ...,
            fetch_size: int = ...,
            impersonated_user: str | None = ...,
            bookmark_manager: (
                BookmarkManager | BookmarkManager | None
            ) = ...,
        ) -> Driver: ...

    else:

        @classmethod
        def driver(
            cls,
            uri: str,
            *,
            auth: _TAuth | AuthManager = None,
            **config,
        ) -> Driver:
            """
            Create a driver.

            :param uri: the connection URI for the driver,
                see :ref:`uri-ref` for available URIs.
            :param auth: the authentication details,
                see :ref:`auth-ref` for available authentication details.
            :param config: driver configuration key-word arguments,
                see :ref:`driver-configuration-ref` for available
                key-word arguments.

            :raises ConfigurationError: if

                * ``trust`` is given but is not one of the two supported
                  constants,
                * ``trusted_certificates`` is given but is not a
                  ``TrustStore`` instance,
                * any of ``encrypted``, ``trust``, ``trusted_certificates``,
                  or ``ssl_context`` is combined with a URI scheme that
                  already implies encryption settings, or
                * ``liveness_check_timeout`` is negative.
            """
            driver_type, security_type, parsed = parse_neo4j_uri(uri)

            # Plain auth values are wrapped so that the rest of the driver
            # only ever deals with auth managers.
            if not isinstance(auth, AuthManager):
                auth = AuthManagers.static(auth)
            config["auth"] = auth

            client_certificate = config.get("client_certificate")
            if isinstance(client_certificate, ClientCertificate):
                # using internal class until public factory is GA:
                # AsyncClientCertificateProviders.static
                config["client_certificate"] = (
                    _StaticClientCertificateProvider(client_certificate)
                )

            # TODO: 6.0 - remove "trust" config option
            if "trust" in config and config["trust"] not in {
                TRUST_ALL_CERTIFICATES,
                TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
            }:
                raise ConfigurationError(
                    "The config setting `trust` values are {!r}".format(
                        [
                            TRUST_ALL_CERTIFICATES,
                            TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
                        ]
                    )
                )

            if "trusted_certificates" in config and not isinstance(
                config["trusted_certificates"], TrustStore
            ):
                # Note the trailing space after "or": without it the two
                # adjacent literals render as "orneo4j.TrustSystemCAs".
                raise ConfigurationError(
                    'The config setting "trusted_certificates" must be of '
                    "type neo4j.TrustAll, neo4j.TrustCustomCAs, or "
                    "neo4j.TrustSystemCAs but was {}".format(
                        type(config["trusted_certificates"])
                    )
                )

            # Schemes such as "bolt+s" or "neo4j+ssc" already determine the
            # encryption settings; explicit settings would be contradictory.
            if security_type in {
                SECURITY_TYPE_SELF_SIGNED_CERTIFICATE,
                SECURITY_TYPE_SECURE,
            } and (
                "encrypted" in config
                or "trust" in config
                or "trusted_certificates" in config
                or "ssl_context" in config
            ):
                # TODO: 6.0 - remove "trust" from error message
                raise ConfigurationError(
                    'The config settings "encrypted", "trust", '
                    '"trusted_certificates", and "ssl_context" can only be '
                    "used with the URI schemes {!r}. Use the other URI "
                    "schemes {!r} for setting encryption settings.".format(
                        [
                            URI_SCHEME_BOLT,
                            URI_SCHEME_NEO4J,
                        ],
                        [
                            URI_SCHEME_BOLT_SELF_SIGNED_CERTIFICATE,
                            URI_SCHEME_BOLT_SECURE,
                            URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
                            URI_SCHEME_NEO4J_SECURE,
                        ],
                    )
                )

            if security_type == SECURITY_TYPE_SECURE:
                config["encrypted"] = True
            elif security_type == SECURITY_TYPE_SELF_SIGNED_CERTIFICATE:
                config["encrypted"] = True
                config["trusted_certificates"] = TrustAll()

            if "notifications_disabled_classifications" in config:
                preview_warn(
                    "notifications_disabled_classifications "
                    "is a preview feature.",
                    stack_level=2,
                )
            if "warn_notification_severity" in config:
                preview_warn(
                    "notification warnings are a preview feature.",
                    stack_level=2,
                )
            _normalize_notifications_config(config, driver_level=True)

            liveness_check_timeout = config.get("liveness_check_timeout")
            if (
                liveness_check_timeout is not None
                and liveness_check_timeout < 0
            ):
                raise ConfigurationError(
                    'The config setting "liveness_check_timeout" must be '
                    "greater than or equal to 0 but was "
                    f"{liveness_check_timeout}."
                )

            assert driver_type in {DRIVER_BOLT, DRIVER_NEO4J}
            if driver_type == DRIVER_BOLT:
                if parse_routing_context(parsed.query):
                    deprecation_warn(
                        'Creating a direct driver ("bolt://" scheme) with '
                        "routing context (URI parameters) is deprecated. They "
                        "will be ignored. This will raise an error in a "
                        f'future release. Given URI "{uri}"',
                        stack_level=2,
                    )
                    # TODO: 6.0 - raise instead of warning
                    # raise ValueError(
                    #     'Routing parameters are not supported with scheme '
                    #     '"bolt". Given URI "{}".'.format(uri)
                    # )
                return cls.bolt_driver(parsed.netloc, **config)
            # else driver_type == DRIVER_NEO4J
            routing_context = parse_routing_context(parsed.query)
            return cls.neo4j_driver(
                parsed.netloc, routing_context=routing_context, **config
            )

    @classmethod
    def bookmark_manager(
        cls,
        initial_bookmarks: Bookmarks | t.Iterable[str] | None = None,
        bookmarks_supplier: _TBmSupplier | None = None,
        bookmarks_consumer: _TBmConsumer | None = None,
    ) -> BookmarkManager:
        """
        Create a :class:`.BookmarkManager` with default implementation.

        Basic usage example to configure sessions with the built-in bookmark
        manager implementation so that all work is automatically causally
        chained (i.e., all reads can observe all previous writes even in a
        clustered setup)::

            import neo4j


            # omitting closing the driver for brevity
            driver = neo4j.GraphDatabase.driver(...)
            bookmark_manager = neo4j.GraphDatabase.bookmark_manager(...)

            with driver.session(
                bookmark_manager=bookmark_manager
            ) as session1:
                with driver.session(
                    bookmark_manager=bookmark_manager,
                    access_mode=neo4j.READ_ACCESS
                ) as session2:
                    result1 = session1.run("<WRITE_QUERY>")
                    result1.consume()
                    # READ_QUERY is guaranteed to see what WRITE_QUERY wrote.
                    result2 = session2.run("<READ_QUERY>")
                    result2.consume()

        This is a very contrived example, and in this particular case, having
        both queries in the same session has the exact same effect and might
        even be more performant. However, when dealing with sessions spanning
        multiple threads, Tasks, processes, or even hosts, the bookmark
        manager can come in handy as sessions are not safe to be used
        concurrently.

        :param initial_bookmarks:
            The initial set of bookmarks. The returned bookmark manager will
            use this to initialize its internal bookmarks.
        :param bookmarks_supplier:
            Function which will be called every time the default bookmark
            manager's method :meth:`.BookmarkManager.get_bookmarks`
            gets called.
            The function takes no arguments and must return a
            :class:`.Bookmarks` object. The result of ``bookmarks_supplier``
            will then be concatenated with the internal set of bookmarks and
            used to configure the session in creation. It will, however, not
            update the internal set of bookmarks.
        :param bookmarks_consumer:
            Function which will be called whenever the set of bookmarks
            handled by the bookmark manager gets updated with the new
            internal bookmark set. It will receive the new set of bookmarks
            as a :class:`.Bookmarks` object and return :data:`None`.

        :returns: A default implementation of :class:`.BookmarkManager`.

        .. versionadded:: 5.0

        .. versionchanged:: 5.3
            The bookmark manager no longer tracks bookmarks per database.
            This effectively changes the signature of almost all bookmark
            manager related methods:

            * ``initial_bookmarks`` is no longer a mapping from database name
              to bookmarks but plain bookmarks.
            * ``bookmarks_supplier`` no longer receives the database name as
              an argument.
            * ``bookmarks_consumer`` no longer receives the database name as
              an argument.

        .. versionchanged:: 5.8 Stabilized from experimental.
        """
        return Neo4jBookmarkManager(
            initial_bookmarks=initial_bookmarks,
            bookmarks_supplier=bookmarks_supplier,
            bookmarks_consumer=bookmarks_consumer,
        )

    @classmethod
    def bolt_driver(cls, target, **config):
        """
        Create a direct driver.

        Create a driver for direct Bolt server access that uses
        socket I/O and thread-based concurrency.

        :raises ServiceUnavailable: if opening the driver fails with a
            Bolt handshake or Bolt security error.
        """
        from .._exceptions import (
            BoltHandshakeError,
            BoltSecurityError,
        )

        try:
            return BoltDriver.open(target, **config)
        except (BoltHandshakeError, BoltSecurityError) as error:
            from ..exceptions import ServiceUnavailable

            raise ServiceUnavailable(str(error)) from error

    @classmethod
    def neo4j_driver(cls, *targets, routing_context=None, **config):
        """
        Create a routing driver.

        Create a driver for routing-capable Neo4j service access
        that uses socket I/O and thread-based concurrency.

        :raises ServiceUnavailable: if opening the driver fails with a
            Bolt handshake or Bolt security error.
        """
        # TODO: 6.0 - adjust signature to only take one target
        if len(targets) > 1:
            deprecation_warn(
                "Creating a routing driver with multiple targets is "
                "deprecated. The driver only uses the first target anyway. "
                "The method signature will change in a future release.",
            )

        from .._exceptions import (
            BoltHandshakeError,
            BoltSecurityError,
        )

        try:
            return Neo4jDriver.open(
                *targets, routing_context=routing_context, **config
            )
        except (BoltHandshakeError, BoltSecurityError) as error:
            from ..exceptions import ServiceUnavailable

            raise ServiceUnavailable(str(error)) from error
- class _Direct:
- # TODO: 6.0 - those attributes should be private
- default_host = "localhost"
- default_port = 7687
- default_target = ":"
- def __init__(self, address):
- self._address = address
- @property
- def address(self):
- return self._address
- @classmethod
- def parse_target(cls, target):
- """Parse a target string to produce an address."""
- if not target:
- target = cls.default_target
- return Address.parse(
- target,
- default_host=cls.default_host,
- default_port=cls.default_port,
- )
- class _Routing:
- # TODO: 6.0 - those attributes should be private
- default_host = "localhost"
- default_port = 7687
- default_targets = ": :17601 :17687"
- def __init__(self, initial_addresses):
- self._initial_addresses = initial_addresses
- @property
- def initial_addresses(self):
- return self._initial_addresses
- @classmethod
- def parse_targets(cls, *targets):
- """Parse a sequence of target strings to produce an address list."""
- targets = " ".join(targets)
- if not targets:
- targets = cls.default_targets
- return Address.parse_list(
- targets,
- default_host=cls.default_host,
- default_port=cls.default_port,
- )
- class Driver:
- """
- Base class for all driver types.
- Drivers are used as the primary access point to Neo4j.
- """
- #: Connection pool
- _pool: t.Any = None
- #: Flag if the driver has been closed
- _closed = False
def __init__(self, pool, default_workspace_config):
    """
    Set up the driver around an existing connection pool.

    :param pool: the connection pool used by this driver; must not be
        :data:`None`.
    :param default_workspace_config: the workspace configuration that
        session configurations are derived from; must not be :data:`None`.
    """
    assert pool is not None
    assert default_workspace_config is not None
    self._default_workspace_config = default_workspace_config
    self._pool = pool
    # Each driver gets its own bookmark manager, which `execute_query`
    # falls back to when the caller does not pass one.
    self._query_bookmark_manager = GraphDatabase.bookmark_manager()
- def __enter__(self) -> Driver:
- return self
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
# Copy globals as function locals to make sure that they are available
# during Python shutdown when the Pool is destroyed.
def __del__(
    self,
    _unclosed_resource_warn=unclosed_resource_warn,
    _is_async_code=Util.is_async_code,
    _deprecation_warn=deprecation_warn,
):
    """Warn about a driver that was never closed; close it in sync code."""
    if self._closed:
        # Properly closed drivers need no warning and no clean-up.
        return
    _unclosed_resource_warn(self)
    # TODO: 6.0 - remove this
    if _is_async_code:
        return
    _deprecation_warn(
        "Relying on Driver's destructor to close the session "
        "is deprecated. Please make sure to close the session. "
        "Use it as a context (`with` statement) or make sure to "
        "call `.close()` explicitly. Future versions of the "
        "driver will not close drivers automatically."
    )
    self.close()
- def _check_state(self):
- if self._closed:
- # TODO: 6.0 - raise the error
- # raise DriverError("Driver closed")
- deprecation_warn(
- "Using a driver after it has been closed is deprecated. "
- "Future versions of the driver will raise an error.",
- stack_level=3,
- )
@property
def encrypted(self) -> bool:
    """Indicate whether the driver was configured to use encryption."""
    pool_config = self._pool.pool_config
    return bool(pool_config.encrypted)
if t.TYPE_CHECKING:

    def session(
        self,
        *,
        connection_acquisition_timeout: float = ...,
        max_transaction_retry_time: float = ...,
        database: str | None = ...,
        fetch_size: int = ...,
        impersonated_user: str | None = ...,
        bookmarks: t.Iterable[str] | Bookmarks | None = ...,
        default_access_mode: str = ...,
        bookmark_manager: (
            BookmarkManager | BookmarkManager | None
        ) = ...,
        auth: _TAuth = ...,
        notifications_min_severity: (
            T_NotificationMinimumSeverity | None
        ) = ...,
        notifications_disabled_categories: (
            t.Iterable[T_NotificationDisabledCategory] | None
        ) = ...,
        notifications_disabled_classifications: (
            t.Iterable[T_NotificationDisabledCategory] | None
        ) = ...,
        # undocumented/unsupported options
        # they may be change or removed any time without prior notice
        initial_retry_delay: float = ...,
        retry_delay_multiplier: float = ...,
        retry_delay_jitter_factor: float = ...,
    ) -> Session: ...

else:

    def session(self, **config) -> Session:
        """
        Create a session.

        See :ref:`session-construction-ref` for details.

        :param config: session configuration key-word arguments,
            see :ref:`session-configuration-ref` for available
            key-word arguments.

        :returns: new :class:`neo4j.Session` object
        """
        # Would work just fine, but we don't want to introduce yet
        # another undocumented/unsupported config option.
        config.pop("warn_notification_severity", None)

        self._check_state()

        if "notifications_disabled_classifications" in config:
            preview_warn(
                "notifications_disabled_classifications "
                "is a preview feature.",
                stack_level=2,
            )
        return self._session(self._read_session_config(config))
def _session(self, session_config) -> Session:
    """Build a session on this driver's pool from a ready-made config."""
    pool = self._pool
    return Session(pool, session_config)
def _read_session_config(self, config_kwargs):
    """
    Turn session key-word arguments into a ``SessionConfig``.

    The key-word arguments are first passed through
    ``_prepare_session_config`` and then layered over the driver's default
    workspace configuration.
    """
    prepared = self._prepare_session_config(config_kwargs)
    return SessionConfig(self._default_workspace_config, prepared)
@classmethod
def _prepare_session_config(cls, config_kwargs):
    """
    Normalize the notification settings of session key-word arguments.

    :param config_kwargs: the session configuration key-word arguments.
    :returns: the very same ``config_kwargs`` object that was passed in.
    """
    # NOTE(review): the helper's return value is discarded, so it
    # presumably rewrites `config_kwargs` in place - confirm against
    # `_normalize_notifications_config`.
    _normalize_notifications_config(config_kwargs)
    return config_kwargs
def close(self) -> None:
    """Shut down, closing any open connections in the pool."""
    # TODO: 6.0 - NOOP if already closed
    # if self._closed:
    #     return
    try:
        self._pool.close()
    except asyncio.CancelledError:
        # A cancelled close still marks the driver as closed before the
        # cancellation is passed on; any other error leaves the flag as is.
        self._closed = True
        raise
    else:
        self._closed = True
# overloads to work around https://github.com/python/mypy/issues/3737
@t.overload
def execute_query(
    self,
    query_: te.LiteralString | Query,
    parameters_: dict[str, t.Any] | None = None,
    routing_: T_RoutingControl = RoutingControl.WRITE,
    database_: str | None = None,
    impersonated_user_: str | None = None,
    bookmark_manager_: (
        BookmarkManager | BookmarkManager | None
    ) = ...,
    auth_: _TAuth = None,
    result_transformer_: t.Callable[
        [Result], t.Union[EagerResult]
    ] = ...,
    **kwargs: t.Any,
) -> EagerResult: ...

@t.overload
def execute_query(
    self,
    query_: te.LiteralString | Query,
    parameters_: dict[str, t.Any] | None = None,
    routing_: T_RoutingControl = RoutingControl.WRITE,
    database_: str | None = None,
    impersonated_user_: str | None = None,
    bookmark_manager_: (
        BookmarkManager | BookmarkManager | None
    ) = ...,
    auth_: _TAuth = None,
    result_transformer_: t.Callable[[Result], t.Union[_T]] = ...,
    **kwargs: t.Any,
) -> _T: ...

def execute_query(
    self,
    query_: te.LiteralString | Query,
    parameters_: dict[str, t.Any] | None = None,
    routing_: T_RoutingControl = RoutingControl.WRITE,
    database_: str | None = None,
    impersonated_user_: str | None = None,
    bookmark_manager_: (
        BookmarkManager
        | BookmarkManager
        | None
        | te.Literal[_DefaultEnum.default]
    ) = _default,
    auth_: _TAuth = None,
    result_transformer_: t.Callable[
        [Result], t.Union[t.Any]
    ] = Result.to_eager_result,
    **kwargs: t.Any,
) -> t.Any:
    '''
    Execute a query in a transaction function and return all results.

    This method is a handy wrapper for lower-level driver APIs like
    sessions, transactions, and transaction functions. It is intended
    for simple use cases where there is no need for managing all possible
    options.

    The internal usage of transaction functions provides a retry-mechanism
    for appropriate errors. Furthermore, this means that queries using
    ``CALL {} IN TRANSACTIONS`` or the older ``USING PERIODIC COMMIT``
    will not work (use :meth:`Session.run` for these).

    The method is roughly equivalent to::

        def execute_query(
            query_, parameters_, routing_, database_, impersonated_user_,
            bookmark_manager_, auth_, result_transformer_, **kwargs
        ):
            @unit_of_work(query_.metadata, query_.timeout)
            def work(tx):
                result = tx.run(query_.text, parameters_, **kwargs)
                return result_transformer_(result)

            with driver.session(
                database=database_,
                impersonated_user=impersonated_user_,
                bookmark_manager=bookmark_manager_,
                auth=auth_,
            ) as session:
                if routing_ == RoutingControl.WRITE:
                    return session.execute_write(work)
                elif routing_ == RoutingControl.READ:
                    return session.execute_read(work)

    Usage example::

        from typing import List

        import neo4j


        def example(driver: neo4j.Driver) -> List[str]:
            """Get the name of all 42 year-olds."""
            records, summary, keys = driver.execute_query(
                "MATCH (p:Person {age: $age}) RETURN p.name",
                {"age": 42},
                routing_=neo4j.RoutingControl.READ,  # or just "r"
                database_="neo4j",
            )
            assert keys == ["p.name"]  # not needed, just for illustration
            # log_summary(summary)  # log some metadata
            return [str(record["p.name"]) for record in records]
            # or: return [str(record[0]) for record in records]
            # or even: return list(map(lambda r: str(r[0]), records))

    Another example::

        import neo4j


        def example(driver: neo4j.Driver) -> int:
            """Call all young people "My dear" and get their count."""
            record = driver.execute_query(
                "MATCH (p:Person) WHERE p.age <= $age "
                "SET p.nickname = 'My dear' "
                "RETURN count(*)",
                # optional routing parameter, as write is default
                # routing_=neo4j.RoutingControl.WRITE,  # or just "w",
                database_="neo4j",
                result_transformer_=neo4j.Result.single,
                age=15,
            )
            assert record is not None  # for typechecking and illustration
            count = record[0]
            assert isinstance(count, int)
            return count

    :param query_:
        Cypher query to execute.
        Use a :class:`.Query` object to pass a query with additional
        transaction configuration.
    :type query_: typing.LiteralString | Query
    :param parameters_: parameters to use in the query
    :type parameters_: typing.Optional[typing.Dict[str, typing.Any]]
    :param routing_:
        Whether to route the query to a reader (follower/read replica) or
        a writer (leader) in the cluster. Default is to route to a writer.
    :type routing_: RoutingControl
    :param database_:
        Database to execute the query against.

        :data:`None` (default) uses the database configured on the server
        side.

        .. Note::
            It is recommended to always specify the database explicitly
            when possible. This allows the driver to work more efficiently,
            as it will not have to resolve the default database first.

        See also the Session config :ref:`database-ref`.
    :type database_: typing.Optional[str]
    :param impersonated_user_:
        Name of the user to impersonate.

        This means that all queries will be executed in the security
        context of the impersonated user. For this, the user for which the
        :class:`Driver` has been created needs to have the appropriate
        permissions.

        See also the Session config :ref:`impersonated-user-ref`.
    :type impersonated_user_: typing.Optional[str]
    :param auth_:
        Authentication information to use for this query.

        By default, the driver configuration is used.

        See also the Session config :ref:`session-auth-ref`.
    :type auth_: typing.Tuple[typing.Any, typing.Any] | Auth | None
    :param result_transformer_:
        A function that gets passed the :class:`neo4j.Result` object
        resulting from the query and converts it to a different type. The
        result of the transformer function is returned by this method.

        .. warning::

            The transformer function must **not** return the
            :class:`neo4j.Result` itself.

        .. warning::

            N.B. the driver might retry the underlying transaction so the
            transformer might get invoked more than once (with different
            :class:`neo4j.Result` objects).
            Therefore, it needs to be idempotent (i.e., have the same
            effect, regardless if called once or many times).

        Example transformer that checks that exactly one record is in the
        result stream, then returns the record and the result summary::

            from typing import Tuple

            import neo4j


            def transformer(
                result: neo4j.Result
            ) -> Tuple[neo4j.Record, neo4j.ResultSummary]:
                record = result.single(strict=True)
                summary = result.consume()
                return record, summary

        Note that methods of :class:`neo4j.Result` that don't take
        mandatory arguments can be used directly as transformer functions.
        For example::

            import neo4j


            def example(driver: neo4j.Driver) -> neo4j.Record:
                record = driver.execute_query(
                    "SOME QUERY",
                    result_transformer_=neo4j.Result.single
                )


            # is equivalent to:


            def transformer(result: neo4j.Result) -> neo4j.Record:
                return result.single()


            def example(driver: neo4j.Driver) -> neo4j.Record:
                record = driver.execute_query(
                    "SOME QUERY",
                    result_transformer_=transformer
                )

    :type result_transformer_:
        typing.Callable[[Result], typing.Union[T]]
    :param bookmark_manager_:
        Specify a bookmark manager to use.

        If present, the bookmark manager is used to keep the query causally
        consistent with all work executed using the same bookmark manager.

        Defaults to the driver's :attr:`.execute_query_bookmark_manager`.

        Pass :data:`None` to disable causal consistency.
    :type bookmark_manager_: BookmarkManager | BookmarkManager | None
    :param kwargs: additional keyword parameters. None of these can end
        with a single underscore. This is to avoid collisions with the
        keyword configuration parameters of this method. If you need to
        pass such a parameter, use the ``parameters_`` parameter instead.
        Parameters passed as kwargs take precedence over those passed in
        ``parameters_``.
    :type kwargs: typing.Any

    :returns: the result of the ``result_transformer_``
    :rtype: T

    :raises ValueError: if a keyword parameter ends with a single
        underscore or ``routing_`` is neither read nor write routing.

    .. versionadded:: 5.5

    .. versionchanged:: 5.8

        * Added ``auth_`` parameter in preview.
        * Stabilized from experimental.

    .. versionchanged:: 5.14
        Stabilized ``auth_`` parameter from preview.

    .. versionchanged:: 5.15
        The ``query_`` parameter now also accepts a :class:`.Query` object
        instead of only :class:`str`.
    '''  # noqa: E501 example code isn't too long
    self._check_state()

    # Names ending in exactly one underscore are reserved for this method's
    # own configuration parameters; a double underscore suffix is allowed.
    invalid_kwargs = [
        name
        for name in kwargs
        if name.endswith("_") and not name.endswith("__")
    ]
    if invalid_kwargs:
        raise ValueError(
            "keyword parameters must not end with a single '_'. "
            f"Found: {invalid_kwargs!r}\n"
            "\nYou either misspelled an existing configuration parameter "
            "or tried to send a query parameter that is reserved. In the "
            "latter case, use the `parameters_` dictionary instead."
        )

    work = _work
    if isinstance(query_, Query):
        # A Query object carries transaction metadata and a timeout, which
        # are attached to the unit of work.
        query_str = query_.text
        work = unit_of_work(query_.metadata, query_.timeout)(_work)
    else:
        query_str = query_

    # Keyword parameters override entries of `parameters_`.
    parameters = dict(parameters_ or {}, **kwargs)

    if bookmark_manager_ is _default:
        bookmark_manager_ = self._query_bookmark_manager
    assert bookmark_manager_ is not _default

    session = self._session(
        self._read_session_config(
            {
                "database": database_,
                "impersonated_user": impersonated_user_,
                "bookmark_manager": bookmark_manager_,
                "auth": auth_,
            }
        )
    )
    with session:
        # Compared with `==` (not a dict lookup) so that plain strings such
        # as "r" and "w" keep working alongside the enum members.
        if routing_ == RoutingControl.WRITE:
            access_mode = WRITE_ACCESS
        elif routing_ == RoutingControl.READ:
            access_mode = READ_ACCESS
        else:
            raise ValueError(
                f"Invalid routing control value: {routing_!r}"
            )
        work_args = (query_str, parameters, result_transformer_)
        with session._pipelined_begin:
            return session._run_transaction(
                access_mode,
                TelemetryAPI.DRIVER,
                work,
                work_args,
                {},
            )
@property
def execute_query_bookmark_manager(self) -> BookmarkManager:
    """
    Return the bookmark manager :meth:`.execute_query` uses by default.

    Passing this :class:`.BookmarkManager` to a session makes the session
    and :meth:`.execute_query` calls part of the same causal chain.
    Example::

        def example(driver: neo4j.Driver) -> None:
            driver.execute_query("<QUERY 1>")
            with driver.session(
                bookmark_manager=driver.execute_query_bookmark_manager
            ) as session:
                # every query inside this session will be causally chained
                # (i.e., can read what was written by <QUERY 1>)
                session.run("<QUERY 2>")
            # subsequent execute_query calls will be causally chained
            # (i.e., can read what was written by <QUERY 2>)
            driver.execute_query("<QUERY 3>")

    .. versionadded:: 5.5

    .. versionchanged:: 5.8

        * Renamed from ``query_bookmark_manager`` to
          ``execute_query_bookmark_manager``.
        * Stabilized from experimental.
    """
    default_manager = self._query_bookmark_manager
    return default_manager
if t.TYPE_CHECKING:

    def verify_connectivity(
        self,
        *,
        # Typing-only signature. Every option listed here is experimental
        # and may be changed or removed at any time without prior notice.
        session_connection_timeout: float = ...,
        connection_acquisition_timeout: float = ...,
        max_transaction_retry_time: float = ...,
        database: str | None = ...,
        fetch_size: int = ...,
        impersonated_user: str | None = ...,
        bookmarks: t.Iterable[str] | Bookmarks | None = ...,
        default_access_mode: str = ...,
        bookmark_manager: (
            BookmarkManager | BookmarkManager | None
        ) = ...,
        auth: Auth | tuple[str, str] = ...,
        notifications_min_severity: (
            T_NotificationMinimumSeverity | None
        ) = ...,
        notifications_disabled_categories: (
            t.Iterable[T_NotificationDisabledCategory] | None
        ) = ...,
        notifications_disabled_classifications: (
            t.Iterable[T_NotificationDisabledCategory] | None
        ) = ...,
        # undocumented/unsupported options
        initial_retry_delay: float = ...,
        retry_delay_multiplier: float = ...,
        retry_delay_jitter_factor: float = ...,
    ) -> None: ...

else:

    def verify_connectivity(self, **config) -> None:
        """
        Check that the driver is able to connect to the server.

        A reading connection to the remote server (or to a cluster) is
        established and some data is exchanged over it.

        .. note::
            Even if this method raises an exception, the driver still needs
            to be closed via :meth:`close` to free up all resources.

        :param config: accepts the same configuration key-word arguments as
            :meth:`session`.

            .. warning::
                All configuration key-word arguments are experimental.
                They might be changed or removed in any future version
                without prior notice.

        :raises Exception: if the driver cannot connect to the remote.
            Use the exception to further understand the cause of the
            connectivity problem.

        .. versionchanged:: 5.0
            The undocumented return value has been removed.
            If you need information about the remote server, use
            :meth:`get_server_info` instead.
        """
        self._check_state()
        if config:
            # Any keyword argument at all opts into experimental behaviour.
            experimental_warn(
                "All configuration key-word arguments to "
                "verify_connectivity() are experimental. They might be "
                "changed or removed in any future version without prior "
                "notice."
            )
        # The server info is fetched only to prove connectivity; the result
        # is intentionally discarded.
        self._get_server_info(self._read_session_config(config))
if t.TYPE_CHECKING:

    def get_server_info(
        self,
        *,
        # Typing-only signature. Every option listed here is experimental
        # and may be changed or removed at any time without prior notice.
        session_connection_timeout: float = ...,
        connection_acquisition_timeout: float = ...,
        max_transaction_retry_time: float = ...,
        database: str | None = ...,
        fetch_size: int = ...,
        impersonated_user: str | None = ...,
        bookmarks: t.Iterable[str] | Bookmarks | None = ...,
        default_access_mode: str = ...,
        bookmark_manager: (
            BookmarkManager | BookmarkManager | None
        ) = ...,
        auth: Auth | tuple[str, str] = ...,
        notifications_min_severity: (
            T_NotificationMinimumSeverity | None
        ) = ...,
        notifications_disabled_categories: (
            t.Iterable[T_NotificationDisabledCategory] | None
        ) = ...,
        notifications_disabled_classifications: (
            t.Iterable[T_NotificationDisabledCategory] | None
        ) = ...,
        # undocumented/unsupported options
        initial_retry_delay: float = ...,
        retry_delay_multiplier: float = ...,
        retry_delay_jitter_factor: float = ...,
    ) -> ServerInfo: ...

else:

    def get_server_info(self, **config) -> ServerInfo:
        """
        Return information about the Neo4j server the driver connects to.

        A working read connection to the remote server, or to a member of
        a cluster, is established and some data is exchanged. The
        information of the server that was contacted is then returned.

        In a cluster, there is no guarantee about which server will be
        contacted.

        .. note::
            Even if this method raises an exception, the driver still needs
            to be closed via :meth:`close` to free up all resources.

        :param config: accepts the same configuration key-word arguments as
            :meth:`session`.

            .. warning::
                All configuration key-word arguments are experimental.
                They might be changed or removed in any future version
                without prior notice.

        :raises Exception: if the driver cannot connect to the remote.
            Use the exception to further understand the cause of the
            connectivity problem.

        .. versionadded:: 5.0
        """
        self._check_state()
        if config:
            # Any keyword argument at all opts into experimental behaviour.
            experimental_warn(
                "All configuration key-word arguments to "
                "get_server_info() are experimental. They might be "
                "changed or removed in any future version without prior "
                "notice."
            )
        return self._get_server_info(self._read_session_config(config))
def supports_multi_db(self) -> bool:
    """
    Tell whether the server or cluster supports multi-databases.

    :returns: true if the server or cluster the driver connects to
        supports multi-databases, otherwise false.

    .. note::
        Feature support query based solely on the Bolt protocol version.
        The feature might still be disabled on the server side even if this
        function returns :data:`True`. It just guarantees that the driver
        won't throw a :exc:`.ConfigurationError` when trying to use this
        driver feature.
    """
    self._check_state()
    # A session built from the default session configuration is opened just
    # to obtain a read connection whose capabilities can be inspected.
    with self._session(self._read_session_config({})) as probe:
        probe._connect(READ_ACCESS)
        assert probe._connection
        return probe._connection.supports_multiple_databases
if t.TYPE_CHECKING:

    def verify_authentication(
        self,
        auth: Auth | tuple[str, str] | None = None,
        # Typing-only signature. Every option other than ``auth`` is
        # experimental and may be changed or removed at any time without
        # prior notice.
        session_connection_timeout: float = ...,
        connection_acquisition_timeout: float = ...,
        max_transaction_retry_time: float = ...,
        database: str | None = ...,
        fetch_size: int = ...,
        impersonated_user: str | None = ...,
        bookmarks: t.Iterable[str] | Bookmarks | None = ...,
        default_access_mode: str = ...,
        bookmark_manager: (
            BookmarkManager | BookmarkManager | None
        ) = ...,
        # undocumented/unsupported options
        initial_retry_delay: float = ...,
        retry_delay_multiplier: float = ...,
        retry_delay_jitter_factor: float = ...,
    ) -> bool: ...

else:

    def verify_authentication(
        self,
        auth: Auth | tuple[str, str] | None = None,
        **config,
    ) -> bool:
        """
        Check whether the given authentication information is valid.

        Works like :meth:`.verify_connectivity`, but checks authentication.

        A working read connection to the remote server, or to a member of
        a cluster, is established and some data is exchanged. In a cluster,
        there is no guarantee about which server will be contacted. If the
        data exchange succeeds and the authentication information is valid,
        :data:`True` is returned. Otherwise, the error is matched against a
        list of known authentication errors. If it is on that list,
        :data:`False` is returned to indicate that the authentication
        information is invalid. Any other error is re-raised.

        :param auth: authentication information to verify.
            Same as the session config :ref:`auth-ref`.
        :param config: accepts the same configuration key-word arguments as
            :meth:`session`.

            .. warning::
                All configuration key-word arguments (except ``auth``) are
                experimental. They might be changed or removed in any
                future version without prior notice.

        :raises Exception: if the driver cannot connect to the remote.
            Use the exception to further understand the cause of the
            connectivity problem.

        .. versionadded:: 5.8

        .. versionchanged:: 5.14 Stabilized from experimental.
        """
        self._check_state()
        if config:
            experimental_warn(
                "All configuration key-word arguments but auth to "
                "verify_authentication() are experimental. They might be "
                "changed or removed in any future version without prior "
                "notice."
            )
        # Unless the caller picked a database, target "system".
        config.setdefault("database", "system")
        base_config = self._read_session_config(config)
        auth_session_config = SessionConfig(base_config, {"auth": auth})
        # Server error codes that mean "these credentials were rejected"
        # rather than "something else went wrong".
        rejected_auth_codes = {
            "Neo.ClientError.Security.CredentialsExpired",
            "Neo.ClientError.Security.Forbidden",
            "Neo.ClientError.Security.TokenExpired",
            "Neo.ClientError.Security.Unauthorized",
        }
        with self._session(auth_session_config) as session:
            try:
                session._verify_authentication()
            except Neo4jError as error:
                if error.code not in rejected_auth_codes:
                    raise
                return False
        return True
def supports_session_auth(self) -> bool:
    """
    Tell whether the remote supports connection re-authentication.

    :returns: true if the server or cluster the driver connects to
        supports re-authentication of existing connections, otherwise
        false.

    .. note::
        Feature support query based solely on the Bolt protocol version.
        The feature might still be disabled on the server side even if this
        function returns :data:`True`. It just guarantees that the driver
        won't throw a :exc:`.ConfigurationError` when trying to use this
        driver feature.

    .. versionadded:: 5.8
    """
    self._check_state()
    # A session built from the default session configuration is opened just
    # to obtain a read connection whose capabilities can be inspected.
    with self._session(self._read_session_config({})) as probe:
        probe._connect(READ_ACCESS)
        assert probe._connection
        return probe._connection.supports_re_auth
def _get_server_info(self, session_config) -> ServerInfo:
    """
    Open a session with ``session_config`` and return its server info.

    The session is used as a context manager, so it is exited again
    before the result reaches the caller.
    """
    with self._session(session_config) as info_session:
        return info_session._get_server_info()
def _work(
    tx: ManagedTransaction,
    query: te.LiteralString,
    parameters: dict[str, t.Any],
    transformer: t.Callable[[Result], t.Union[_T]],
) -> _T:
    """
    Run ``query`` with ``parameters`` on ``tx`` and transform the result.

    :param tx: transaction the query is run in.
    :param query: query text handed to ``tx.run``.
    :param parameters: query parameters handed to ``tx.run``.
    :param transformer: callable applied to whatever ``tx.run`` returns.
    :returns: the return value of ``transformer``.
    """
    return transformer(tx.run(query, parameters))
class BoltDriver(_Direct, Driver):
    """
    Driver flavour that is instantiated for ``bolt`` URIs.

    A :class:`.BoltDriver` addresses a single database machine, which may
    be a standalone server or a specific member of a cluster. All of its
    connections are made to the exact host and port given in the URI.

    This class is not supposed to be instantiated externally. Use
    :meth:`GraphDatabase.driver` instead.
    """

    @classmethod
    def open(cls, target, **config):
        """Parse ``target``, open a ``BoltPool`` for it, and wrap it."""
        from .io import BoltPool

        server_address = cls.parse_target(target)
        # Split the keyword arguments into pool-level and workspace-level
        # configuration.
        pool_cfg, workspace_cfg = Config.consume_chain(
            config, PoolConfig, WorkspaceConfig
        )
        bolt_pool = BoltPool.open(
            server_address,
            pool_config=pool_cfg,
            workspace_config=workspace_cfg,
        )
        return cls(bolt_pool, workspace_cfg)

    def __init__(self, pool, default_workspace_config):
        """Initialise both base classes from ``pool`` and the config."""
        _Direct.__init__(self, pool.address)
        Driver.__init__(self, pool, default_workspace_config)
        # NOTE(review): presumably Driver.__init__ already stores this
        # value; the explicit assignment is kept as in the original.
        self._default_workspace_config = default_workspace_config
class Neo4jDriver(_Routing, Driver):
    """
    Driver flavour that is instantiated for ``neo4j`` URIs.

    Its routing behaviour works in tandem with Neo4j's `Causal Clustering
    <https://neo4j.com/docs/operations-manual/current/clustering/>`_
    feature by directing read and write behaviour to appropriate
    cluster members.

    This class is not supposed to be instantiated externally. Use
    :meth:`GraphDatabase.driver` instead.
    """

    @classmethod
    def open(cls, *targets, routing_context=None, **config):
        """Parse ``targets``, open a ``Neo4jPool`` for them, and wrap it."""
        from .io import Neo4jPool

        router_addresses = cls.parse_targets(*targets)
        # Split the keyword arguments into pool-level and workspace-level
        # configuration.
        pool_cfg, workspace_cfg = Config.consume_chain(
            config, PoolConfig, WorkspaceConfig
        )
        routing_pool = Neo4jPool.open(
            *router_addresses,
            routing_context=routing_context,
            pool_config=pool_cfg,
            workspace_config=workspace_cfg,
        )
        return cls(routing_pool, workspace_cfg)

    def __init__(self, pool, default_workspace_config):
        """Initialise both base classes from ``pool`` and the config."""
        _Routing.__init__(self, [pool.address])
        Driver.__init__(self, pool, default_workspace_config)
def _normalize_notifications_config(config_kwargs, *, driver_level=False):
    """
    Normalize the notification-related entries of ``config_kwargs`` in place.

    The following steps are applied:

    * Every element of ``notifications_disabled_categories`` and
      ``notifications_disabled_classifications`` is replaced by its
      ``.value`` attribute if it has one (e.g., enum members).
    * ``notifications_disabled_categories`` is removed from the mapping.
      If it was not :data:`None`, it is merged (duplicates removed, first
      occurrence wins) into ``notifications_disabled_classifications``.
    * ``notifications_min_severity`` and ``warn_notification_severity`` are
      replaced by their ``.value`` attribute if they have one.
    * ``warn_notification_severity`` is validated.
    * If ``driver_level`` is true: a missing or :data:`None`
      ``warn_notification_severity`` is set to
      ``NotificationMinimumSeverity.INFORMATION`` when ``DEBUG_ENABLED`` is
      truthy, and a value equal to ``NotificationMinimumSeverity.OFF`` is
      replaced by :data:`None`.

    :param config_kwargs: mutable mapping of configuration options.
        It is modified in place; nothing is returned.
    :param driver_level: whether to apply the driver-level handling of
        ``warn_notification_severity`` described above.

    :raises ValueError: if ``warn_notification_severity`` is neither
        :data:`None` nor (the value of) a ``NotificationMinimumSeverity``.
    """
    for key in (
        "notifications_disabled_categories",
        "notifications_disabled_classifications",
    ):
        value = config_kwargs.get(key)
        if value is not None:
            config_kwargs[key] = [getattr(e, "value", e) for e in value]

    # "categories" is folded into "classifications"; only the latter key
    # remains in the mapping afterwards.
    disabled_categories = config_kwargs.pop(
        "notifications_disabled_categories", None
    )
    if disabled_categories is not None:
        disabled_classifications = config_kwargs.get(
            "notifications_disabled_classifications"
        )
        if disabled_classifications is None:
            disabled_classifications = disabled_categories
        else:
            # dict.fromkeys de-duplicates while keeping first-seen order.
            # Going through a set made the resulting order depend on hash
            # randomization, i.e., differ from run to run.
            disabled_classifications = list(
                dict.fromkeys(
                    (*disabled_categories, *disabled_classifications)
                )
            )
        config_kwargs["notifications_disabled_classifications"] = (
            disabled_classifications
        )

    for key in (
        "notifications_min_severity",
        "warn_notification_severity",
    ):
        value = config_kwargs.get(key)
        if value is not None:
            config_kwargs[key] = getattr(value, "value", value)

    value = config_kwargs.get("warn_notification_severity")
    try:
        is_valid = value in {*NotificationMinimumSeverity, None}
    except TypeError:
        # Unhashable values (e.g., a list) cannot be looked up in a set.
        # They are invalid and must yield the ValueError below instead of
        # leaking an unrelated "unhashable type" TypeError.
        is_valid = False
    if not is_valid:
        raise ValueError(
            f"Invalid value for configuration "
            f"warn_notification_severity: {value}. Should be None, a "
            f"NotificationMinimumSeverity, or a string representing a "
            f"NotificationMinimumSeverity."
        )
    if driver_level:
        if value is None:
            if DEBUG_ENABLED:
                config_kwargs["warn_notification_severity"] = (
                    NotificationMinimumSeverity.INFORMATION
                )
        elif value == NotificationMinimumSeverity.OFF:
            config_kwargs["warn_notification_severity"] = None
|