123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- # Copyright (c) "Neo4j"
- # Neo4j Sweden AB [https://neo4j.com]
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # https://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from __future__ import annotations
- import abc
- import time
- import typing as t
- from dataclasses import dataclass
- from .api import Auth
- from .exceptions import AuthError
- if t.TYPE_CHECKING:
- from os import PathLike
- from typing_extensions import Protocol as _Protocol
- from .api import _TAuth
- from .exceptions import Neo4jError
- else:
- _Protocol = object
- @dataclass
- class ExpiringAuth:
- """
- Represents potentially expiring authentication information.
- This class is used with :meth:`.AuthManagers.bearer` and
- :meth:`.AsyncAuthManagers.bearer`.
- :param auth: The authentication information.
- :param expires_at:
- Unix timestamp (seconds since 1970-01-01 00:00:00 UTC)
- indicating when the authentication information expires.
- If :data:`None`, the authentication information is considered to not
- expire until the server explicitly indicates so.
- .. seealso::
- :meth:`.AuthManagers.bearer`,
- :meth:`.AsyncAuthManagers.bearer`
- .. versionadded:: 5.8
- .. versionchanged:: 5.9
- * Removed parameter and attribute ``expires_in`` (relative expiration
- time). Replaced with ``expires_at`` (absolute expiration time).
- * :meth:`.expires_in` can be used to create an :class:`.ExpiringAuth`
- with a relative expiration time.
- .. versionchanged:: 5.14 Stabilized from preview.
- """
- auth: _TAuth
- expires_at: float | None = None
- def expires_in(self, seconds: float) -> ExpiringAuth:
- """
- Return a (flat) copy of this object with a new expiration time.
- This is a convenience method for creating an :class:`.ExpiringAuth`
- for a relative expiration time ("expires in" instead of "expires at").
- >>> import time, freezegun
- >>> with freezegun.freeze_time("1970-01-01 00:00:40"):
- ... ExpiringAuth(("user", "pass")).expires_in(2)
- ExpiringAuth(auth=('user', 'pass'), expires_at=42.0)
- >>> with freezegun.freeze_time("1970-01-01 00:00:40"):
- ... ExpiringAuth(("user", "pass"), time.time() + 2)
- ExpiringAuth(auth=('user', 'pass'), expires_at=42.0)
- :param seconds:
- The number of seconds from now until the authentication information
- expires.
- .. versionadded:: 5.9
- """
- return ExpiringAuth(self.auth, time.time() + seconds)
- def expiring_auth_has_expired(auth: ExpiringAuth) -> bool:
- expires_at = auth.expires_at
- return expires_at is not None and expires_at < time.time()
- class AuthManager(metaclass=abc.ABCMeta):
- """
- Abstract base class for authentication information managers.
- The driver provides some default implementations of this class in
- :class:`.AuthManagers` for convenience.
- Custom implementations of this class can be used to provide more complex
- authentication refresh functionality.
- .. warning::
- The manager **must not** interact with the driver in any way as this
- can cause deadlocks and undefined behaviour.
- Furthermore, the manager is expected to be thread-safe.
- The token returned must always belong to the same identity.
- Switching identities using the `AuthManager` is undefined behavior.
- You may use :ref:`session-level authentication<session-auth-ref>`
- for such use-cases.
- .. seealso:: :class:`.AuthManagers`
- .. versionadded:: 5.8
- .. versionchanged:: 5.12
- ``on_auth_expired`` was removed from the interface and replaced by
- :meth:`handle_security_exception`. The new method is called when the
- server returns any `Neo.ClientError.Security.*` error. Its signature
- differs in that it additionally receives the error returned by the
- server and returns a boolean indicating whether the error was handled.
- .. versionchanged:: 5.14 Stabilized from preview.
- """
- @abc.abstractmethod
- def get_auth(self) -> _TAuth:
- """
- Return the current authentication information.
- The driver will call this method very frequently. It is recommended
- to implement some form of caching to avoid unnecessary overhead.
- .. warning::
- The method must only ever return auth information belonging to the
- same identity.
- Switching identities using the `AuthManager` is undefined behavior.
- You may use :ref:`session-level authentication<session-auth-ref>`
- for such use-cases.
- """
- ...
- @abc.abstractmethod
- def handle_security_exception(
- self, auth: _TAuth, error: Neo4jError
- ) -> bool:
- """
- Handle the server indicating authentication failure.
- The driver will call this method when the server returns any
- `Neo.ClientError.Security.*` error. The error will then be processed
- further as usual.
- :param auth:
- The authentication information that was used when the server
- returned the error.
- :param error:
- The error returned by the server.
- :returns:
- Whether the error was handled (:data:`True`), in which case the
- driver will mark the error as retryable
- (see :meth:`.Neo4jError.is_retryable`).
- .. versionadded:: 5.12
- """
- ...
- class AsyncAuthManager(_Protocol, metaclass=abc.ABCMeta):
- """
- Async version of :class:`.AuthManager`.
- .. seealso:: :class:`.AuthManager`
- .. versionadded:: 5.8
- .. versionchanged:: 5.12
- ``on_auth_expired`` was removed from the interface and replaced by
- :meth:`handle_security_exception`. See :class:`.AuthManager`.
- .. versionchanged:: 5.14 Stabilized from preview.
- """
- @abc.abstractmethod
- async def get_auth(self) -> _TAuth:
- """
- Async version of :meth:`.AuthManager.get_auth`.
- .. seealso:: :meth:`.AuthManager.get_auth`
- """
- ...
- @abc.abstractmethod
- async def handle_security_exception(
- self, auth: _TAuth, error: Neo4jError
- ) -> bool:
- """
- Async version of :meth:`.AuthManager.handle_security_exception`.
- .. seealso:: :meth:`.AuthManager.handle_security_exception`
- """
- ...
- @dataclass
- class ClientCertificate:
- """
- Simple data class to hold client certificate information.
- The attributes are the same as the arguments to
- :meth:`ssl.SSLContext.load_cert_chain()`.
- .. versionadded:: 5.19
- .. versionchanged:: 5.27 Stabilized from preview.
- """
- certfile: str | bytes | PathLike[str] | PathLike[bytes]
- keyfile: str | bytes | PathLike[str] | PathLike[bytes] | None = None
- password: t.Callable[[], str | bytes] | str | bytes | None = None
- class ClientCertificateProvider(_Protocol, metaclass=abc.ABCMeta):
- """
- Interface for providing a client certificate to the driver for mutual TLS.
- This is an abstract base class (:class:`abc.ABC`) as well as a protocol
- (:class:`typing.Protocol`). Meaning you can either inherit from it or just
- implement all required method on a class to satisfy the type constraints.
- The package provides some default implementations of this class in
- :class:`.ClientCertificateProviders` for convenience.
- The driver will call :meth:`.get_certificate` to check if the client wants
- the driver to use as new certificate for mutual TLS.
- The certificate is only used as a second factor for authenticating the
- client.
- The DBMS user still needs to authenticate with an authentication token.
- Note that the work done in the methods of this interface count towards the
- connection acquisition affected by the respective timeout setting
- :ref:`connection-acquisition-timeout-ref`.
- Should fetching the certificate be particularly slow, it might be necessary
- to increase the timeout.
- .. warning::
- The provider **must not** interact with the driver in any way as this
- can cause deadlocks and undefined behaviour.
- .. versionadded:: 5.19
- .. versionchanged:: 5.27 Stabilized from preview.
- """
- @abc.abstractmethod
- def get_certificate(self) -> ClientCertificate | None:
- """
- Return the new certificate (if present) to use for new connections.
- If no new certificate is available, return :data:`None`.
- This will make the driver continue using the current certificate.
- Note that a new certificate will only be used for new connections.
- Already established connections will continue using the old
- certificate as TLS is established during connection setup.
- :returns: The new certificate to use for new connections.
- """
- ...
- class AsyncClientCertificateProvider(_Protocol, metaclass=abc.ABCMeta):
- """
- Async version of :class:`.ClientCertificateProvider`.
- The package provides some default implementations of this class in
- :class:`.AsyncClientCertificateProviders` for convenience.
- .. seealso::
- :class:`.ClientCertificateProvider`,
- :class:`.AsyncClientCertificateProviders`
- .. versionadded:: 5.19
- .. versionchanged:: 5.27 Stabilized from preview.
- """
- @abc.abstractmethod
- async def get_certificate(self) -> ClientCertificate | None:
- """
- Return the new certificate (if present) to use for new connections.
- .. seealso:: :meth:`.ClientCertificateProvider.get_certificate`
- """
- ...
- def to_auth_dict(auth: _TAuth) -> dict[str, t.Any]:
- # Determine auth details
- if not auth:
- return {}
- elif isinstance(auth, tuple) and 2 <= len(auth) <= 3:
- return vars(Auth("basic", *auth))
- else:
- try:
- return vars(auth)
- except (KeyError, TypeError) as e:
- # TODO: 6.0 - change this to be a DriverError (or subclass)
- raise AuthError(
- f"Cannot determine auth details from {auth!r}"
- ) from e
|