base.py 40 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171
  1. # pool/base.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """Base constructs for connection pools.
  8. """
  9. from collections import deque
  10. import time
  11. import weakref
  12. from .. import event
  13. from .. import exc
  14. from .. import log
  15. from .. import util
  16. reset_rollback = util.symbol("reset_rollback")
  17. reset_commit = util.symbol("reset_commit")
  18. reset_none = util.symbol("reset_none")
  19. class _ConnDialect(object):
  20. """partial implementation of :class:`.Dialect`
  21. which provides DBAPI connection methods.
  22. When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
  23. the :class:`_engine.Engine` replaces this with its own
  24. :class:`.Dialect`.
  25. """
  26. is_async = False
  27. has_terminate = False
  28. def do_rollback(self, dbapi_connection):
  29. dbapi_connection.rollback()
  30. def do_commit(self, dbapi_connection):
  31. dbapi_connection.commit()
  32. def do_terminate(self, dbapi_connection):
  33. dbapi_connection.close()
  34. def do_close(self, dbapi_connection):
  35. dbapi_connection.close()
  36. def do_ping(self, dbapi_connection):
  37. raise NotImplementedError(
  38. "The ping feature requires that a dialect is "
  39. "passed to the connection pool."
  40. )
  41. def get_driver_connection(self, connection):
  42. return connection
  43. class _AsyncConnDialect(_ConnDialect):
  44. is_async = True
  45. class Pool(log.Identified):
  46. """Abstract base class for connection pools."""
  47. _dialect = _ConnDialect()
  48. def __init__(
  49. self,
  50. creator,
  51. recycle=-1,
  52. echo=None,
  53. logging_name=None,
  54. reset_on_return=True,
  55. events=None,
  56. dialect=None,
  57. pre_ping=False,
  58. _dispatch=None,
  59. ):
  60. """
  61. Construct a Pool.
  62. :param creator: a callable function that returns a DB-API
  63. connection object. The function will be called with
  64. parameters.
  65. :param recycle: If set to a value other than -1, number of
  66. seconds between connection recycling, which means upon
  67. checkout, if this timeout is surpassed the connection will be
  68. closed and replaced with a newly opened connection. Defaults to -1.
  69. :param logging_name: String identifier which will be used within
  70. the "name" field of logging records generated within the
  71. "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
  72. id.
  73. :param echo: if True, the connection pool will log
  74. informational output such as when connections are invalidated
  75. as well as when connections are recycled to the default log handler,
  76. which defaults to ``sys.stdout`` for output.. If set to the string
  77. ``"debug"``, the logging will include pool checkouts and checkins.
  78. The :paramref:`_pool.Pool.echo` parameter can also be set from the
  79. :func:`_sa.create_engine` call by using the
  80. :paramref:`_sa.create_engine.echo_pool` parameter.
  81. .. seealso::
  82. :ref:`dbengine_logging` - further detail on how to configure
  83. logging.
  84. :param reset_on_return: Determine steps to take on
  85. connections as they are returned to the pool, which were
  86. not otherwise handled by a :class:`_engine.Connection`.
  87. Available from :func:`_sa.create_engine` via the
  88. :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
  89. :paramref:`_pool.Pool.reset_on_return` can have any of these values:
  90. * ``"rollback"`` - call rollback() on the connection,
  91. to release locks and transaction resources.
  92. This is the default value. The vast majority
  93. of use cases should leave this value set.
  94. * ``"commit"`` - call commit() on the connection,
  95. to release locks and transaction resources.
  96. A commit here may be desirable for databases that
  97. cache query plans if a commit is emitted,
  98. such as Microsoft SQL Server. However, this
  99. value is more dangerous than 'rollback' because
  100. any data changes present on the transaction
  101. are committed unconditionally.
  102. * ``None`` - don't do anything on the connection.
  103. This setting may be appropriate if the database / DBAPI
  104. works in pure "autocommit" mode at all times, or if
  105. a custom reset handler is established using the
  106. :meth:`.PoolEvents.reset` event handler.
  107. * ``True`` - same as 'rollback', this is here for
  108. backwards compatibility.
  109. * ``False`` - same as None, this is here for
  110. backwards compatibility.
  111. For further customization of reset on return, the
  112. :meth:`.PoolEvents.reset` event hook may be used which can perform
  113. any connection activity desired on reset. (requires version 1.4.43
  114. or greater)
  115. .. seealso::
  116. :ref:`pool_reset_on_return`
  117. :param events: a list of 2-tuples, each of the form
  118. ``(callable, target)`` which will be passed to :func:`.event.listen`
  119. upon construction. Provided here so that event listeners
  120. can be assigned via :func:`_sa.create_engine` before dialect-level
  121. listeners are applied.
  122. :param dialect: a :class:`.Dialect` that will handle the job
  123. of calling rollback(), close(), or commit() on DBAPI connections.
  124. If omitted, a built-in "stub" dialect is used. Applications that
  125. make use of :func:`_sa.create_engine` should not use this parameter
  126. as it is handled by the engine creation strategy.
  127. .. versionadded:: 1.1 - ``dialect`` is now a public parameter
  128. to the :class:`_pool.Pool`.
  129. :param pre_ping: if True, the pool will emit a "ping" (typically
  130. "SELECT 1", but is dialect-specific) on the connection
  131. upon checkout, to test if the connection is alive or not. If not,
  132. the connection is transparently re-connected and upon success, all
  133. other pooled connections established prior to that timestamp are
  134. invalidated. Requires that a dialect is passed as well to
  135. interpret the disconnection error.
  136. .. versionadded:: 1.2
  137. """
  138. if logging_name:
  139. self.logging_name = self._orig_logging_name = logging_name
  140. else:
  141. self._orig_logging_name = None
  142. log.instance_logger(self, echoflag=echo)
  143. self._creator = creator
  144. self._recycle = recycle
  145. self._invalidate_time = 0
  146. self._pre_ping = pre_ping
  147. self._reset_on_return = util.symbol.parse_user_argument(
  148. reset_on_return,
  149. {
  150. reset_rollback: ["rollback", True],
  151. reset_none: ["none", None, False],
  152. reset_commit: ["commit"],
  153. },
  154. "reset_on_return",
  155. resolve_symbol_names=False,
  156. )
  157. self.echo = echo
  158. if _dispatch:
  159. self.dispatch._update(_dispatch, only_propagate=False)
  160. if dialect:
  161. self._dialect = dialect
  162. if events:
  163. for fn, target in events:
  164. event.listen(self, target, fn)
  165. @util.hybridproperty
  166. def _is_asyncio(self):
  167. return self._dialect.is_async
  168. @property
  169. def _creator(self):
  170. return self.__dict__["_creator"]
  171. @_creator.setter
  172. def _creator(self, creator):
  173. self.__dict__["_creator"] = creator
  174. self._invoke_creator = self._should_wrap_creator(creator)
  175. def _should_wrap_creator(self, creator):
  176. """Detect if creator accepts a single argument, or is sent
  177. as a legacy style no-arg function.
  178. """
  179. try:
  180. argspec = util.get_callable_argspec(self._creator, no_self=True)
  181. except TypeError:
  182. return lambda crec: creator()
  183. defaulted = argspec[3] is not None and len(argspec[3]) or 0
  184. positionals = len(argspec[0]) - defaulted
  185. # look for the exact arg signature that DefaultStrategy
  186. # sends us
  187. if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
  188. return creator
  189. # or just a single positional
  190. elif positionals == 1:
  191. return creator
  192. # all other cases, just wrap and assume legacy "creator" callable
  193. # thing
  194. else:
  195. return lambda crec: creator()
  196. def _close_connection(self, connection, terminate=False):
  197. self.logger.debug(
  198. "%s connection %r",
  199. "Hard-closing" if terminate else "Closing",
  200. connection,
  201. )
  202. try:
  203. if terminate:
  204. self._dialect.do_terminate(connection)
  205. else:
  206. self._dialect.do_close(connection)
  207. except BaseException as e:
  208. self.logger.error(
  209. "Exception closing connection %r", connection, exc_info=True
  210. )
  211. if not isinstance(e, Exception):
  212. raise
  213. def _create_connection(self):
  214. """Called by subclasses to create a new ConnectionRecord."""
  215. return _ConnectionRecord(self)
  216. def _invalidate(self, connection, exception=None, _checkin=True):
  217. """Mark all connections established within the generation
  218. of the given connection as invalidated.
  219. If this pool's last invalidate time is before when the given
  220. connection was created, update the timestamp til now. Otherwise,
  221. no action is performed.
  222. Connections with a start time prior to this pool's invalidation
  223. time will be recycled upon next checkout.
  224. """
  225. rec = getattr(connection, "_connection_record", None)
  226. if not rec or self._invalidate_time < rec.starttime:
  227. self._invalidate_time = time.time()
  228. if _checkin and getattr(connection, "is_valid", False):
  229. connection.invalidate(exception)
  230. def recreate(self):
  231. """Return a new :class:`_pool.Pool`, of the same class as this one
  232. and configured with identical creation arguments.
  233. This method is used in conjunction with :meth:`dispose`
  234. to close out an entire :class:`_pool.Pool` and create a new one in
  235. its place.
  236. """
  237. raise NotImplementedError()
  238. def dispose(self):
  239. """Dispose of this pool.
  240. This method leaves the possibility of checked-out connections
  241. remaining open, as it only affects connections that are
  242. idle in the pool.
  243. .. seealso::
  244. :meth:`Pool.recreate`
  245. """
  246. raise NotImplementedError()
  247. def connect(self):
  248. """Return a DBAPI connection from the pool.
  249. The connection is instrumented such that when its
  250. ``close()`` method is called, the connection will be returned to
  251. the pool.
  252. """
  253. return _ConnectionFairy._checkout(self)
  254. def _return_conn(self, record):
  255. """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
  256. This method is called when an instrumented DBAPI connection
  257. has its ``close()`` method called.
  258. """
  259. self._do_return_conn(record)
  260. def _do_get(self):
  261. """Implementation for :meth:`get`, supplied by subclasses."""
  262. raise NotImplementedError()
  263. def _do_return_conn(self, conn):
  264. """Implementation for :meth:`return_conn`, supplied by subclasses."""
  265. raise NotImplementedError()
  266. def status(self):
  267. raise NotImplementedError()
  268. class _ConnectionRecord(object):
  269. """Internal object which maintains an individual DBAPI connection
  270. referenced by a :class:`_pool.Pool`.
  271. The :class:`._ConnectionRecord` object always exists for any particular
  272. DBAPI connection whether or not that DBAPI connection has been
  273. "checked out". This is in contrast to the :class:`._ConnectionFairy`
  274. which is only a public facade to the DBAPI connection while it is checked
  275. out.
  276. A :class:`._ConnectionRecord` may exist for a span longer than that
  277. of a single DBAPI connection. For example, if the
  278. :meth:`._ConnectionRecord.invalidate`
  279. method is called, the DBAPI connection associated with this
  280. :class:`._ConnectionRecord`
  281. will be discarded, but the :class:`._ConnectionRecord` may be used again,
  282. in which case a new DBAPI connection is produced when the
  283. :class:`_pool.Pool`
  284. next uses this record.
  285. The :class:`._ConnectionRecord` is delivered along with connection
  286. pool events, including :meth:`_events.PoolEvents.connect` and
  287. :meth:`_events.PoolEvents.checkout`, however :class:`._ConnectionRecord`
  288. still
  289. remains an internal object whose API and internals may change.
  290. .. seealso::
  291. :class:`._ConnectionFairy`
  292. """
  293. def __init__(self, pool, connect=True):
  294. self.__pool = pool
  295. if connect:
  296. self.__connect()
  297. self.finalize_callback = deque()
  298. fresh = False
  299. fairy_ref = None
  300. starttime = None
  301. dbapi_connection = None
  302. """A reference to the actual DBAPI connection being tracked.
  303. May be ``None`` if this :class:`._ConnectionRecord` has been marked
  304. as invalidated; a new DBAPI connection may replace it if the owning
  305. pool calls upon this :class:`._ConnectionRecord` to reconnect.
  306. For adapted drivers, like the Asyncio implementations, this is a
  307. :class:`.AdaptedConnection` that adapts the driver connection
  308. to the DBAPI protocol.
  309. Use :attr:`._ConnectionRecord.driver_connection` to obtain the
  310. connection objected returned by the driver.
  311. .. versionadded:: 1.4.24
  312. """
  313. @property
  314. def driver_connection(self):
  315. """The connection object as returned by the driver after a connect.
  316. For normal sync drivers that support the DBAPI protocol, this object
  317. is the same as the one referenced by
  318. :attr:`._ConnectionRecord.dbapi_connection`.
  319. For adapted drivers, like the Asyncio ones, this is the actual object
  320. that was returned by the driver ``connect`` call.
  321. As :attr:`._ConnectionRecord.dbapi_connection` it may be ``None``
  322. if this :class:`._ConnectionRecord` has been marked as invalidated.
  323. .. versionadded:: 1.4.24
  324. """
  325. if self.dbapi_connection is None:
  326. return None
  327. else:
  328. return self.__pool._dialect.get_driver_connection(
  329. self.dbapi_connection
  330. )
  331. @property
  332. def connection(self):
  333. """An alias to :attr:`._ConnectionRecord.dbapi_connection`.
  334. This alias is deprecated, please use the new name.
  335. .. deprecated:: 1.4.24
  336. """
  337. return self.dbapi_connection
  338. @connection.setter
  339. def connection(self, value):
  340. self.dbapi_connection = value
  341. _soft_invalidate_time = 0
  342. @util.memoized_property
  343. def info(self):
  344. """The ``.info`` dictionary associated with the DBAPI connection.
  345. This dictionary is shared among the :attr:`._ConnectionFairy.info`
  346. and :attr:`_engine.Connection.info` accessors.
  347. .. note::
  348. The lifespan of this dictionary is linked to the
  349. DBAPI connection itself, meaning that it is **discarded** each time
  350. the DBAPI connection is closed and/or invalidated. The
  351. :attr:`._ConnectionRecord.record_info` dictionary remains
  352. persistent throughout the lifespan of the
  353. :class:`._ConnectionRecord` container.
  354. """
  355. return {}
  356. @util.memoized_property
  357. def record_info(self):
  358. """An "info' dictionary associated with the connection record
  359. itself.
  360. Unlike the :attr:`._ConnectionRecord.info` dictionary, which is linked
  361. to the lifespan of the DBAPI connection, this dictionary is linked
  362. to the lifespan of the :class:`._ConnectionRecord` container itself
  363. and will remain persistent throughout the life of the
  364. :class:`._ConnectionRecord`.
  365. .. versionadded:: 1.1
  366. """
  367. return {}
  368. @classmethod
  369. def checkout(cls, pool):
  370. rec = pool._do_get()
  371. try:
  372. dbapi_connection = rec.get_connection()
  373. except BaseException as err:
  374. with util.safe_reraise():
  375. rec._checkin_failed(err, _fairy_was_created=False)
  376. # never called, this is for code linters
  377. raise
  378. echo = pool._should_log_debug()
  379. fairy = _ConnectionFairy(dbapi_connection, rec, echo)
  380. rec.fairy_ref = ref = weakref.ref(
  381. fairy,
  382. lambda ref: _finalize_fairy
  383. and _finalize_fairy(
  384. None, rec, pool, ref, echo, transaction_was_reset=False
  385. ),
  386. )
  387. _strong_ref_connection_records[ref] = rec
  388. if echo:
  389. pool.logger.debug(
  390. "Connection %r checked out from pool", dbapi_connection
  391. )
  392. return fairy
  393. def _checkin_failed(self, err, _fairy_was_created=True):
  394. self.invalidate(e=err)
  395. self.checkin(
  396. _fairy_was_created=_fairy_was_created,
  397. )
  398. def checkin(self, _fairy_was_created=True):
  399. if self.fairy_ref is None and _fairy_was_created:
  400. # _fairy_was_created is False for the initial get connection phase;
  401. # meaning there was no _ConnectionFairy and we must unconditionally
  402. # do a checkin.
  403. #
  404. # otherwise, if fairy_was_created==True, if fairy_ref is None here
  405. # that means we were checked in already, so this looks like
  406. # a double checkin.
  407. util.warn("Double checkin attempted on %s" % self)
  408. return
  409. self.fairy_ref = None
  410. connection = self.dbapi_connection
  411. pool = self.__pool
  412. while self.finalize_callback:
  413. finalizer = self.finalize_callback.pop()
  414. finalizer(connection)
  415. if pool.dispatch.checkin:
  416. pool.dispatch.checkin(connection, self)
  417. pool._return_conn(self)
  418. @property
  419. def in_use(self):
  420. return self.fairy_ref is not None
  421. @property
  422. def last_connect_time(self):
  423. return self.starttime
  424. def close(self):
  425. if self.dbapi_connection is not None:
  426. self.__close()
  427. def invalidate(self, e=None, soft=False):
  428. """Invalidate the DBAPI connection held by this
  429. :class:`._ConnectionRecord`.
  430. This method is called for all connection invalidations, including
  431. when the :meth:`._ConnectionFairy.invalidate` or
  432. :meth:`_engine.Connection.invalidate` methods are called,
  433. as well as when any
  434. so-called "automatic invalidation" condition occurs.
  435. :param e: an exception object indicating a reason for the
  436. invalidation.
  437. :param soft: if True, the connection isn't closed; instead, this
  438. connection will be recycled on next checkout.
  439. .. versionadded:: 1.0.3
  440. .. seealso::
  441. :ref:`pool_connection_invalidation`
  442. """
  443. # already invalidated
  444. if self.dbapi_connection is None:
  445. return
  446. if soft:
  447. self.__pool.dispatch.soft_invalidate(
  448. self.dbapi_connection, self, e
  449. )
  450. else:
  451. self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
  452. if e is not None:
  453. self.__pool.logger.info(
  454. "%sInvalidate connection %r (reason: %s:%s)",
  455. "Soft " if soft else "",
  456. self.dbapi_connection,
  457. e.__class__.__name__,
  458. e,
  459. )
  460. else:
  461. self.__pool.logger.info(
  462. "%sInvalidate connection %r",
  463. "Soft " if soft else "",
  464. self.dbapi_connection,
  465. )
  466. if soft:
  467. self._soft_invalidate_time = time.time()
  468. else:
  469. self.__close(terminate=True)
  470. self.dbapi_connection = None
  471. def get_connection(self):
  472. recycle = False
  473. # NOTE: the various comparisons here are assuming that measurable time
  474. # passes between these state changes. however, time.time() is not
  475. # guaranteed to have sub-second precision. comparisons of
  476. # "invalidation time" to "starttime" should perhaps use >= so that the
  477. # state change can take place assuming no measurable time has passed,
  478. # however this does not guarantee correct behavior here as if time
  479. # continues to not pass, it will try to reconnect repeatedly until
  480. # these timestamps diverge, so in that sense using > is safer. Per
  481. # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
  482. # within 16 milliseconds accuracy, so unit tests for connection
  483. # invalidation need a sleep of at least this long between initial start
  484. # time and invalidation for the logic below to work reliably.
  485. if self.dbapi_connection is None:
  486. self.info.clear()
  487. self.__connect()
  488. elif (
  489. self.__pool._recycle > -1
  490. and time.time() - self.starttime > self.__pool._recycle
  491. ):
  492. self.__pool.logger.info(
  493. "Connection %r exceeded timeout; recycling",
  494. self.dbapi_connection,
  495. )
  496. recycle = True
  497. elif self.__pool._invalidate_time > self.starttime:
  498. self.__pool.logger.info(
  499. "Connection %r invalidated due to pool invalidation; "
  500. + "recycling",
  501. self.dbapi_connection,
  502. )
  503. recycle = True
  504. elif self._soft_invalidate_time > self.starttime:
  505. self.__pool.logger.info(
  506. "Connection %r invalidated due to local soft invalidation; "
  507. + "recycling",
  508. self.dbapi_connection,
  509. )
  510. recycle = True
  511. if recycle:
  512. self.__close(terminate=True)
  513. self.info.clear()
  514. self.__connect()
  515. return self.dbapi_connection
  516. def _is_hard_or_soft_invalidated(self):
  517. return (
  518. self.dbapi_connection is None
  519. or self.__pool._invalidate_time > self.starttime
  520. or (self._soft_invalidate_time > self.starttime)
  521. )
  522. def __close(self, terminate=False):
  523. self.finalize_callback.clear()
  524. if self.__pool.dispatch.close:
  525. self.__pool.dispatch.close(self.dbapi_connection, self)
  526. self.__pool._close_connection(
  527. self.dbapi_connection, terminate=terminate
  528. )
  529. self.dbapi_connection = None
  530. def __connect(self):
  531. pool = self.__pool
  532. # ensure any existing connection is removed, so that if
  533. # creator fails, this attribute stays None
  534. self.dbapi_connection = None
  535. try:
  536. self.starttime = time.time()
  537. self.dbapi_connection = connection = pool._invoke_creator(self)
  538. pool.logger.debug("Created new connection %r", connection)
  539. self.fresh = True
  540. except BaseException as e:
  541. with util.safe_reraise():
  542. pool.logger.debug("Error on connect(): %s", e)
  543. else:
  544. # in SQLAlchemy 1.4 the first_connect event is not used by
  545. # the engine, so this will usually not be set
  546. if pool.dispatch.first_connect:
  547. pool.dispatch.first_connect.for_modify(
  548. pool.dispatch
  549. ).exec_once_unless_exception(self.dbapi_connection, self)
  550. # init of the dialect now takes place within the connect
  551. # event, so ensure a mutex is used on the first run
  552. pool.dispatch.connect.for_modify(
  553. pool.dispatch
  554. )._exec_w_sync_on_first_run(self.dbapi_connection, self)
  555. def _finalize_fairy(
  556. dbapi_connection,
  557. connection_record,
  558. pool,
  559. ref, # this is None when called directly, not by the gc
  560. echo,
  561. transaction_was_reset=False,
  562. fairy=None,
  563. ):
  564. """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
  565. been garbage collected.
  566. When using an async dialect no IO can happen here (without using
  567. a dedicated thread), since this is called outside the greenlet
  568. context and with an already running loop. In this case function
  569. will only log a message and raise a warning.
  570. """
  571. if ref:
  572. _strong_ref_connection_records.pop(ref, None)
  573. elif fairy:
  574. _strong_ref_connection_records.pop(weakref.ref(fairy), None)
  575. if ref is not None:
  576. if connection_record.fairy_ref is not ref:
  577. return
  578. assert dbapi_connection is None
  579. dbapi_connection = connection_record.dbapi_connection
  580. # null pool is not _is_asyncio but can be used also with async dialects
  581. dont_restore_gced = (
  582. pool._dialect.is_async and not pool._dialect.has_terminate
  583. )
  584. if dont_restore_gced:
  585. detach = not connection_record or ref
  586. can_manipulate_connection = not ref
  587. else:
  588. detach = not connection_record
  589. can_manipulate_connection = True
  590. if dbapi_connection is not None:
  591. if connection_record and echo:
  592. pool.logger.debug(
  593. "Connection %r being returned to pool",
  594. dbapi_connection,
  595. )
  596. try:
  597. fairy = fairy or _ConnectionFairy(
  598. dbapi_connection,
  599. connection_record,
  600. echo,
  601. )
  602. assert fairy.dbapi_connection is dbapi_connection
  603. if can_manipulate_connection:
  604. fairy._reset(pool, transaction_was_reset)
  605. if detach:
  606. if connection_record:
  607. fairy._pool = pool
  608. fairy.detach()
  609. if can_manipulate_connection:
  610. if pool.dispatch.close_detached:
  611. pool.dispatch.close_detached(dbapi_connection)
  612. pool._close_connection(dbapi_connection)
  613. else:
  614. message = (
  615. "The garbage collector is trying to clean up "
  616. "connection %r. This feature is unsupported on "
  617. "unsupported on asyncio "
  618. 'dbapis that lack a "terminate" feature, '
  619. "since no IO can be performed at this stage to "
  620. "reset the connection. Please close out all "
  621. "connections when they are no longer used, calling "
  622. "``close()`` or using a context manager to "
  623. "manage their lifetime."
  624. ) % dbapi_connection
  625. pool.logger.error(message)
  626. util.warn(message)
  627. except BaseException as e:
  628. pool.logger.error(
  629. "Exception during reset or similar", exc_info=True
  630. )
  631. if connection_record:
  632. connection_record.invalidate(e=e)
  633. if not isinstance(e, Exception):
  634. raise
  635. if connection_record and connection_record.fairy_ref is not None:
  636. connection_record.checkin()
  637. # a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
  638. # GC under pypy will call ConnectionFairy finalizers. linked directly to the
  639. # weakref that will empty itself when collected so that it should not create
  640. # any unmanaged memory references.
  641. _strong_ref_connection_records = {}
  642. class _ConnectionFairy(object):
  643. """Proxies a DBAPI connection and provides return-on-dereference
  644. support.
  645. This is an internal object used by the :class:`_pool.Pool` implementation
  646. to provide context management to a DBAPI connection delivered by
  647. that :class:`_pool.Pool`.
  648. The name "fairy" is inspired by the fact that the
  649. :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
  650. only for the length of a specific DBAPI connection being checked out from
  651. the pool, and additionally that as a transparent proxy, it is mostly
  652. invisible.
  653. .. seealso::
  654. :class:`._ConnectionRecord`
  655. """
  656. def __init__(self, dbapi_connection, connection_record, echo):
  657. self.dbapi_connection = dbapi_connection
  658. self._connection_record = connection_record
  659. self._echo = echo
  660. dbapi_connection = None
  661. """A reference to the actual DBAPI connection being tracked.
  662. .. versionadded:: 1.4.24
  663. .. seealso::
  664. :attr:`._ConnectionFairy.driver_connection`
  665. :attr:`._ConnectionRecord.dbapi_connection`
  666. :ref:`faq_dbapi_connection`
  667. """
  668. _connection_record = None
  669. """A reference to the :class:`._ConnectionRecord` object associated
  670. with the DBAPI connection.
  671. This is currently an internal accessor which is subject to change.
  672. """
  673. @property
  674. def driver_connection(self):
  675. """The connection object as returned by the driver after a connect.
  676. .. versionadded:: 1.4.24
  677. .. seealso::
  678. :attr:`._ConnectionFairy.dbapi_connection`
  679. :attr:`._ConnectionRecord.driver_connection`
  680. :ref:`faq_dbapi_connection`
  681. """
  682. return self._connection_record.driver_connection
  683. @property
  684. def connection(self):
  685. """An alias to :attr:`._ConnectionFairy.dbapi_connection`.
  686. This alias is deprecated, please use the new name.
  687. .. deprecated:: 1.4.24
  688. """
  689. return self.dbapi_connection
  690. @connection.setter
  691. def connection(self, value):
  692. self.dbapi_connection = value
  693. @classmethod
  694. def _checkout(cls, pool, threadconns=None, fairy=None):
  695. if not fairy:
  696. fairy = _ConnectionRecord.checkout(pool)
  697. fairy._pool = pool
  698. fairy._counter = 0
  699. if threadconns is not None:
  700. threadconns.current = weakref.ref(fairy)
  701. if fairy.dbapi_connection is None:
  702. raise exc.InvalidRequestError("This connection is closed")
  703. fairy._counter += 1
  704. if (
  705. not pool.dispatch.checkout and not pool._pre_ping
  706. ) or fairy._counter != 1:
  707. return fairy
  708. # Pool listeners can trigger a reconnection on checkout, as well
  709. # as the pre-pinger.
  710. # there are three attempts made here, but note that if the database
  711. # is not accessible from a connection standpoint, those won't proceed
  712. # here.
  713. attempts = 2
  714. while attempts > 0:
  715. connection_is_fresh = fairy._connection_record.fresh
  716. fairy._connection_record.fresh = False
  717. try:
  718. if pool._pre_ping:
  719. if not connection_is_fresh:
  720. if fairy._echo:
  721. pool.logger.debug(
  722. "Pool pre-ping on connection %s",
  723. fairy.dbapi_connection,
  724. )
  725. result = pool._dialect.do_ping(fairy.dbapi_connection)
  726. if not result:
  727. if fairy._echo:
  728. pool.logger.debug(
  729. "Pool pre-ping on connection %s failed, "
  730. "will invalidate pool",
  731. fairy.dbapi_connection,
  732. )
  733. raise exc.InvalidatePoolError()
  734. elif fairy._echo:
  735. pool.logger.debug(
  736. "Connection %s is fresh, skipping pre-ping",
  737. fairy.dbapi_connection,
  738. )
  739. pool.dispatch.checkout(
  740. fairy.dbapi_connection, fairy._connection_record, fairy
  741. )
  742. return fairy
  743. except exc.DisconnectionError as e:
  744. if e.invalidate_pool:
  745. pool.logger.info(
  746. "Disconnection detected on checkout, "
  747. "invalidating all pooled connections prior to "
  748. "current timestamp (reason: %r)",
  749. e,
  750. )
  751. fairy._connection_record.invalidate(e)
  752. pool._invalidate(fairy, e, _checkin=False)
  753. else:
  754. pool.logger.info(
  755. "Disconnection detected on checkout, "
  756. "invalidating individual connection %s (reason: %r)",
  757. fairy.dbapi_connection,
  758. e,
  759. )
  760. fairy._connection_record.invalidate(e)
  761. try:
  762. fairy.dbapi_connection = (
  763. fairy._connection_record.get_connection()
  764. )
  765. except BaseException as err:
  766. with util.safe_reraise():
  767. fairy._connection_record._checkin_failed(
  768. err,
  769. _fairy_was_created=True,
  770. )
  771. # prevent _ConnectionFairy from being carried
  772. # in the stack trace. Do this after the
  773. # connection record has been checked in, so that
  774. # if the del triggers a finalize fairy, it won't
  775. # try to checkin a second time.
  776. del fairy
  777. attempts -= 1
  778. except BaseException as be_outer:
  779. with util.safe_reraise():
  780. rec = fairy._connection_record
  781. if rec is not None:
  782. rec._checkin_failed(
  783. be_outer,
  784. _fairy_was_created=True,
  785. )
  786. # prevent _ConnectionFairy from being carried
  787. # in the stack trace, see above
  788. del fairy
  789. # never called, this is for code linters
  790. raise
  791. pool.logger.info("Reconnection attempts exhausted on checkout")
  792. fairy.invalidate()
  793. raise exc.InvalidRequestError("This connection is closed")
  794. def _checkout_existing(self):
  795. return _ConnectionFairy._checkout(self._pool, fairy=self)
  796. def _checkin(self, transaction_was_reset=False):
  797. _finalize_fairy(
  798. self.dbapi_connection,
  799. self._connection_record,
  800. self._pool,
  801. None,
  802. self._echo,
  803. transaction_was_reset=transaction_was_reset,
  804. fairy=self,
  805. )
  806. self.dbapi_connection = None
  807. self._connection_record = None
  808. _close = _checkin
  809. def _reset(self, pool, transaction_was_reset=False):
  810. if pool.dispatch.reset:
  811. pool.dispatch.reset(self, self._connection_record)
  812. if pool._reset_on_return is reset_rollback:
  813. if transaction_was_reset:
  814. if self._echo:
  815. pool.logger.debug(
  816. "Connection %s reset, transaction already reset",
  817. self.dbapi_connection,
  818. )
  819. else:
  820. if self._echo:
  821. pool.logger.debug(
  822. "Connection %s rollback-on-return",
  823. self.dbapi_connection,
  824. )
  825. pool._dialect.do_rollback(self)
  826. elif pool._reset_on_return is reset_commit:
  827. if self._echo:
  828. pool.logger.debug(
  829. "Connection %s commit-on-return",
  830. self.dbapi_connection,
  831. )
  832. pool._dialect.do_commit(self)
  833. @property
  834. def _logger(self):
  835. return self._pool.logger
  836. @property
  837. def is_valid(self):
  838. """Return True if this :class:`._ConnectionFairy` still refers
  839. to an active DBAPI connection."""
  840. return self.dbapi_connection is not None
  841. @util.memoized_property
  842. def info(self):
  843. """Info dictionary associated with the underlying DBAPI connection
  844. referred to by this :class:`.ConnectionFairy`, allowing user-defined
  845. data to be associated with the connection.
  846. The data here will follow along with the DBAPI connection including
  847. after it is returned to the connection pool and used again
  848. in subsequent instances of :class:`._ConnectionFairy`. It is shared
  849. with the :attr:`._ConnectionRecord.info` and
  850. :attr:`_engine.Connection.info`
  851. accessors.
  852. The dictionary associated with a particular DBAPI connection is
  853. discarded when the connection itself is discarded.
  854. """
  855. return self._connection_record.info
  856. @property
  857. def record_info(self):
  858. """Info dictionary associated with the :class:`._ConnectionRecord
  859. container referred to by this :class:`.ConnectionFairy`.
  860. Unlike the :attr:`._ConnectionFairy.info` dictionary, the lifespan
  861. of this dictionary is persistent across connections that are
  862. disconnected and/or invalidated within the lifespan of a
  863. :class:`._ConnectionRecord`.
  864. .. versionadded:: 1.1
  865. """
  866. if self._connection_record:
  867. return self._connection_record.record_info
  868. else:
  869. return None
  870. def invalidate(self, e=None, soft=False):
  871. """Mark this connection as invalidated.
  872. This method can be called directly, and is also called as a result
  873. of the :meth:`_engine.Connection.invalidate` method. When invoked,
  874. the DBAPI connection is immediately closed and discarded from
  875. further use by the pool. The invalidation mechanism proceeds
  876. via the :meth:`._ConnectionRecord.invalidate` internal method.
  877. :param e: an exception object indicating a reason for the invalidation.
  878. :param soft: if True, the connection isn't closed; instead, this
  879. connection will be recycled on next checkout.
  880. .. versionadded:: 1.0.3
  881. .. seealso::
  882. :ref:`pool_connection_invalidation`
  883. """
  884. if self.dbapi_connection is None:
  885. util.warn("Can't invalidate an already-closed connection.")
  886. return
  887. if self._connection_record:
  888. self._connection_record.invalidate(e=e, soft=soft)
  889. if not soft:
  890. self.dbapi_connection = None
  891. self._checkin()
  892. def cursor(self, *args, **kwargs):
  893. """Return a new DBAPI cursor for the underlying connection.
  894. This method is a proxy for the ``connection.cursor()`` DBAPI
  895. method.
  896. """
  897. return self.dbapi_connection.cursor(*args, **kwargs)
  898. def __getattr__(self, key):
  899. return getattr(self.dbapi_connection, key)
  900. def detach(self):
  901. """Separate this connection from its Pool.
  902. This means that the connection will no longer be returned to the
  903. pool when closed, and will instead be literally closed. The
  904. containing ConnectionRecord is separated from the DB-API connection,
  905. and will create a new connection when next used.
  906. Note that any overall connection limiting constraints imposed by a
  907. Pool implementation may be violated after a detach, as the detached
  908. connection is removed from the pool's knowledge and control.
  909. """
  910. if self._connection_record is not None:
  911. rec = self._connection_record
  912. rec.fairy_ref = None
  913. rec.dbapi_connection = None
  914. # TODO: should this be _return_conn?
  915. self._pool._do_return_conn(self._connection_record)
  916. self.info = self.info.copy()
  917. self._connection_record = None
  918. if self._pool.dispatch.detach:
  919. self._pool.dispatch.detach(self.dbapi_connection, rec)
  920. def close(self):
  921. self._counter -= 1
  922. if self._counter == 0:
  923. self._checkin()
  924. def _close_special(self, transaction_reset=False):
  925. self._counter -= 1
  926. if self._counter == 0:
  927. self._checkin(transaction_was_reset=transaction_reset)