123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431 |
- # Copyright (c) "Neo4j"
- # Neo4j Sweden AB [https://neo4j.com]
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # https://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from __future__ import annotations
- import asyncio
- import typing as t
- if t.TYPE_CHECKING:
- import ssl
- import typing_extensions as te
- from .._api import (
- T_NotificationDisabledCategory,
- T_NotificationMinimumSeverity,
- )
- from .._api import (
- NotificationMinimumSeverity,
- RoutingControl,
- TelemetryAPI,
- )
- from .._async_compat.util import AsyncUtil
- from .._conf import (
- Config,
- ConfigurationError,
- SessionConfig,
- TrustAll,
- TrustStore,
- WorkspaceConfig,
- )
- from .._debug import ENABLED as DEBUG_ENABLED
- from .._meta import (
- deprecation_warn,
- experimental_warn,
- preview_warn,
- unclosed_resource_warn,
- )
- from .._work import (
- EagerResult,
- Query,
- unit_of_work,
- )
- from ..addressing import Address
- from ..api import (
- AsyncBookmarkManager,
- Auth,
- BookmarkManager,
- Bookmarks,
- DRIVER_BOLT,
- DRIVER_NEO4J,
- parse_neo4j_uri,
- parse_routing_context,
- READ_ACCESS,
- SECURITY_TYPE_SECURE,
- SECURITY_TYPE_SELF_SIGNED_CERTIFICATE,
- ServerInfo,
- TRUST_ALL_CERTIFICATES,
- TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
- URI_SCHEME_BOLT,
- URI_SCHEME_BOLT_SECURE,
- URI_SCHEME_BOLT_SELF_SIGNED_CERTIFICATE,
- URI_SCHEME_NEO4J,
- URI_SCHEME_NEO4J_SECURE,
- URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
- WRITE_ACCESS,
- )
- from ..auth_management import (
- AsyncAuthManager,
- AsyncAuthManagers,
- AsyncClientCertificateProvider,
- ClientCertificate,
- )
- from ..exceptions import Neo4jError
- from .auth_management import _AsyncStaticClientCertificateProvider
- from .bookmark_manager import (
- AsyncNeo4jBookmarkManager,
- TBmConsumer as _TBmConsumer,
- TBmSupplier as _TBmSupplier,
- )
- from .config import AsyncPoolConfig
- from .work import (
- AsyncManagedTransaction,
- AsyncResult,
- AsyncSession,
- )
- if t.TYPE_CHECKING:
- import ssl
- from enum import Enum
- import typing_extensions as te
- from .._api import T_RoutingControl
- from ..api import _TAuth
- class _DefaultEnum(Enum):
- default = "default"
- _default = _DefaultEnum.default
- else:
- _default = object()
- _T = t.TypeVar("_T")
- class AsyncGraphDatabase:
- """Accessor for :class:`neo4j.AsyncDriver` construction."""
- if t.TYPE_CHECKING:
- @classmethod
- def driver(
- cls,
- uri: str,
- *,
- auth: _TAuth | AsyncAuthManager = ...,
- max_connection_lifetime: float = ...,
- liveness_check_timeout: float | None = ...,
- max_connection_pool_size: int = ...,
- connection_timeout: float = ...,
- trust: (
- te.Literal["TRUST_ALL_CERTIFICATES"]
- | te.Literal["TRUST_SYSTEM_CA_SIGNED_CERTIFICATES"]
- ) = ...,
- resolver: (
- t.Callable[[Address], t.Iterable[Address]]
- | t.Callable[[Address], t.Awaitable[t.Iterable[Address]]]
- ) = ...,
- encrypted: bool = ...,
- trusted_certificates: TrustStore = ...,
- client_certificate: (
- ClientCertificate | AsyncClientCertificateProvider | None
- ) = ...,
- ssl_context: ssl.SSLContext | None = ...,
- user_agent: str = ...,
- keep_alive: bool = ...,
- notifications_min_severity: (
- T_NotificationMinimumSeverity | None
- ) = ...,
- notifications_disabled_categories: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- notifications_disabled_classifications: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- warn_notification_severity: (
- T_NotificationMinimumSeverity | None
- ) = ...,
- telemetry_disabled: bool = ...,
- # undocumented/unsupported options
- # they may be changed or removed any time without prior notice
- connection_acquisition_timeout: float = ...,
- max_transaction_retry_time: float = ...,
- initial_retry_delay: float = ...,
- retry_delay_multiplier: float = ...,
- retry_delay_jitter_factor: float = ...,
- database: str | None = ...,
- fetch_size: int = ...,
- impersonated_user: str | None = ...,
- bookmark_manager: (
- AsyncBookmarkManager | BookmarkManager | None
- ) = ...,
- ) -> AsyncDriver: ...
- else:
- @classmethod
- def driver(
- cls,
- uri: str,
- *,
- auth: _TAuth | AsyncAuthManager = None,
- **config,
- ) -> AsyncDriver:
- """
- Create a driver.
- :param uri: the connection URI for the driver,
- see :ref:`async-uri-ref` for available URIs.
- :param auth: the authentication details,
- see :ref:`auth-ref` for available authentication details.
- :param config: driver configuration key-word arguments,
- see :ref:`async-driver-configuration-ref` for available
- key-word arguments.
- """
- driver_type, security_type, parsed = parse_neo4j_uri(uri)
- if not isinstance(auth, AsyncAuthManager):
- auth = AsyncAuthManagers.static(auth)
- config["auth"] = auth
- client_certificate = config.get("client_certificate")
- if isinstance(client_certificate, ClientCertificate):
- # using internal class until public factory is GA:
- # AsyncClientCertificateProviders.static
- config["client_certificate"] = (
- _AsyncStaticClientCertificateProvider(client_certificate)
- )
- # TODO: 6.0 - remove "trust" config option
- if "trust" in config and config["trust"] not in {
- TRUST_ALL_CERTIFICATES,
- TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
- }:
- raise ConfigurationError(
- "The config setting `trust` values are {!r}".format(
- [
- TRUST_ALL_CERTIFICATES,
- TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
- ]
- )
- )
- if "trusted_certificates" in config and not isinstance(
- config["trusted_certificates"], TrustStore
- ):
- raise ConfigurationError(
- 'The config setting "trusted_certificates" must be of '
- "type neo4j.TrustAll, neo4j.TrustCustomCAs, or"
- "neo4j.TrustSystemCAs but was {}".format(
- type(config["trusted_certificates"])
- )
- )
- if security_type in {
- SECURITY_TYPE_SELF_SIGNED_CERTIFICATE,
- SECURITY_TYPE_SECURE,
- } and (
- "encrypted" in config
- or "trust" in config
- or "trusted_certificates" in config
- or "ssl_context" in config
- ):
- # TODO: 6.0 - remove "trust" from error message
- raise ConfigurationError(
- 'The config settings "encrypted", "trust", '
- '"trusted_certificates", and "ssl_context" can only be '
- "used with the URI schemes {!r}. Use the other URI "
- "schemes {!r} for setting encryption settings.".format(
- [
- URI_SCHEME_BOLT,
- URI_SCHEME_NEO4J,
- ],
- [
- URI_SCHEME_BOLT_SELF_SIGNED_CERTIFICATE,
- URI_SCHEME_BOLT_SECURE,
- URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
- URI_SCHEME_NEO4J_SECURE,
- ],
- )
- )
- if security_type == SECURITY_TYPE_SECURE:
- config["encrypted"] = True
- elif security_type == SECURITY_TYPE_SELF_SIGNED_CERTIFICATE:
- config["encrypted"] = True
- config["trusted_certificates"] = TrustAll()
- if "notifications_disabled_classifications" in config:
- preview_warn(
- "notifications_disabled_classifications "
- "is a preview feature.",
- stack_level=2,
- )
- if "warn_notification_severity" in config:
- preview_warn(
- "notification warnings are a preview feature.",
- stack_level=2,
- )
- _normalize_notifications_config(config, driver_level=True)
- liveness_check_timeout = config.get("liveness_check_timeout")
- if (
- liveness_check_timeout is not None
- and liveness_check_timeout < 0
- ):
- raise ConfigurationError(
- 'The config setting "liveness_check_timeout" must be '
- "greater than or equal to 0 but was "
- f"{liveness_check_timeout}."
- )
- assert driver_type in {DRIVER_BOLT, DRIVER_NEO4J}
- if driver_type == DRIVER_BOLT:
- if parse_routing_context(parsed.query):
- deprecation_warn(
- 'Creating a direct driver ("bolt://" scheme) with '
- "routing context (URI parameters) is deprecated. They "
- "will be ignored. This will raise an error in a "
- f'future release. Given URI "{uri}"',
- stack_level=2,
- )
- # TODO: 6.0 - raise instead of warning
- # raise ValueError(
- # 'Routing parameters are not supported with scheme '
- # '"bolt". Given URI "{}".'.format(uri)
- # )
- return cls.bolt_driver(parsed.netloc, **config)
- # else driver_type == DRIVER_NEO4J
- routing_context = parse_routing_context(parsed.query)
- return cls.neo4j_driver(
- parsed.netloc, routing_context=routing_context, **config
- )
- @classmethod
- def bookmark_manager(
- cls,
- initial_bookmarks: Bookmarks | t.Iterable[str] | None = None,
- bookmarks_supplier: _TBmSupplier | None = None,
- bookmarks_consumer: _TBmConsumer | None = None,
- ) -> AsyncBookmarkManager:
- """
- Create a :class:`.AsyncBookmarkManager` with default implementation.
- Basic usage example to configure sessions with the built-in bookmark
- manager implementation so that all work is automatically causally
- chained (i.e., all reads can observe all previous writes even in a
- clustered setup)::
- import neo4j
- # omitting closing the driver for brevity
- driver = neo4j.AsyncGraphDatabase.driver(...)
- bookmark_manager = neo4j.AsyncGraphDatabase.bookmark_manager(...)
- async with driver.session(
- bookmark_manager=bookmark_manager
- ) as session1:
- async with driver.session(
- bookmark_manager=bookmark_manager,
- access_mode=neo4j.READ_ACCESS
- ) as session2:
- result1 = await session1.run("<WRITE_QUERY>")
- await result1.consume()
- # READ_QUERY is guaranteed to see what WRITE_QUERY wrote.
- result2 = await session2.run("<READ_QUERY>")
- await result2.consume()
- This is a very contrived example, and in this particular case, having
- both queries in the same session has the exact same effect and might
- even be more performant. However, when dealing with sessions spanning
- multiple threads, async Tasks, processes, or even hosts, the bookmark
- manager can come in handy as sessions are not safe to be used
- concurrently.
- :param initial_bookmarks:
- The initial set of bookmarks. The returned bookmark manager will
- use this to initialize its internal bookmarks.
- :param bookmarks_supplier:
- Function which will be called every time the default bookmark
- manager's method :meth:`.AsyncBookmarkManager.get_bookmarks`
- gets called.
- The function takes no arguments and must return a
- :class:`.Bookmarks` object. The result of ``bookmarks_supplier``
- will then be concatenated with the internal set of bookmarks and
- used to configure the session in creation. It will, however, not
- update the internal set of bookmarks.
- :param bookmarks_consumer:
- Function which will be called whenever the set of bookmarks
- handled by the bookmark manager gets updated with the new
- internal bookmark set. It will receive the new set of bookmarks
- as a :class:`.Bookmarks` object and return :data:`None`.
- :returns: A default implementation of :class:`.AsyncBookmarkManager`.
- .. versionadded:: 5.0
- .. versionchanged:: 5.3
- The bookmark manager no longer tracks bookmarks per database.
- This effectively changes the signature of almost all bookmark
- manager related methods:
- * ``initial_bookmarks`` is no longer a mapping from database name
- to bookmarks but plain bookmarks.
- * ``bookmarks_supplier`` no longer receives the database name as
- an argument.
- * ``bookmarks_consumer`` no longer receives the database name as
- an argument.
- .. versionchanged:: 5.8 Stabilized from experimental.
- """
- return AsyncNeo4jBookmarkManager(
- initial_bookmarks=initial_bookmarks,
- bookmarks_supplier=bookmarks_supplier,
- bookmarks_consumer=bookmarks_consumer,
- )
- @classmethod
- def bolt_driver(cls, target, **config):
- """
- Create a direct driver.
- Create a driver for direct Bolt server access that uses
- socket I/O and thread-based concurrency.
- """
- from .._exceptions import (
- BoltHandshakeError,
- BoltSecurityError,
- )
- try:
- return AsyncBoltDriver.open(target, **config)
- except (BoltHandshakeError, BoltSecurityError) as error:
- from ..exceptions import ServiceUnavailable
- raise ServiceUnavailable(str(error)) from error
- @classmethod
- def neo4j_driver(cls, *targets, routing_context=None, **config):
- """
- Create a routing driver.
- Create a driver for routing-capable Neo4j service access
- that uses socket I/O and thread-based concurrency.
- """
- # TODO: 6.0 - adjust signature to only take one target
- if len(targets) > 1:
- deprecation_warn(
- "Creating a routing driver with multiple targets is "
- "deprecated. The driver only uses the first target anyway. "
- "The method signature will change in a future release.",
- )
- from .._exceptions import (
- BoltHandshakeError,
- BoltSecurityError,
- )
- try:
- return AsyncNeo4jDriver.open(
- *targets, routing_context=routing_context, **config
- )
- except (BoltHandshakeError, BoltSecurityError) as error:
- from ..exceptions import ServiceUnavailable
- raise ServiceUnavailable(str(error)) from error
- class _Direct:
- # TODO: 6.0 - those attributes should be private
- default_host = "localhost"
- default_port = 7687
- default_target = ":"
- def __init__(self, address):
- self._address = address
- @property
- def address(self):
- return self._address
- @classmethod
- def parse_target(cls, target):
- """Parse a target string to produce an address."""
- if not target:
- target = cls.default_target
- return Address.parse(
- target,
- default_host=cls.default_host,
- default_port=cls.default_port,
- )
- class _Routing:
- # TODO: 6.0 - those attributes should be private
- default_host = "localhost"
- default_port = 7687
- default_targets = ": :17601 :17687"
- def __init__(self, initial_addresses):
- self._initial_addresses = initial_addresses
- @property
- def initial_addresses(self):
- return self._initial_addresses
- @classmethod
- def parse_targets(cls, *targets):
- """Parse a sequence of target strings to produce an address list."""
- targets = " ".join(targets)
- if not targets:
- targets = cls.default_targets
- return Address.parse_list(
- targets,
- default_host=cls.default_host,
- default_port=cls.default_port,
- )
- class AsyncDriver:
- """
- Base class for all driver types.
- Drivers are used as the primary access point to Neo4j.
- """
- #: Connection pool
- _pool: t.Any = None
- #: Flag if the driver has been closed
- _closed = False
- def __init__(self, pool, default_workspace_config):
- assert pool is not None
- assert default_workspace_config is not None
- self._pool = pool
- self._default_workspace_config = default_workspace_config
- self._query_bookmark_manager = AsyncGraphDatabase.bookmark_manager()
- async def __aenter__(self) -> AsyncDriver:
- return self
- async def __aexit__(self, exc_type, exc_value, traceback):
- await self.close()
- # Copy globals as function locals to make sure that they are available
- # during Python shutdown when the Pool is destroyed.
- def __del__(
- self,
- _unclosed_resource_warn=unclosed_resource_warn,
- _is_async_code=AsyncUtil.is_async_code,
- _deprecation_warn=deprecation_warn,
- ):
- if not self._closed:
- _unclosed_resource_warn(self)
- # TODO: 6.0 - remove this
- if _is_async_code:
- return
- if not self._closed:
- _deprecation_warn(
- "Relying on AsyncDriver's destructor to close the session "
- "is deprecated. Please make sure to close the session. "
- "Use it as a context (`with` statement) or make sure to "
- "call `.close()` explicitly. Future versions of the "
- "driver will not close drivers automatically."
- )
- self.close()
- def _check_state(self):
- if self._closed:
- # TODO: 6.0 - raise the error
- # raise DriverError("Driver closed")
- deprecation_warn(
- "Using a driver after it has been closed is deprecated. "
- "Future versions of the driver will raise an error.",
- stack_level=3,
- )
- @property
- def encrypted(self) -> bool:
- """Indicate whether the driver was configured to use encryption."""
- return bool(self._pool.pool_config.encrypted)
- if t.TYPE_CHECKING:
- def session(
- self,
- *,
- connection_acquisition_timeout: float = ...,
- max_transaction_retry_time: float = ...,
- database: str | None = ...,
- fetch_size: int = ...,
- impersonated_user: str | None = ...,
- bookmarks: t.Iterable[str] | Bookmarks | None = ...,
- default_access_mode: str = ...,
- bookmark_manager: (
- AsyncBookmarkManager | BookmarkManager | None
- ) = ...,
- auth: _TAuth = ...,
- notifications_min_severity: (
- T_NotificationMinimumSeverity | None
- ) = ...,
- notifications_disabled_categories: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- notifications_disabled_classifications: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- # undocumented/unsupported options
- # they may be change or removed any time without prior notice
- initial_retry_delay: float = ...,
- retry_delay_multiplier: float = ...,
- retry_delay_jitter_factor: float = ...,
- ) -> AsyncSession: ...
- else:
- def session(self, **config) -> AsyncSession:
- """
- Create a session.
- See :ref:`async-session-construction-ref` for details.
- :param config: session configuration key-word arguments,
- see :ref:`async-session-configuration-ref` for available
- key-word arguments.
- :returns: new :class:`neo4j.AsyncSession` object
- """
- if "warn_notification_severity" in config:
- # Would work just fine, but we don't want to introduce yet
- # another undocumented/unsupported config option.
- del config["warn_notification_severity"]
- self._check_state()
- if "notifications_disabled_classifications" in config:
- preview_warn(
- "notifications_disabled_classifications "
- "is a preview feature.",
- stack_level=2,
- )
- session_config = self._read_session_config(config)
- return self._session(session_config)
- def _session(self, session_config) -> AsyncSession:
- return AsyncSession(self._pool, session_config)
- def _read_session_config(self, config_kwargs):
- config = self._prepare_session_config(config_kwargs)
- return SessionConfig(self._default_workspace_config, config)
- @classmethod
- def _prepare_session_config(cls, config_kwargs):
- _normalize_notifications_config(config_kwargs)
- return config_kwargs
- async def close(self) -> None:
- """Shut down, closing any open connections in the pool."""
- # TODO: 6.0 - NOOP if already closed
- # if self._closed:
- # return
- try:
- await self._pool.close()
- except asyncio.CancelledError:
- self._closed = True
- raise
- self._closed = True
- # overloads to work around https://github.com/python/mypy/issues/3737
- @t.overload
- async def execute_query(
- self,
- query_: te.LiteralString | Query,
- parameters_: dict[str, t.Any] | None = None,
- routing_: T_RoutingControl = RoutingControl.WRITE,
- database_: str | None = None,
- impersonated_user_: str | None = None,
- bookmark_manager_: (
- AsyncBookmarkManager | BookmarkManager | None
- ) = ...,
- auth_: _TAuth = None,
- result_transformer_: t.Callable[
- [AsyncResult], t.Awaitable[EagerResult]
- ] = ...,
- **kwargs: t.Any,
- ) -> EagerResult: ...
- @t.overload
- async def execute_query(
- self,
- query_: te.LiteralString | Query,
- parameters_: dict[str, t.Any] | None = None,
- routing_: T_RoutingControl = RoutingControl.WRITE,
- database_: str | None = None,
- impersonated_user_: str | None = None,
- bookmark_manager_: (
- AsyncBookmarkManager | BookmarkManager | None
- ) = ...,
- auth_: _TAuth = None,
- result_transformer_: t.Callable[[AsyncResult], t.Awaitable[_T]] = ...,
- **kwargs: t.Any,
- ) -> _T: ...
- async def execute_query(
- self,
- query_: te.LiteralString | Query,
- parameters_: dict[str, t.Any] | None = None,
- routing_: T_RoutingControl = RoutingControl.WRITE,
- database_: str | None = None,
- impersonated_user_: str | None = None,
- bookmark_manager_: (
- AsyncBookmarkManager
- | BookmarkManager
- | None
- | te.Literal[_DefaultEnum.default]
- ) = _default,
- auth_: _TAuth = None,
- result_transformer_: t.Callable[
- [AsyncResult], t.Awaitable[t.Any]
- ] = AsyncResult.to_eager_result,
- **kwargs: t.Any,
- ) -> t.Any:
- '''
- Execute a query in a transaction function and return all results.
- This method is a handy wrapper for lower-level driver APIs like
- sessions, transactions, and transaction functions. It is intended
- for simple use cases where there is no need for managing all possible
- options.
- The internal usage of transaction functions provides a retry-mechanism
- for appropriate errors. Furthermore, this means that queries using
- ``CALL {} IN TRANSACTIONS`` or the older ``USING PERIODIC COMMIT``
- will not work (use :meth:`AsyncSession.run` for these).
- The method is roughly equivalent to::
- async def execute_query(
- query_, parameters_, routing_, database_, impersonated_user_,
- bookmark_manager_, auth_, result_transformer_, **kwargs
- ):
- @unit_of_work(query_.metadata, query_.timeout)
- async def work(tx):
- result = await tx.run(query_.text, parameters_, **kwargs)
- return await result_transformer_(result)
- async with driver.session(
- database=database_,
- impersonated_user=impersonated_user_,
- bookmark_manager=bookmark_manager_,
- auth=auth_,
- ) as session:
- if routing_ == RoutingControl.WRITE:
- return await session.execute_write(work)
- elif routing_ == RoutingControl.READ:
- return await session.execute_read(work)
- Usage example::
- from typing import List
- import neo4j
- async def example(driver: neo4j.AsyncDriver) -> List[str]:
- """Get the name of all 42 year-olds."""
- records, summary, keys = await driver.execute_query(
- "MATCH (p:Person {age: $age}) RETURN p.name",
- {"age": 42},
- routing_=neo4j.RoutingControl.READ, # or just "r"
- database_="neo4j",
- )
- assert keys == ["p.name"] # not needed, just for illustration
- # log_summary(summary) # log some metadata
- return [str(record["p.name"]) for record in records]
- # or: return [str(record[0]) for record in records]
- # or even: return list(map(lambda r: str(r[0]), records))
- Another example::
- import neo4j
- async def example(driver: neo4j.AsyncDriver) -> int:
- """Call all young people "My dear" and get their count."""
- record = await driver.execute_query(
- "MATCH (p:Person) WHERE p.age <= $age "
- "SET p.nickname = 'My dear' "
- "RETURN count(*)",
- # optional routing parameter, as write is default
- # routing_=neo4j.RoutingControl.WRITE, # or just "w",
- database_="neo4j",
- result_transformer_=neo4j.AsyncResult.single,
- age=15,
- )
- assert record is not None # for typechecking and illustration
- count = record[0]
- assert isinstance(count, int)
- return count
- :param query_:
- Cypher query to execute.
- Use a :class:`.Query` object to pass a query with additional
- transaction configuration.
- :type query_: typing.LiteralString | Query
- :param parameters_: parameters to use in the query
- :type parameters_: typing.Optional[typing.Dict[str, typing.Any]]
- :param routing_:
- Whether to route the query to a reader (follower/read replica) or
- a writer (leader) in the cluster. Default is to route to a writer.
- :type routing_: RoutingControl
- :param database_:
- Database to execute the query against.
- :data:`None` (default) uses the database configured on the server
- side.
- .. Note::
- It is recommended to always specify the database explicitly
- when possible. This allows the driver to work more efficiently,
- as it will not have to resolve the default database first.
- See also the Session config :ref:`database-ref`.
- :type database_: typing.Optional[str]
- :param impersonated_user_:
- Name of the user to impersonate.
- This means that all query will be executed in the security context
- of the impersonated user. For this, the user for which the
- :class:`Driver` has been created needs to have the appropriate
- permissions.
- See also the Session config :ref:`impersonated-user-ref`.
- :type impersonated_user_: typing.Optional[str]
- :param auth_:
- Authentication information to use for this query.
- By default, the driver configuration is used.
- See also the Session config :ref:`session-auth-ref`.
- :type auth_: typing.Tuple[typing.Any, typing.Any] | Auth | None
- :param result_transformer_:
- A function that gets passed the :class:`neo4j.AsyncResult` object
- resulting from the query and converts it to a different type. The
- result of the transformer function is returned by this method.
- .. warning::
- The transformer function must **not** return the
- :class:`neo4j.AsyncResult` itself.
- .. warning::
- N.B. the driver might retry the underlying transaction so the
- transformer might get invoked more than once (with different
- :class:`neo4j.AsyncResult` objects).
- Therefore, it needs to be idempotent (i.e., have the same
- effect, regardless if called once or many times).
- Example transformer that checks that exactly one record is in the
- result stream, then returns the record and the result summary::
- from typing import Tuple
- import neo4j
- async def transformer(
- result: neo4j.AsyncResult
- ) -> Tuple[neo4j.Record, neo4j.ResultSummary]:
- record = await result.single(strict=True)
- summary = await result.consume()
- return record, summary
- Note that methods of :class:`neo4j.AsyncResult` that don't take
- mandatory arguments can be used directly as transformer functions.
- For example::
- import neo4j
- async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
- record = await driver.execute_query(
- "SOME QUERY",
- result_transformer_=neo4j.AsyncResult.single
- )
- # is equivalent to:
- async def transformer(result: neo4j.AsyncResult) -> neo4j.Record:
- return await result.single()
- async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
- record = await driver.execute_query(
- "SOME QUERY",
- result_transformer_=transformer
- )
- :type result_transformer_:
- typing.Callable[[AsyncResult], typing.Awaitable[T]]
- :param bookmark_manager_:
- Specify a bookmark manager to use.
- If present, the bookmark manager is used to keep the query causally
- consistent with all work executed using the same bookmark manager.
- Defaults to the driver's :attr:`.execute_query_bookmark_manager`.
- Pass :data:`None` to disable causal consistency.
- :type bookmark_manager_: AsyncBookmarkManager | BookmarkManager | None
- :param kwargs: additional keyword parameters. None of these can end
- with a single underscore. This is to avoid collisions with the
- keyword configuration parameters of this method. If you need to
- pass such a parameter, use the ``parameters_`` parameter instead.
- Parameters passed as kwargs take precedence over those passed in
- ``parameters_``.
- :type kwargs: typing.Any
- :returns: the result of the ``result_transformer_``
- :rtype: T
- .. versionadded:: 5.5
- .. versionchanged:: 5.8
- * Added ``auth_`` parameter in preview.
- * Stabilized from experimental.
- .. versionchanged:: 5.14
- Stabilized ``auth_`` parameter from preview.
- .. versionchanged:: 5.15
- The ``query_`` parameter now also accepts a :class:`.Query` object
- instead of only :class:`str`.
- ''' # noqa: E501 example code isn't too long
- self._check_state()
- invalid_kwargs = [
- k for k in kwargs if k[-2:-1] != "_" and k[-1:] == "_"
- ]
- if invalid_kwargs:
- raise ValueError(
- "keyword parameters must not end with a single '_'. "
- f"Found: {invalid_kwargs!r}\n"
- "\nYou either misspelled an existing configuration parameter "
- "or tried to send a query parameter that is reserved. In the "
- "latter case, use the `parameters_` dictionary instead."
- )
- if isinstance(query_, Query):
- timeout = query_.timeout
- metadata = query_.metadata
- query_str = query_.text
- work = unit_of_work(metadata, timeout)(_work)
- else:
- query_str = query_
- work = _work
- parameters = dict(parameters_ or {}, **kwargs)
- if bookmark_manager_ is _default:
- bookmark_manager_ = self._query_bookmark_manager
- assert bookmark_manager_ is not _default
- session_config = self._read_session_config(
- {
- "database": database_,
- "impersonated_user": impersonated_user_,
- "bookmark_manager": bookmark_manager_,
- "auth": auth_,
- }
- )
- session = self._session(session_config)
- async with session:
- if routing_ == RoutingControl.WRITE:
- access_mode = WRITE_ACCESS
- elif routing_ == RoutingControl.READ:
- access_mode = READ_ACCESS
- else:
- raise ValueError(
- f"Invalid routing control value: {routing_!r}"
- )
- with session._pipelined_begin:
- return await session._run_transaction(
- access_mode,
- TelemetryAPI.DRIVER,
- work,
- (query_str, parameters, result_transformer_),
- {},
- )
- @property
- def execute_query_bookmark_manager(self) -> AsyncBookmarkManager:
- """
- The driver's default query bookmark manager.
- This is the default :class:`.AsyncBookmarkManager` used by
- :meth:`.execute_query`. This can be used to causally chain
- :meth:`.execute_query` calls and sessions. Example::
- async def example(driver: neo4j.AsyncDriver) -> None:
- await driver.execute_query("<QUERY 1>")
- async with driver.session(
- bookmark_manager=driver.execute_query_bookmark_manager
- ) as session:
- # every query inside this session will be causally chained
- # (i.e., can read what was written by <QUERY 1>)
- await session.run("<QUERY 2>")
- # subsequent execute_query calls will be causally chained
- # (i.e., can read what was written by <QUERY 2>)
- await driver.execute_query("<QUERY 3>")
- .. versionadded:: 5.5
- .. versionchanged:: 5.8
- * Renamed from ``query_bookmark_manager`` to
- ``execute_query_bookmark_manager``.
- * Stabilized from experimental.
- """
- return self._query_bookmark_manager
- if t.TYPE_CHECKING:
- async def verify_connectivity(
- self,
- *,
- # all arguments are experimental
- # they may be change or removed any time without prior notice
- session_connection_timeout: float = ...,
- connection_acquisition_timeout: float = ...,
- max_transaction_retry_time: float = ...,
- database: str | None = ...,
- fetch_size: int = ...,
- impersonated_user: str | None = ...,
- bookmarks: t.Iterable[str] | Bookmarks | None = ...,
- default_access_mode: str = ...,
- bookmark_manager: (
- AsyncBookmarkManager | BookmarkManager | None
- ) = ...,
- auth: Auth | tuple[str, str] = ...,
- notifications_min_severity: (
- T_NotificationMinimumSeverity | None
- ) = ...,
- notifications_disabled_categories: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- notifications_disabled_classifications: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- # undocumented/unsupported options
- initial_retry_delay: float = ...,
- retry_delay_multiplier: float = ...,
- retry_delay_jitter_factor: float = ...,
- ) -> None: ...
- else:
- async def verify_connectivity(self, **config) -> None:
- """
- Verify that the driver can establish a connection to the server.
- This verifies if the driver can establish a reading connection to a
- remote server or a cluster. Some data will be exchanged.
- .. note::
- Even if this method raises an exception, the driver still needs
- to be closed via :meth:`close` to free up all resources.
- :param config: accepts the same configuration key-word arguments as
- :meth:`session`.
- .. warning::
- All configuration key-word arguments are experimental.
- They might be changed or removed in any future version
- without prior notice.
- :raises Exception: if the driver cannot connect to the remote.
- Use the exception to further understand the cause of the
- connectivity problem.
- .. versionchanged:: 5.0
- The undocumented return value has been removed.
- If you need information about the remote server, use
- :meth:`get_server_info` instead.
- """
- self._check_state()
- if config:
- experimental_warn(
- "All configuration key-word arguments to "
- "verify_connectivity() are experimental. They might be "
- "changed or removed in any future version without prior "
- "notice."
- )
- session_config = self._read_session_config(config)
- await self._get_server_info(session_config)
- if t.TYPE_CHECKING:
- async def get_server_info(
- self,
- *,
- # all arguments are experimental
- # they may be change or removed any time without prior notice
- session_connection_timeout: float = ...,
- connection_acquisition_timeout: float = ...,
- max_transaction_retry_time: float = ...,
- database: str | None = ...,
- fetch_size: int = ...,
- impersonated_user: str | None = ...,
- bookmarks: t.Iterable[str] | Bookmarks | None = ...,
- default_access_mode: str = ...,
- bookmark_manager: (
- AsyncBookmarkManager | BookmarkManager | None
- ) = ...,
- auth: Auth | tuple[str, str] = ...,
- notifications_min_severity: (
- T_NotificationMinimumSeverity | None
- ) = ...,
- notifications_disabled_categories: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- notifications_disabled_classifications: (
- t.Iterable[T_NotificationDisabledCategory] | None
- ) = ...,
- # undocumented/unsupported options
- initial_retry_delay: float = ...,
- retry_delay_multiplier: float = ...,
- retry_delay_jitter_factor: float = ...,
- ) -> ServerInfo: ...
- else:
- async def get_server_info(self, **config) -> ServerInfo:
- """
- Get information about the connected Neo4j server.
- Try to establish a working read connection to the remote server or
- a member of a cluster and exchange some data. Then return the
- contacted server's information.
- In a cluster, there is no guarantee about which server will be
- contacted.
- .. note::
- Even if this method raises an exception, the driver still needs
- to be closed via :meth:`close` to free up all resources.
- :param config: accepts the same configuration key-word arguments as
- :meth:`session`.
- .. warning::
- All configuration key-word arguments are experimental.
- They might be changed or removed in any future version
- without prior notice.
- :raises Exception: if the driver cannot connect to the remote.
- Use the exception to further understand the cause of the
- connectivity problem.
- .. versionadded:: 5.0
- """
- self._check_state()
- if config:
- experimental_warn(
- "All configuration key-word arguments to "
- "get_server_info() are experimental. They might be "
- "changed or removed in any future version without prior "
- "notice."
- )
- session_config = self._read_session_config(config)
- return await self._get_server_info(session_config)
- async def supports_multi_db(self) -> bool:
- """
- Check if the server or cluster supports multi-databases.
- :returns: Returns true if the server or cluster the driver connects to
- supports multi-databases, otherwise false.
- .. note::
- Feature support query based solely on the Bolt protocol version.
- The feature might still be disabled on the server side even if this
- function return :data:`True`. It just guarantees that the driver
- won't throw a :exc:`.ConfigurationError` when trying to use this
- driver feature.
- """
- self._check_state()
- session_config = self._read_session_config({})
- async with self._session(session_config) as session:
- await session._connect(READ_ACCESS)
- assert session._connection
- return session._connection.supports_multiple_databases
- if t.TYPE_CHECKING:
- async def verify_authentication(
- self,
- auth: Auth | tuple[str, str] | None = None,
- # all other arguments are experimental
- # they may be change or removed any time without prior notice
- session_connection_timeout: float = ...,
- connection_acquisition_timeout: float = ...,
- max_transaction_retry_time: float = ...,
- database: str | None = ...,
- fetch_size: int = ...,
- impersonated_user: str | None = ...,
- bookmarks: t.Iterable[str] | Bookmarks | None = ...,
- default_access_mode: str = ...,
- bookmark_manager: (
- AsyncBookmarkManager | BookmarkManager | None
- ) = ...,
- # undocumented/unsupported options
- initial_retry_delay: float = ...,
- retry_delay_multiplier: float = ...,
- retry_delay_jitter_factor: float = ...,
- ) -> bool: ...
- else:
- async def verify_authentication(
- self,
- auth: Auth | tuple[str, str] | None = None,
- **config,
- ) -> bool:
- """
- Verify that the authentication information is valid.
- Like :meth:`.verify_connectivity`, but for checking authentication.
- Try to establish a working read connection to the remote server or
- a member of a cluster and exchange some data. In a cluster, there
- is no guarantee about which server will be contacted. If the data
- exchange is successful and the authentication information is valid,
- :data:`True` is returned. Otherwise, the error will be matched
- against a list of known authentication errors. If the error is on
- that list, :data:`False` is returned indicating that the
- authentication information is invalid. Otherwise, the error is
- re-raised.
- :param auth: authentication information to verify.
- Same as the session config :ref:`auth-ref`.
- :param config: accepts the same configuration key-word arguments as
- :meth:`session`.
- .. warning::
- All configuration key-word arguments (except ``auth``) are
- experimental. They might be changed or removed in any
- future version without prior notice.
- :raises Exception: if the driver cannot connect to the remote.
- Use the exception to further understand the cause of the
- connectivity problem.
- .. versionadded:: 5.8
- .. versionchanged:: 5.14 Stabilized from experimental.
- """
- self._check_state()
- if config:
- experimental_warn(
- "All configuration key-word arguments but auth to "
- "verify_authentication() are experimental. They might be "
- "changed or removed in any future version without prior "
- "notice."
- )
- if "database" not in config:
- config["database"] = "system"
- session_config = self._read_session_config(config)
- session_config = SessionConfig(session_config, {"auth": auth})
- async with self._session(session_config) as session:
- try:
- await session._verify_authentication()
- except Neo4jError as exc:
- if exc.code in {
- "Neo.ClientError.Security.CredentialsExpired",
- "Neo.ClientError.Security.Forbidden",
- "Neo.ClientError.Security.TokenExpired",
- "Neo.ClientError.Security.Unauthorized",
- }:
- return False
- raise
- return True
- async def supports_session_auth(self) -> bool:
- """
- Check if the remote supports connection re-authentication.
- :returns: Returns true if the server or cluster the driver connects to
- supports re-authentication of existing connections, otherwise
- false.
- .. note::
- Feature support query based solely on the Bolt protocol version.
- The feature might still be disabled on the server side even if this
- function return :data:`True`. It just guarantees that the driver
- won't throw a :exc:`.ConfigurationError` when trying to use this
- driver feature.
- .. versionadded:: 5.8
- """
- self._check_state()
- session_config = self._read_session_config({})
- async with self._session(session_config) as session:
- await session._connect(READ_ACCESS)
- assert session._connection
- return session._connection.supports_re_auth
- async def _get_server_info(self, session_config) -> ServerInfo:
- async with self._session(session_config) as session:
- return await session._get_server_info()
- async def _work(
- tx: AsyncManagedTransaction,
- query: te.LiteralString,
- parameters: dict[str, t.Any],
- transformer: t.Callable[[AsyncResult], t.Awaitable[_T]],
- ) -> _T:
- res = await tx.run(query, parameters)
- return await transformer(res)
- class AsyncBoltDriver(_Direct, AsyncDriver):
- """
- :class:`.AsyncBoltDriver` is instantiated for ``bolt`` URIs.
- It addresses a single database machine. This may be a standalone server or
- could be a specific member of a cluster.
- Connections established by a :class:`.AsyncBoltDriver` are always made to
- the exact host and port detailed in the URI.
- This class is not supposed to be instantiated externally. Use
- :meth:`AsyncGraphDatabase.driver` instead.
- """
- @classmethod
- def open(cls, target, **config):
- from .io import AsyncBoltPool
- address = cls.parse_target(target)
- pool_config, default_workspace_config = Config.consume_chain(
- config, AsyncPoolConfig, WorkspaceConfig
- )
- pool = AsyncBoltPool.open(
- address,
- pool_config=pool_config,
- workspace_config=default_workspace_config,
- )
- return cls(pool, default_workspace_config)
- def __init__(self, pool, default_workspace_config):
- _Direct.__init__(self, pool.address)
- AsyncDriver.__init__(self, pool, default_workspace_config)
- self._default_workspace_config = default_workspace_config
- class AsyncNeo4jDriver(_Routing, AsyncDriver):
- """
- :class:`.AsyncNeo4jDriver` is instantiated for ``neo4j`` URIs.
- The routing behaviour works in tandem with Neo4j's `Causal Clustering
- <https://neo4j.com/docs/operations-manual/current/clustering/>`_
- feature by directing read and write behaviour to appropriate
- cluster members.
- This class is not supposed to be instantiated externally. Use
- :meth:`AsyncGraphDatabase.driver` instead.
- """
- @classmethod
- def open(cls, *targets, routing_context=None, **config):
- from .io import AsyncNeo4jPool
- addresses = cls.parse_targets(*targets)
- pool_config, default_workspace_config = Config.consume_chain(
- config, AsyncPoolConfig, WorkspaceConfig
- )
- pool = AsyncNeo4jPool.open(
- *addresses,
- routing_context=routing_context,
- pool_config=pool_config,
- workspace_config=default_workspace_config,
- )
- return cls(pool, default_workspace_config)
- def __init__(self, pool, default_workspace_config):
- _Routing.__init__(self, [pool.address])
- AsyncDriver.__init__(self, pool, default_workspace_config)
- def _normalize_notifications_config(config_kwargs, *, driver_level=False):
- list_config_keys = (
- "notifications_disabled_categories",
- "notifications_disabled_classifications",
- )
- for key in list_config_keys:
- value = config_kwargs.get(key)
- if value is not None:
- config_kwargs[key] = [getattr(e, "value", e) for e in value]
- disabled_categories = config_kwargs.pop(
- "notifications_disabled_categories", None
- )
- if disabled_categories is not None:
- disabled_classifications = config_kwargs.get(
- "notifications_disabled_classifications"
- )
- if disabled_classifications is None:
- disabled_classifications = disabled_categories
- else:
- disabled_classifications = list(
- {*disabled_categories, *disabled_classifications}
- )
- config_kwargs["notifications_disabled_classifications"] = (
- disabled_classifications
- )
- single_config_keys = (
- "notifications_min_severity",
- "warn_notification_severity",
- )
- for key in single_config_keys:
- value = config_kwargs.get(key)
- if value is not None:
- config_kwargs[key] = getattr(value, "value", value)
- value = config_kwargs.get("warn_notification_severity")
- if value not in {*NotificationMinimumSeverity, None}:
- raise ValueError(
- f"Invalid value for configuration "
- f"warn_notification_severity: {value}. Should be None, a "
- f"NotificationMinimumSeverity, or a string representing a "
- f"NotificationMinimumSeverity."
- )
- if driver_level:
- if value is None:
- if DEBUG_ENABLED:
- config_kwargs["warn_notification_severity"] = (
- NotificationMinimumSeverity.INFORMATION
- )
- elif value == NotificationMinimumSeverity.OFF:
- config_kwargs["warn_notification_severity"] = None
|