driver.py 53 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431
  1. # Copyright (c) "Neo4j"
  2. # Neo4j Sweden AB [https://neo4j.com]
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # https://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from __future__ import annotations
  16. import asyncio
  17. import typing as t
  18. if t.TYPE_CHECKING:
  19. import ssl
  20. import typing_extensions as te
  21. from .._api import (
  22. T_NotificationDisabledCategory,
  23. T_NotificationMinimumSeverity,
  24. )
  25. from .._api import (
  26. NotificationMinimumSeverity,
  27. RoutingControl,
  28. TelemetryAPI,
  29. )
  30. from .._async_compat.util import AsyncUtil
  31. from .._conf import (
  32. Config,
  33. ConfigurationError,
  34. SessionConfig,
  35. TrustAll,
  36. TrustStore,
  37. WorkspaceConfig,
  38. )
  39. from .._debug import ENABLED as DEBUG_ENABLED
  40. from .._meta import (
  41. deprecation_warn,
  42. experimental_warn,
  43. preview_warn,
  44. unclosed_resource_warn,
  45. )
  46. from .._work import (
  47. EagerResult,
  48. Query,
  49. unit_of_work,
  50. )
  51. from ..addressing import Address
  52. from ..api import (
  53. AsyncBookmarkManager,
  54. Auth,
  55. BookmarkManager,
  56. Bookmarks,
  57. DRIVER_BOLT,
  58. DRIVER_NEO4J,
  59. parse_neo4j_uri,
  60. parse_routing_context,
  61. READ_ACCESS,
  62. SECURITY_TYPE_SECURE,
  63. SECURITY_TYPE_SELF_SIGNED_CERTIFICATE,
  64. ServerInfo,
  65. TRUST_ALL_CERTIFICATES,
  66. TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
  67. URI_SCHEME_BOLT,
  68. URI_SCHEME_BOLT_SECURE,
  69. URI_SCHEME_BOLT_SELF_SIGNED_CERTIFICATE,
  70. URI_SCHEME_NEO4J,
  71. URI_SCHEME_NEO4J_SECURE,
  72. URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
  73. WRITE_ACCESS,
  74. )
  75. from ..auth_management import (
  76. AsyncAuthManager,
  77. AsyncAuthManagers,
  78. AsyncClientCertificateProvider,
  79. ClientCertificate,
  80. )
  81. from ..exceptions import Neo4jError
  82. from .auth_management import _AsyncStaticClientCertificateProvider
  83. from .bookmark_manager import (
  84. AsyncNeo4jBookmarkManager,
  85. TBmConsumer as _TBmConsumer,
  86. TBmSupplier as _TBmSupplier,
  87. )
  88. from .config import AsyncPoolConfig
  89. from .work import (
  90. AsyncManagedTransaction,
  91. AsyncResult,
  92. AsyncSession,
  93. )
  94. if t.TYPE_CHECKING:
  95. import ssl
  96. from enum import Enum
  97. import typing_extensions as te
  98. from .._api import T_RoutingControl
  99. from ..api import _TAuth
  100. class _DefaultEnum(Enum):
  101. default = "default"
  102. _default = _DefaultEnum.default
  103. else:
  104. _default = object()
  105. _T = t.TypeVar("_T")
  106. class AsyncGraphDatabase:
  107. """Accessor for :class:`neo4j.AsyncDriver` construction."""
  108. if t.TYPE_CHECKING:
  109. @classmethod
  110. def driver(
  111. cls,
  112. uri: str,
  113. *,
  114. auth: _TAuth | AsyncAuthManager = ...,
  115. max_connection_lifetime: float = ...,
  116. liveness_check_timeout: float | None = ...,
  117. max_connection_pool_size: int = ...,
  118. connection_timeout: float = ...,
  119. trust: (
  120. te.Literal["TRUST_ALL_CERTIFICATES"]
  121. | te.Literal["TRUST_SYSTEM_CA_SIGNED_CERTIFICATES"]
  122. ) = ...,
  123. resolver: (
  124. t.Callable[[Address], t.Iterable[Address]]
  125. | t.Callable[[Address], t.Awaitable[t.Iterable[Address]]]
  126. ) = ...,
  127. encrypted: bool = ...,
  128. trusted_certificates: TrustStore = ...,
  129. client_certificate: (
  130. ClientCertificate | AsyncClientCertificateProvider | None
  131. ) = ...,
  132. ssl_context: ssl.SSLContext | None = ...,
  133. user_agent: str = ...,
  134. keep_alive: bool = ...,
  135. notifications_min_severity: (
  136. T_NotificationMinimumSeverity | None
  137. ) = ...,
  138. notifications_disabled_categories: (
  139. t.Iterable[T_NotificationDisabledCategory] | None
  140. ) = ...,
  141. notifications_disabled_classifications: (
  142. t.Iterable[T_NotificationDisabledCategory] | None
  143. ) = ...,
  144. warn_notification_severity: (
  145. T_NotificationMinimumSeverity | None
  146. ) = ...,
  147. telemetry_disabled: bool = ...,
  148. # undocumented/unsupported options
  149. # they may be changed or removed any time without prior notice
  150. connection_acquisition_timeout: float = ...,
  151. max_transaction_retry_time: float = ...,
  152. initial_retry_delay: float = ...,
  153. retry_delay_multiplier: float = ...,
  154. retry_delay_jitter_factor: float = ...,
  155. database: str | None = ...,
  156. fetch_size: int = ...,
  157. impersonated_user: str | None = ...,
  158. bookmark_manager: (
  159. AsyncBookmarkManager | BookmarkManager | None
  160. ) = ...,
  161. ) -> AsyncDriver: ...
  162. else:
  163. @classmethod
  164. def driver(
  165. cls,
  166. uri: str,
  167. *,
  168. auth: _TAuth | AsyncAuthManager = None,
  169. **config,
  170. ) -> AsyncDriver:
  171. """
  172. Create a driver.
  173. :param uri: the connection URI for the driver,
  174. see :ref:`async-uri-ref` for available URIs.
  175. :param auth: the authentication details,
  176. see :ref:`auth-ref` for available authentication details.
  177. :param config: driver configuration key-word arguments,
  178. see :ref:`async-driver-configuration-ref` for available
  179. key-word arguments.
  180. """
  181. driver_type, security_type, parsed = parse_neo4j_uri(uri)
  182. if not isinstance(auth, AsyncAuthManager):
  183. auth = AsyncAuthManagers.static(auth)
  184. config["auth"] = auth
  185. client_certificate = config.get("client_certificate")
  186. if isinstance(client_certificate, ClientCertificate):
  187. # using internal class until public factory is GA:
  188. # AsyncClientCertificateProviders.static
  189. config["client_certificate"] = (
  190. _AsyncStaticClientCertificateProvider(client_certificate)
  191. )
  192. # TODO: 6.0 - remove "trust" config option
  193. if "trust" in config and config["trust"] not in {
  194. TRUST_ALL_CERTIFICATES,
  195. TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
  196. }:
  197. raise ConfigurationError(
  198. "The config setting `trust` values are {!r}".format(
  199. [
  200. TRUST_ALL_CERTIFICATES,
  201. TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
  202. ]
  203. )
  204. )
  205. if "trusted_certificates" in config and not isinstance(
  206. config["trusted_certificates"], TrustStore
  207. ):
  208. raise ConfigurationError(
  209. 'The config setting "trusted_certificates" must be of '
  210. "type neo4j.TrustAll, neo4j.TrustCustomCAs, or"
  211. "neo4j.TrustSystemCAs but was {}".format(
  212. type(config["trusted_certificates"])
  213. )
  214. )
  215. if security_type in {
  216. SECURITY_TYPE_SELF_SIGNED_CERTIFICATE,
  217. SECURITY_TYPE_SECURE,
  218. } and (
  219. "encrypted" in config
  220. or "trust" in config
  221. or "trusted_certificates" in config
  222. or "ssl_context" in config
  223. ):
  224. # TODO: 6.0 - remove "trust" from error message
  225. raise ConfigurationError(
  226. 'The config settings "encrypted", "trust", '
  227. '"trusted_certificates", and "ssl_context" can only be '
  228. "used with the URI schemes {!r}. Use the other URI "
  229. "schemes {!r} for setting encryption settings.".format(
  230. [
  231. URI_SCHEME_BOLT,
  232. URI_SCHEME_NEO4J,
  233. ],
  234. [
  235. URI_SCHEME_BOLT_SELF_SIGNED_CERTIFICATE,
  236. URI_SCHEME_BOLT_SECURE,
  237. URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
  238. URI_SCHEME_NEO4J_SECURE,
  239. ],
  240. )
  241. )
  242. if security_type == SECURITY_TYPE_SECURE:
  243. config["encrypted"] = True
  244. elif security_type == SECURITY_TYPE_SELF_SIGNED_CERTIFICATE:
  245. config["encrypted"] = True
  246. config["trusted_certificates"] = TrustAll()
  247. if "notifications_disabled_classifications" in config:
  248. preview_warn(
  249. "notifications_disabled_classifications "
  250. "is a preview feature.",
  251. stack_level=2,
  252. )
  253. if "warn_notification_severity" in config:
  254. preview_warn(
  255. "notification warnings are a preview feature.",
  256. stack_level=2,
  257. )
  258. _normalize_notifications_config(config, driver_level=True)
  259. liveness_check_timeout = config.get("liveness_check_timeout")
  260. if (
  261. liveness_check_timeout is not None
  262. and liveness_check_timeout < 0
  263. ):
  264. raise ConfigurationError(
  265. 'The config setting "liveness_check_timeout" must be '
  266. "greater than or equal to 0 but was "
  267. f"{liveness_check_timeout}."
  268. )
  269. assert driver_type in {DRIVER_BOLT, DRIVER_NEO4J}
  270. if driver_type == DRIVER_BOLT:
  271. if parse_routing_context(parsed.query):
  272. deprecation_warn(
  273. 'Creating a direct driver ("bolt://" scheme) with '
  274. "routing context (URI parameters) is deprecated. They "
  275. "will be ignored. This will raise an error in a "
  276. f'future release. Given URI "{uri}"',
  277. stack_level=2,
  278. )
  279. # TODO: 6.0 - raise instead of warning
  280. # raise ValueError(
  281. # 'Routing parameters are not supported with scheme '
  282. # '"bolt". Given URI "{}".'.format(uri)
  283. # )
  284. return cls.bolt_driver(parsed.netloc, **config)
  285. # else driver_type == DRIVER_NEO4J
  286. routing_context = parse_routing_context(parsed.query)
  287. return cls.neo4j_driver(
  288. parsed.netloc, routing_context=routing_context, **config
  289. )
  290. @classmethod
  291. def bookmark_manager(
  292. cls,
  293. initial_bookmarks: Bookmarks | t.Iterable[str] | None = None,
  294. bookmarks_supplier: _TBmSupplier | None = None,
  295. bookmarks_consumer: _TBmConsumer | None = None,
  296. ) -> AsyncBookmarkManager:
  297. """
  298. Create a :class:`.AsyncBookmarkManager` with default implementation.
  299. Basic usage example to configure sessions with the built-in bookmark
  300. manager implementation so that all work is automatically causally
  301. chained (i.e., all reads can observe all previous writes even in a
  302. clustered setup)::
  303. import neo4j
  304. # omitting closing the driver for brevity
  305. driver = neo4j.AsyncGraphDatabase.driver(...)
  306. bookmark_manager = neo4j.AsyncGraphDatabase.bookmark_manager(...)
  307. async with driver.session(
  308. bookmark_manager=bookmark_manager
  309. ) as session1:
  310. async with driver.session(
  311. bookmark_manager=bookmark_manager,
  312. access_mode=neo4j.READ_ACCESS
  313. ) as session2:
  314. result1 = await session1.run("<WRITE_QUERY>")
  315. await result1.consume()
  316. # READ_QUERY is guaranteed to see what WRITE_QUERY wrote.
  317. result2 = await session2.run("<READ_QUERY>")
  318. await result2.consume()
  319. This is a very contrived example, and in this particular case, having
  320. both queries in the same session has the exact same effect and might
  321. even be more performant. However, when dealing with sessions spanning
  322. multiple threads, async Tasks, processes, or even hosts, the bookmark
  323. manager can come in handy as sessions are not safe to be used
  324. concurrently.
  325. :param initial_bookmarks:
  326. The initial set of bookmarks. The returned bookmark manager will
  327. use this to initialize its internal bookmarks.
  328. :param bookmarks_supplier:
  329. Function which will be called every time the default bookmark
  330. manager's method :meth:`.AsyncBookmarkManager.get_bookmarks`
  331. gets called.
  332. The function takes no arguments and must return a
  333. :class:`.Bookmarks` object. The result of ``bookmarks_supplier``
  334. will then be concatenated with the internal set of bookmarks and
  335. used to configure the session in creation. It will, however, not
  336. update the internal set of bookmarks.
  337. :param bookmarks_consumer:
  338. Function which will be called whenever the set of bookmarks
  339. handled by the bookmark manager gets updated with the new
  340. internal bookmark set. It will receive the new set of bookmarks
  341. as a :class:`.Bookmarks` object and return :data:`None`.
  342. :returns: A default implementation of :class:`.AsyncBookmarkManager`.
  343. .. versionadded:: 5.0
  344. .. versionchanged:: 5.3
  345. The bookmark manager no longer tracks bookmarks per database.
  346. This effectively changes the signature of almost all bookmark
  347. manager related methods:
  348. * ``initial_bookmarks`` is no longer a mapping from database name
  349. to bookmarks but plain bookmarks.
  350. * ``bookmarks_supplier`` no longer receives the database name as
  351. an argument.
  352. * ``bookmarks_consumer`` no longer receives the database name as
  353. an argument.
  354. .. versionchanged:: 5.8 Stabilized from experimental.
  355. """
  356. return AsyncNeo4jBookmarkManager(
  357. initial_bookmarks=initial_bookmarks,
  358. bookmarks_supplier=bookmarks_supplier,
  359. bookmarks_consumer=bookmarks_consumer,
  360. )
  361. @classmethod
  362. def bolt_driver(cls, target, **config):
  363. """
  364. Create a direct driver.
  365. Create a driver for direct Bolt server access that uses
  366. socket I/O and thread-based concurrency.
  367. """
  368. from .._exceptions import (
  369. BoltHandshakeError,
  370. BoltSecurityError,
  371. )
  372. try:
  373. return AsyncBoltDriver.open(target, **config)
  374. except (BoltHandshakeError, BoltSecurityError) as error:
  375. from ..exceptions import ServiceUnavailable
  376. raise ServiceUnavailable(str(error)) from error
  377. @classmethod
  378. def neo4j_driver(cls, *targets, routing_context=None, **config):
  379. """
  380. Create a routing driver.
  381. Create a driver for routing-capable Neo4j service access
  382. that uses socket I/O and thread-based concurrency.
  383. """
  384. # TODO: 6.0 - adjust signature to only take one target
  385. if len(targets) > 1:
  386. deprecation_warn(
  387. "Creating a routing driver with multiple targets is "
  388. "deprecated. The driver only uses the first target anyway. "
  389. "The method signature will change in a future release.",
  390. )
  391. from .._exceptions import (
  392. BoltHandshakeError,
  393. BoltSecurityError,
  394. )
  395. try:
  396. return AsyncNeo4jDriver.open(
  397. *targets, routing_context=routing_context, **config
  398. )
  399. except (BoltHandshakeError, BoltSecurityError) as error:
  400. from ..exceptions import ServiceUnavailable
  401. raise ServiceUnavailable(str(error)) from error
  402. class _Direct:
  403. # TODO: 6.0 - those attributes should be private
  404. default_host = "localhost"
  405. default_port = 7687
  406. default_target = ":"
  407. def __init__(self, address):
  408. self._address = address
  409. @property
  410. def address(self):
  411. return self._address
  412. @classmethod
  413. def parse_target(cls, target):
  414. """Parse a target string to produce an address."""
  415. if not target:
  416. target = cls.default_target
  417. return Address.parse(
  418. target,
  419. default_host=cls.default_host,
  420. default_port=cls.default_port,
  421. )
  422. class _Routing:
  423. # TODO: 6.0 - those attributes should be private
  424. default_host = "localhost"
  425. default_port = 7687
  426. default_targets = ": :17601 :17687"
  427. def __init__(self, initial_addresses):
  428. self._initial_addresses = initial_addresses
  429. @property
  430. def initial_addresses(self):
  431. return self._initial_addresses
  432. @classmethod
  433. def parse_targets(cls, *targets):
  434. """Parse a sequence of target strings to produce an address list."""
  435. targets = " ".join(targets)
  436. if not targets:
  437. targets = cls.default_targets
  438. return Address.parse_list(
  439. targets,
  440. default_host=cls.default_host,
  441. default_port=cls.default_port,
  442. )
  443. class AsyncDriver:
  444. """
  445. Base class for all driver types.
  446. Drivers are used as the primary access point to Neo4j.
  447. """
  448. #: Connection pool
  449. _pool: t.Any = None
  450. #: Flag if the driver has been closed
  451. _closed = False
  452. def __init__(self, pool, default_workspace_config):
  453. assert pool is not None
  454. assert default_workspace_config is not None
  455. self._pool = pool
  456. self._default_workspace_config = default_workspace_config
  457. self._query_bookmark_manager = AsyncGraphDatabase.bookmark_manager()
  458. async def __aenter__(self) -> AsyncDriver:
  459. return self
  460. async def __aexit__(self, exc_type, exc_value, traceback):
  461. await self.close()
  462. # Copy globals as function locals to make sure that they are available
  463. # during Python shutdown when the Pool is destroyed.
  464. def __del__(
  465. self,
  466. _unclosed_resource_warn=unclosed_resource_warn,
  467. _is_async_code=AsyncUtil.is_async_code,
  468. _deprecation_warn=deprecation_warn,
  469. ):
  470. if not self._closed:
  471. _unclosed_resource_warn(self)
  472. # TODO: 6.0 - remove this
  473. if _is_async_code:
  474. return
  475. if not self._closed:
  476. _deprecation_warn(
  477. "Relying on AsyncDriver's destructor to close the session "
  478. "is deprecated. Please make sure to close the session. "
  479. "Use it as a context (`with` statement) or make sure to "
  480. "call `.close()` explicitly. Future versions of the "
  481. "driver will not close drivers automatically."
  482. )
  483. self.close()
  484. def _check_state(self):
  485. if self._closed:
  486. # TODO: 6.0 - raise the error
  487. # raise DriverError("Driver closed")
  488. deprecation_warn(
  489. "Using a driver after it has been closed is deprecated. "
  490. "Future versions of the driver will raise an error.",
  491. stack_level=3,
  492. )
  493. @property
  494. def encrypted(self) -> bool:
  495. """Indicate whether the driver was configured to use encryption."""
  496. return bool(self._pool.pool_config.encrypted)
  497. if t.TYPE_CHECKING:
  498. def session(
  499. self,
  500. *,
  501. connection_acquisition_timeout: float = ...,
  502. max_transaction_retry_time: float = ...,
  503. database: str | None = ...,
  504. fetch_size: int = ...,
  505. impersonated_user: str | None = ...,
  506. bookmarks: t.Iterable[str] | Bookmarks | None = ...,
  507. default_access_mode: str = ...,
  508. bookmark_manager: (
  509. AsyncBookmarkManager | BookmarkManager | None
  510. ) = ...,
  511. auth: _TAuth = ...,
  512. notifications_min_severity: (
  513. T_NotificationMinimumSeverity | None
  514. ) = ...,
  515. notifications_disabled_categories: (
  516. t.Iterable[T_NotificationDisabledCategory] | None
  517. ) = ...,
  518. notifications_disabled_classifications: (
  519. t.Iterable[T_NotificationDisabledCategory] | None
  520. ) = ...,
  521. # undocumented/unsupported options
  522. # they may be change or removed any time without prior notice
  523. initial_retry_delay: float = ...,
  524. retry_delay_multiplier: float = ...,
  525. retry_delay_jitter_factor: float = ...,
  526. ) -> AsyncSession: ...
  527. else:
  528. def session(self, **config) -> AsyncSession:
  529. """
  530. Create a session.
  531. See :ref:`async-session-construction-ref` for details.
  532. :param config: session configuration key-word arguments,
  533. see :ref:`async-session-configuration-ref` for available
  534. key-word arguments.
  535. :returns: new :class:`neo4j.AsyncSession` object
  536. """
  537. if "warn_notification_severity" in config:
  538. # Would work just fine, but we don't want to introduce yet
  539. # another undocumented/unsupported config option.
  540. del config["warn_notification_severity"]
  541. self._check_state()
  542. if "notifications_disabled_classifications" in config:
  543. preview_warn(
  544. "notifications_disabled_classifications "
  545. "is a preview feature.",
  546. stack_level=2,
  547. )
  548. session_config = self._read_session_config(config)
  549. return self._session(session_config)
  550. def _session(self, session_config) -> AsyncSession:
  551. return AsyncSession(self._pool, session_config)
  552. def _read_session_config(self, config_kwargs):
  553. config = self._prepare_session_config(config_kwargs)
  554. return SessionConfig(self._default_workspace_config, config)
  555. @classmethod
  556. def _prepare_session_config(cls, config_kwargs):
  557. _normalize_notifications_config(config_kwargs)
  558. return config_kwargs
  559. async def close(self) -> None:
  560. """Shut down, closing any open connections in the pool."""
  561. # TODO: 6.0 - NOOP if already closed
  562. # if self._closed:
  563. # return
  564. try:
  565. await self._pool.close()
  566. except asyncio.CancelledError:
  567. self._closed = True
  568. raise
  569. self._closed = True
  570. # overloads to work around https://github.com/python/mypy/issues/3737
  571. @t.overload
  572. async def execute_query(
  573. self,
  574. query_: te.LiteralString | Query,
  575. parameters_: dict[str, t.Any] | None = None,
  576. routing_: T_RoutingControl = RoutingControl.WRITE,
  577. database_: str | None = None,
  578. impersonated_user_: str | None = None,
  579. bookmark_manager_: (
  580. AsyncBookmarkManager | BookmarkManager | None
  581. ) = ...,
  582. auth_: _TAuth = None,
  583. result_transformer_: t.Callable[
  584. [AsyncResult], t.Awaitable[EagerResult]
  585. ] = ...,
  586. **kwargs: t.Any,
  587. ) -> EagerResult: ...
  588. @t.overload
  589. async def execute_query(
  590. self,
  591. query_: te.LiteralString | Query,
  592. parameters_: dict[str, t.Any] | None = None,
  593. routing_: T_RoutingControl = RoutingControl.WRITE,
  594. database_: str | None = None,
  595. impersonated_user_: str | None = None,
  596. bookmark_manager_: (
  597. AsyncBookmarkManager | BookmarkManager | None
  598. ) = ...,
  599. auth_: _TAuth = None,
  600. result_transformer_: t.Callable[[AsyncResult], t.Awaitable[_T]] = ...,
  601. **kwargs: t.Any,
  602. ) -> _T: ...
  603. async def execute_query(
  604. self,
  605. query_: te.LiteralString | Query,
  606. parameters_: dict[str, t.Any] | None = None,
  607. routing_: T_RoutingControl = RoutingControl.WRITE,
  608. database_: str | None = None,
  609. impersonated_user_: str | None = None,
  610. bookmark_manager_: (
  611. AsyncBookmarkManager
  612. | BookmarkManager
  613. | None
  614. | te.Literal[_DefaultEnum.default]
  615. ) = _default,
  616. auth_: _TAuth = None,
  617. result_transformer_: t.Callable[
  618. [AsyncResult], t.Awaitable[t.Any]
  619. ] = AsyncResult.to_eager_result,
  620. **kwargs: t.Any,
  621. ) -> t.Any:
  622. '''
  623. Execute a query in a transaction function and return all results.
  624. This method is a handy wrapper for lower-level driver APIs like
  625. sessions, transactions, and transaction functions. It is intended
  626. for simple use cases where there is no need for managing all possible
  627. options.
  628. The internal usage of transaction functions provides a retry-mechanism
  629. for appropriate errors. Furthermore, this means that queries using
  630. ``CALL {} IN TRANSACTIONS`` or the older ``USING PERIODIC COMMIT``
  631. will not work (use :meth:`AsyncSession.run` for these).
  632. The method is roughly equivalent to::
  633. async def execute_query(
  634. query_, parameters_, routing_, database_, impersonated_user_,
  635. bookmark_manager_, auth_, result_transformer_, **kwargs
  636. ):
  637. @unit_of_work(query_.metadata, query_.timeout)
  638. async def work(tx):
  639. result = await tx.run(query_.text, parameters_, **kwargs)
  640. return await result_transformer_(result)
  641. async with driver.session(
  642. database=database_,
  643. impersonated_user=impersonated_user_,
  644. bookmark_manager=bookmark_manager_,
  645. auth=auth_,
  646. ) as session:
  647. if routing_ == RoutingControl.WRITE:
  648. return await session.execute_write(work)
  649. elif routing_ == RoutingControl.READ:
  650. return await session.execute_read(work)
  651. Usage example::
  652. from typing import List
  653. import neo4j
  654. async def example(driver: neo4j.AsyncDriver) -> List[str]:
  655. """Get the name of all 42 year-olds."""
  656. records, summary, keys = await driver.execute_query(
  657. "MATCH (p:Person {age: $age}) RETURN p.name",
  658. {"age": 42},
  659. routing_=neo4j.RoutingControl.READ, # or just "r"
  660. database_="neo4j",
  661. )
  662. assert keys == ["p.name"] # not needed, just for illustration
  663. # log_summary(summary) # log some metadata
  664. return [str(record["p.name"]) for record in records]
  665. # or: return [str(record[0]) for record in records]
  666. # or even: return list(map(lambda r: str(r[0]), records))
  667. Another example::
  668. import neo4j
  669. async def example(driver: neo4j.AsyncDriver) -> int:
  670. """Call all young people "My dear" and get their count."""
  671. record = await driver.execute_query(
  672. "MATCH (p:Person) WHERE p.age <= $age "
  673. "SET p.nickname = 'My dear' "
  674. "RETURN count(*)",
  675. # optional routing parameter, as write is default
  676. # routing_=neo4j.RoutingControl.WRITE, # or just "w",
  677. database_="neo4j",
  678. result_transformer_=neo4j.AsyncResult.single,
  679. age=15,
  680. )
  681. assert record is not None # for typechecking and illustration
  682. count = record[0]
  683. assert isinstance(count, int)
  684. return count
  685. :param query_:
  686. Cypher query to execute.
  687. Use a :class:`.Query` object to pass a query with additional
  688. transaction configuration.
  689. :type query_: typing.LiteralString | Query
  690. :param parameters_: parameters to use in the query
  691. :type parameters_: typing.Optional[typing.Dict[str, typing.Any]]
  692. :param routing_:
  693. Whether to route the query to a reader (follower/read replica) or
  694. a writer (leader) in the cluster. Default is to route to a writer.
  695. :type routing_: RoutingControl
  696. :param database_:
  697. Database to execute the query against.
  698. :data:`None` (default) uses the database configured on the server
  699. side.
  700. .. Note::
  701. It is recommended to always specify the database explicitly
  702. when possible. This allows the driver to work more efficiently,
  703. as it will not have to resolve the default database first.
  704. See also the Session config :ref:`database-ref`.
  705. :type database_: typing.Optional[str]
  706. :param impersonated_user_:
  707. Name of the user to impersonate.
  708. This means that all query will be executed in the security context
  709. of the impersonated user. For this, the user for which the
  710. :class:`Driver` has been created needs to have the appropriate
  711. permissions.
  712. See also the Session config :ref:`impersonated-user-ref`.
  713. :type impersonated_user_: typing.Optional[str]
  714. :param auth_:
  715. Authentication information to use for this query.
  716. By default, the driver configuration is used.
  717. See also the Session config :ref:`session-auth-ref`.
  718. :type auth_: typing.Tuple[typing.Any, typing.Any] | Auth | None
  719. :param result_transformer_:
  720. A function that gets passed the :class:`neo4j.AsyncResult` object
  721. resulting from the query and converts it to a different type. The
  722. result of the transformer function is returned by this method.
  723. .. warning::
  724. The transformer function must **not** return the
  725. :class:`neo4j.AsyncResult` itself.
  726. .. warning::
  727. N.B. the driver might retry the underlying transaction so the
  728. transformer might get invoked more than once (with different
  729. :class:`neo4j.AsyncResult` objects).
  730. Therefore, it needs to be idempotent (i.e., have the same
  731. effect, regardless if called once or many times).
  732. Example transformer that checks that exactly one record is in the
  733. result stream, then returns the record and the result summary::
  734. from typing import Tuple
  735. import neo4j
  736. async def transformer(
  737. result: neo4j.AsyncResult
  738. ) -> Tuple[neo4j.Record, neo4j.ResultSummary]:
  739. record = await result.single(strict=True)
  740. summary = await result.consume()
  741. return record, summary
  742. Note that methods of :class:`neo4j.AsyncResult` that don't take
  743. mandatory arguments can be used directly as transformer functions.
  744. For example::
  745. import neo4j
  746. async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
  747. record = await driver.execute_query(
  748. "SOME QUERY",
  749. result_transformer_=neo4j.AsyncResult.single
  750. )
  751. # is equivalent to:
  752. async def transformer(result: neo4j.AsyncResult) -> neo4j.Record:
  753. return await result.single()
  754. async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
  755. record = await driver.execute_query(
  756. "SOME QUERY",
  757. result_transformer_=transformer
  758. )
  759. :type result_transformer_:
  760. typing.Callable[[AsyncResult], typing.Awaitable[T]]
  761. :param bookmark_manager_:
  762. Specify a bookmark manager to use.
  763. If present, the bookmark manager is used to keep the query causally
  764. consistent with all work executed using the same bookmark manager.
  765. Defaults to the driver's :attr:`.execute_query_bookmark_manager`.
  766. Pass :data:`None` to disable causal consistency.
  767. :type bookmark_manager_: AsyncBookmarkManager | BookmarkManager | None
  768. :param kwargs: additional keyword parameters. None of these can end
  769. with a single underscore. This is to avoid collisions with the
  770. keyword configuration parameters of this method. If you need to
  771. pass such a parameter, use the ``parameters_`` parameter instead.
  772. Parameters passed as kwargs take precedence over those passed in
  773. ``parameters_``.
  774. :type kwargs: typing.Any
  775. :returns: the result of the ``result_transformer_``
  776. :rtype: T
  777. .. versionadded:: 5.5
  778. .. versionchanged:: 5.8
  779. * Added ``auth_`` parameter in preview.
  780. * Stabilized from experimental.
  781. .. versionchanged:: 5.14
  782. Stabilized ``auth_`` parameter from preview.
  783. .. versionchanged:: 5.15
  784. The ``query_`` parameter now also accepts a :class:`.Query` object
  785. instead of only :class:`str`.
  786. ''' # noqa: E501 example code isn't too long
  787. self._check_state()
  788. invalid_kwargs = [
  789. k for k in kwargs if k[-2:-1] != "_" and k[-1:] == "_"
  790. ]
  791. if invalid_kwargs:
  792. raise ValueError(
  793. "keyword parameters must not end with a single '_'. "
  794. f"Found: {invalid_kwargs!r}\n"
  795. "\nYou either misspelled an existing configuration parameter "
  796. "or tried to send a query parameter that is reserved. In the "
  797. "latter case, use the `parameters_` dictionary instead."
  798. )
  799. if isinstance(query_, Query):
  800. timeout = query_.timeout
  801. metadata = query_.metadata
  802. query_str = query_.text
  803. work = unit_of_work(metadata, timeout)(_work)
  804. else:
  805. query_str = query_
  806. work = _work
  807. parameters = dict(parameters_ or {}, **kwargs)
  808. if bookmark_manager_ is _default:
  809. bookmark_manager_ = self._query_bookmark_manager
  810. assert bookmark_manager_ is not _default
  811. session_config = self._read_session_config(
  812. {
  813. "database": database_,
  814. "impersonated_user": impersonated_user_,
  815. "bookmark_manager": bookmark_manager_,
  816. "auth": auth_,
  817. }
  818. )
  819. session = self._session(session_config)
  820. async with session:
  821. if routing_ == RoutingControl.WRITE:
  822. access_mode = WRITE_ACCESS
  823. elif routing_ == RoutingControl.READ:
  824. access_mode = READ_ACCESS
  825. else:
  826. raise ValueError(
  827. f"Invalid routing control value: {routing_!r}"
  828. )
  829. with session._pipelined_begin:
  830. return await session._run_transaction(
  831. access_mode,
  832. TelemetryAPI.DRIVER,
  833. work,
  834. (query_str, parameters, result_transformer_),
  835. {},
  836. )
  837. @property
  838. def execute_query_bookmark_manager(self) -> AsyncBookmarkManager:
  839. """
  840. The driver's default query bookmark manager.
  841. This is the default :class:`.AsyncBookmarkManager` used by
  842. :meth:`.execute_query`. This can be used to causally chain
  843. :meth:`.execute_query` calls and sessions. Example::
  844. async def example(driver: neo4j.AsyncDriver) -> None:
  845. await driver.execute_query("<QUERY 1>")
  846. async with driver.session(
  847. bookmark_manager=driver.execute_query_bookmark_manager
  848. ) as session:
  849. # every query inside this session will be causally chained
  850. # (i.e., can read what was written by <QUERY 1>)
  851. await session.run("<QUERY 2>")
  852. # subsequent execute_query calls will be causally chained
  853. # (i.e., can read what was written by <QUERY 2>)
  854. await driver.execute_query("<QUERY 3>")
  855. .. versionadded:: 5.5
  856. .. versionchanged:: 5.8
  857. * Renamed from ``query_bookmark_manager`` to
  858. ``execute_query_bookmark_manager``.
  859. * Stabilized from experimental.
  860. """
  861. return self._query_bookmark_manager
  862. if t.TYPE_CHECKING:
  863. async def verify_connectivity(
  864. self,
  865. *,
  866. # all arguments are experimental
  867. # they may be change or removed any time without prior notice
  868. session_connection_timeout: float = ...,
  869. connection_acquisition_timeout: float = ...,
  870. max_transaction_retry_time: float = ...,
  871. database: str | None = ...,
  872. fetch_size: int = ...,
  873. impersonated_user: str | None = ...,
  874. bookmarks: t.Iterable[str] | Bookmarks | None = ...,
  875. default_access_mode: str = ...,
  876. bookmark_manager: (
  877. AsyncBookmarkManager | BookmarkManager | None
  878. ) = ...,
  879. auth: Auth | tuple[str, str] = ...,
  880. notifications_min_severity: (
  881. T_NotificationMinimumSeverity | None
  882. ) = ...,
  883. notifications_disabled_categories: (
  884. t.Iterable[T_NotificationDisabledCategory] | None
  885. ) = ...,
  886. notifications_disabled_classifications: (
  887. t.Iterable[T_NotificationDisabledCategory] | None
  888. ) = ...,
  889. # undocumented/unsupported options
  890. initial_retry_delay: float = ...,
  891. retry_delay_multiplier: float = ...,
  892. retry_delay_jitter_factor: float = ...,
  893. ) -> None: ...
  894. else:
  895. async def verify_connectivity(self, **config) -> None:
  896. """
  897. Verify that the driver can establish a connection to the server.
  898. This verifies if the driver can establish a reading connection to a
  899. remote server or a cluster. Some data will be exchanged.
  900. .. note::
  901. Even if this method raises an exception, the driver still needs
  902. to be closed via :meth:`close` to free up all resources.
  903. :param config: accepts the same configuration key-word arguments as
  904. :meth:`session`.
  905. .. warning::
  906. All configuration key-word arguments are experimental.
  907. They might be changed or removed in any future version
  908. without prior notice.
  909. :raises Exception: if the driver cannot connect to the remote.
  910. Use the exception to further understand the cause of the
  911. connectivity problem.
  912. .. versionchanged:: 5.0
  913. The undocumented return value has been removed.
  914. If you need information about the remote server, use
  915. :meth:`get_server_info` instead.
  916. """
  917. self._check_state()
  918. if config:
  919. experimental_warn(
  920. "All configuration key-word arguments to "
  921. "verify_connectivity() are experimental. They might be "
  922. "changed or removed in any future version without prior "
  923. "notice."
  924. )
  925. session_config = self._read_session_config(config)
  926. await self._get_server_info(session_config)
  927. if t.TYPE_CHECKING:
  928. async def get_server_info(
  929. self,
  930. *,
  931. # all arguments are experimental
  932. # they may be change or removed any time without prior notice
  933. session_connection_timeout: float = ...,
  934. connection_acquisition_timeout: float = ...,
  935. max_transaction_retry_time: float = ...,
  936. database: str | None = ...,
  937. fetch_size: int = ...,
  938. impersonated_user: str | None = ...,
  939. bookmarks: t.Iterable[str] | Bookmarks | None = ...,
  940. default_access_mode: str = ...,
  941. bookmark_manager: (
  942. AsyncBookmarkManager | BookmarkManager | None
  943. ) = ...,
  944. auth: Auth | tuple[str, str] = ...,
  945. notifications_min_severity: (
  946. T_NotificationMinimumSeverity | None
  947. ) = ...,
  948. notifications_disabled_categories: (
  949. t.Iterable[T_NotificationDisabledCategory] | None
  950. ) = ...,
  951. notifications_disabled_classifications: (
  952. t.Iterable[T_NotificationDisabledCategory] | None
  953. ) = ...,
  954. # undocumented/unsupported options
  955. initial_retry_delay: float = ...,
  956. retry_delay_multiplier: float = ...,
  957. retry_delay_jitter_factor: float = ...,
  958. ) -> ServerInfo: ...
  959. else:
  960. async def get_server_info(self, **config) -> ServerInfo:
  961. """
  962. Get information about the connected Neo4j server.
  963. Try to establish a working read connection to the remote server or
  964. a member of a cluster and exchange some data. Then return the
  965. contacted server's information.
  966. In a cluster, there is no guarantee about which server will be
  967. contacted.
  968. .. note::
  969. Even if this method raises an exception, the driver still needs
  970. to be closed via :meth:`close` to free up all resources.
  971. :param config: accepts the same configuration key-word arguments as
  972. :meth:`session`.
  973. .. warning::
  974. All configuration key-word arguments are experimental.
  975. They might be changed or removed in any future version
  976. without prior notice.
  977. :raises Exception: if the driver cannot connect to the remote.
  978. Use the exception to further understand the cause of the
  979. connectivity problem.
  980. .. versionadded:: 5.0
  981. """
  982. self._check_state()
  983. if config:
  984. experimental_warn(
  985. "All configuration key-word arguments to "
  986. "get_server_info() are experimental. They might be "
  987. "changed or removed in any future version without prior "
  988. "notice."
  989. )
  990. session_config = self._read_session_config(config)
  991. return await self._get_server_info(session_config)
  992. async def supports_multi_db(self) -> bool:
  993. """
  994. Check if the server or cluster supports multi-databases.
  995. :returns: Returns true if the server or cluster the driver connects to
  996. supports multi-databases, otherwise false.
  997. .. note::
  998. Feature support query based solely on the Bolt protocol version.
  999. The feature might still be disabled on the server side even if this
  1000. function return :data:`True`. It just guarantees that the driver
  1001. won't throw a :exc:`.ConfigurationError` when trying to use this
  1002. driver feature.
  1003. """
  1004. self._check_state()
  1005. session_config = self._read_session_config({})
  1006. async with self._session(session_config) as session:
  1007. await session._connect(READ_ACCESS)
  1008. assert session._connection
  1009. return session._connection.supports_multiple_databases
  1010. if t.TYPE_CHECKING:
  1011. async def verify_authentication(
  1012. self,
  1013. auth: Auth | tuple[str, str] | None = None,
  1014. # all other arguments are experimental
  1015. # they may be change or removed any time without prior notice
  1016. session_connection_timeout: float = ...,
  1017. connection_acquisition_timeout: float = ...,
  1018. max_transaction_retry_time: float = ...,
  1019. database: str | None = ...,
  1020. fetch_size: int = ...,
  1021. impersonated_user: str | None = ...,
  1022. bookmarks: t.Iterable[str] | Bookmarks | None = ...,
  1023. default_access_mode: str = ...,
  1024. bookmark_manager: (
  1025. AsyncBookmarkManager | BookmarkManager | None
  1026. ) = ...,
  1027. # undocumented/unsupported options
  1028. initial_retry_delay: float = ...,
  1029. retry_delay_multiplier: float = ...,
  1030. retry_delay_jitter_factor: float = ...,
  1031. ) -> bool: ...
  1032. else:
  1033. async def verify_authentication(
  1034. self,
  1035. auth: Auth | tuple[str, str] | None = None,
  1036. **config,
  1037. ) -> bool:
  1038. """
  1039. Verify that the authentication information is valid.
  1040. Like :meth:`.verify_connectivity`, but for checking authentication.
  1041. Try to establish a working read connection to the remote server or
  1042. a member of a cluster and exchange some data. In a cluster, there
  1043. is no guarantee about which server will be contacted. If the data
  1044. exchange is successful and the authentication information is valid,
  1045. :data:`True` is returned. Otherwise, the error will be matched
  1046. against a list of known authentication errors. If the error is on
  1047. that list, :data:`False` is returned indicating that the
  1048. authentication information is invalid. Otherwise, the error is
  1049. re-raised.
  1050. :param auth: authentication information to verify.
  1051. Same as the session config :ref:`auth-ref`.
  1052. :param config: accepts the same configuration key-word arguments as
  1053. :meth:`session`.
  1054. .. warning::
  1055. All configuration key-word arguments (except ``auth``) are
  1056. experimental. They might be changed or removed in any
  1057. future version without prior notice.
  1058. :raises Exception: if the driver cannot connect to the remote.
  1059. Use the exception to further understand the cause of the
  1060. connectivity problem.
  1061. .. versionadded:: 5.8
  1062. .. versionchanged:: 5.14 Stabilized from experimental.
  1063. """
  1064. self._check_state()
  1065. if config:
  1066. experimental_warn(
  1067. "All configuration key-word arguments but auth to "
  1068. "verify_authentication() are experimental. They might be "
  1069. "changed or removed in any future version without prior "
  1070. "notice."
  1071. )
  1072. if "database" not in config:
  1073. config["database"] = "system"
  1074. session_config = self._read_session_config(config)
  1075. session_config = SessionConfig(session_config, {"auth": auth})
  1076. async with self._session(session_config) as session:
  1077. try:
  1078. await session._verify_authentication()
  1079. except Neo4jError as exc:
  1080. if exc.code in {
  1081. "Neo.ClientError.Security.CredentialsExpired",
  1082. "Neo.ClientError.Security.Forbidden",
  1083. "Neo.ClientError.Security.TokenExpired",
  1084. "Neo.ClientError.Security.Unauthorized",
  1085. }:
  1086. return False
  1087. raise
  1088. return True
  1089. async def supports_session_auth(self) -> bool:
  1090. """
  1091. Check if the remote supports connection re-authentication.
  1092. :returns: Returns true if the server or cluster the driver connects to
  1093. supports re-authentication of existing connections, otherwise
  1094. false.
  1095. .. note::
  1096. Feature support query based solely on the Bolt protocol version.
  1097. The feature might still be disabled on the server side even if this
  1098. function return :data:`True`. It just guarantees that the driver
  1099. won't throw a :exc:`.ConfigurationError` when trying to use this
  1100. driver feature.
  1101. .. versionadded:: 5.8
  1102. """
  1103. self._check_state()
  1104. session_config = self._read_session_config({})
  1105. async with self._session(session_config) as session:
  1106. await session._connect(READ_ACCESS)
  1107. assert session._connection
  1108. return session._connection.supports_re_auth
  1109. async def _get_server_info(self, session_config) -> ServerInfo:
  1110. async with self._session(session_config) as session:
  1111. return await session._get_server_info()
  1112. async def _work(
  1113. tx: AsyncManagedTransaction,
  1114. query: te.LiteralString,
  1115. parameters: dict[str, t.Any],
  1116. transformer: t.Callable[[AsyncResult], t.Awaitable[_T]],
  1117. ) -> _T:
  1118. res = await tx.run(query, parameters)
  1119. return await transformer(res)
  1120. class AsyncBoltDriver(_Direct, AsyncDriver):
  1121. """
  1122. :class:`.AsyncBoltDriver` is instantiated for ``bolt`` URIs.
  1123. It addresses a single database machine. This may be a standalone server or
  1124. could be a specific member of a cluster.
  1125. Connections established by a :class:`.AsyncBoltDriver` are always made to
  1126. the exact host and port detailed in the URI.
  1127. This class is not supposed to be instantiated externally. Use
  1128. :meth:`AsyncGraphDatabase.driver` instead.
  1129. """
  1130. @classmethod
  1131. def open(cls, target, **config):
  1132. from .io import AsyncBoltPool
  1133. address = cls.parse_target(target)
  1134. pool_config, default_workspace_config = Config.consume_chain(
  1135. config, AsyncPoolConfig, WorkspaceConfig
  1136. )
  1137. pool = AsyncBoltPool.open(
  1138. address,
  1139. pool_config=pool_config,
  1140. workspace_config=default_workspace_config,
  1141. )
  1142. return cls(pool, default_workspace_config)
  1143. def __init__(self, pool, default_workspace_config):
  1144. _Direct.__init__(self, pool.address)
  1145. AsyncDriver.__init__(self, pool, default_workspace_config)
  1146. self._default_workspace_config = default_workspace_config
  1147. class AsyncNeo4jDriver(_Routing, AsyncDriver):
  1148. """
  1149. :class:`.AsyncNeo4jDriver` is instantiated for ``neo4j`` URIs.
  1150. The routing behaviour works in tandem with Neo4j's `Causal Clustering
  1151. <https://neo4j.com/docs/operations-manual/current/clustering/>`_
  1152. feature by directing read and write behaviour to appropriate
  1153. cluster members.
  1154. This class is not supposed to be instantiated externally. Use
  1155. :meth:`AsyncGraphDatabase.driver` instead.
  1156. """
  1157. @classmethod
  1158. def open(cls, *targets, routing_context=None, **config):
  1159. from .io import AsyncNeo4jPool
  1160. addresses = cls.parse_targets(*targets)
  1161. pool_config, default_workspace_config = Config.consume_chain(
  1162. config, AsyncPoolConfig, WorkspaceConfig
  1163. )
  1164. pool = AsyncNeo4jPool.open(
  1165. *addresses,
  1166. routing_context=routing_context,
  1167. pool_config=pool_config,
  1168. workspace_config=default_workspace_config,
  1169. )
  1170. return cls(pool, default_workspace_config)
  1171. def __init__(self, pool, default_workspace_config):
  1172. _Routing.__init__(self, [pool.address])
  1173. AsyncDriver.__init__(self, pool, default_workspace_config)
  1174. def _normalize_notifications_config(config_kwargs, *, driver_level=False):
  1175. list_config_keys = (
  1176. "notifications_disabled_categories",
  1177. "notifications_disabled_classifications",
  1178. )
  1179. for key in list_config_keys:
  1180. value = config_kwargs.get(key)
  1181. if value is not None:
  1182. config_kwargs[key] = [getattr(e, "value", e) for e in value]
  1183. disabled_categories = config_kwargs.pop(
  1184. "notifications_disabled_categories", None
  1185. )
  1186. if disabled_categories is not None:
  1187. disabled_classifications = config_kwargs.get(
  1188. "notifications_disabled_classifications"
  1189. )
  1190. if disabled_classifications is None:
  1191. disabled_classifications = disabled_categories
  1192. else:
  1193. disabled_classifications = list(
  1194. {*disabled_categories, *disabled_classifications}
  1195. )
  1196. config_kwargs["notifications_disabled_classifications"] = (
  1197. disabled_classifications
  1198. )
  1199. single_config_keys = (
  1200. "notifications_min_severity",
  1201. "warn_notification_severity",
  1202. )
  1203. for key in single_config_keys:
  1204. value = config_kwargs.get(key)
  1205. if value is not None:
  1206. config_kwargs[key] = getattr(value, "value", value)
  1207. value = config_kwargs.get("warn_notification_severity")
  1208. if value not in {*NotificationMinimumSeverity, None}:
  1209. raise ValueError(
  1210. f"Invalid value for configuration "
  1211. f"warn_notification_severity: {value}. Should be None, a "
  1212. f"NotificationMinimumSeverity, or a string representing a "
  1213. f"NotificationMinimumSeverity."
  1214. )
  1215. if driver_level:
  1216. if value is None:
  1217. if DEBUG_ENABLED:
  1218. config_kwargs["warn_notification_severity"] = (
  1219. NotificationMinimumSeverity.INFORMATION
  1220. )
  1221. elif value == NotificationMinimumSeverity.OFF:
  1222. config_kwargs["warn_notification_severity"] = None