__init__.py 7.6 KB


  1. # Copyright 2016 Étienne Bersac
  2. # Copyright 2016 Julien Danjou
  3. # Copyright 2016 Joshua Harlow
  4. # Copyright 2013-2014 Ray Holder
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. import functools
  18. import sys
  19. import typing as t
  20. import tenacity
  21. from tenacity import AttemptManager
  22. from tenacity import BaseRetrying
  23. from tenacity import DoAttempt
  24. from tenacity import DoSleep
  25. from tenacity import RetryCallState
  26. from tenacity import RetryError
  27. from tenacity import after_nothing
  28. from tenacity import before_nothing
  29. from tenacity import _utils
  30. # Import all built-in retry strategies for easier usage.
  31. from .retry import RetryBaseT
  32. from .retry import retry_all # noqa
  33. from .retry import retry_any # noqa
  34. from .retry import retry_if_exception # noqa
  35. from .retry import retry_if_result # noqa
  36. from ..retry import RetryBaseT as SyncRetryBaseT
  37. if t.TYPE_CHECKING:
  38. from tenacity.stop import StopBaseT
  39. from tenacity.wait import WaitBaseT
# Type variable used as the declared return type of ``AsyncRetrying.__call__``.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
# Type variable for the wrapped callable: any callable whose result is awaitable.
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Awaitable[t.Any]])
  42. def _portable_async_sleep(seconds: float) -> t.Awaitable[None]:
  43. # If trio is already imported, then importing it is cheap.
  44. # If trio isn't already imported, then it's definitely not running, so we
  45. # can skip further checks.
  46. if "trio" in sys.modules:
  47. # If trio is available, then sniffio is too
  48. import trio
  49. import sniffio
  50. if sniffio.current_async_library() == "trio":
  51. return trio.sleep(seconds)
  52. # Otherwise, assume asyncio
  53. # Lazy import asyncio as it's expensive (responsible for 25-50% of total import overhead).
  54. import asyncio
  55. return asyncio.sleep(seconds)
  56. class AsyncRetrying(BaseRetrying):
  57. def __init__(
  58. self,
  59. sleep: t.Callable[
  60. [t.Union[int, float]], t.Union[None, t.Awaitable[None]]
  61. ] = _portable_async_sleep,
  62. stop: "StopBaseT" = tenacity.stop.stop_never,
  63. wait: "WaitBaseT" = tenacity.wait.wait_none(),
  64. retry: "t.Union[SyncRetryBaseT, RetryBaseT]" = tenacity.retry_if_exception_type(),
  65. before: t.Callable[
  66. ["RetryCallState"], t.Union[None, t.Awaitable[None]]
  67. ] = before_nothing,
  68. after: t.Callable[
  69. ["RetryCallState"], t.Union[None, t.Awaitable[None]]
  70. ] = after_nothing,
  71. before_sleep: t.Optional[
  72. t.Callable[["RetryCallState"], t.Union[None, t.Awaitable[None]]]
  73. ] = None,
  74. reraise: bool = False,
  75. retry_error_cls: t.Type["RetryError"] = RetryError,
  76. retry_error_callback: t.Optional[
  77. t.Callable[["RetryCallState"], t.Union[t.Any, t.Awaitable[t.Any]]]
  78. ] = None,
  79. ) -> None:
  80. super().__init__(
  81. sleep=sleep, # type: ignore[arg-type]
  82. stop=stop,
  83. wait=wait,
  84. retry=retry, # type: ignore[arg-type]
  85. before=before, # type: ignore[arg-type]
  86. after=after, # type: ignore[arg-type]
  87. before_sleep=before_sleep, # type: ignore[arg-type]
  88. reraise=reraise,
  89. retry_error_cls=retry_error_cls,
  90. retry_error_callback=retry_error_callback,
  91. )
  92. async def __call__( # type: ignore[override]
  93. self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
  94. ) -> WrappedFnReturnT:
  95. self.begin()
  96. retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
  97. while True:
  98. do = await self.iter(retry_state=retry_state)
  99. if isinstance(do, DoAttempt):
  100. try:
  101. result = await fn(*args, **kwargs)
  102. except BaseException: # noqa: B902
  103. retry_state.set_exception(sys.exc_info()) # type: ignore[arg-type]
  104. else:
  105. retry_state.set_result(result)
  106. elif isinstance(do, DoSleep):
  107. retry_state.prepare_for_next_attempt()
  108. await self.sleep(do) # type: ignore[misc]
  109. else:
  110. return do # type: ignore[no-any-return]
  111. def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
  112. self.iter_state.actions.append(_utils.wrap_to_async_func(fn))
  113. async def _run_retry(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
  114. self.iter_state.retry_run_result = await _utils.wrap_to_async_func(self.retry)(
  115. retry_state
  116. )
  117. async def _run_wait(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
  118. if self.wait:
  119. sleep = await _utils.wrap_to_async_func(self.wait)(retry_state)
  120. else:
  121. sleep = 0.0
  122. retry_state.upcoming_sleep = sleep
  123. async def _run_stop(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
  124. self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
  125. self.iter_state.stop_run_result = await _utils.wrap_to_async_func(self.stop)(
  126. retry_state
  127. )
  128. async def iter(
  129. self, retry_state: "RetryCallState"
  130. ) -> t.Union[DoAttempt, DoSleep, t.Any]: # noqa: A003
  131. self._begin_iter(retry_state)
  132. result = None
  133. for action in self.iter_state.actions:
  134. result = await action(retry_state)
  135. return result
  136. def __iter__(self) -> t.Generator[AttemptManager, None, None]:
  137. raise TypeError("AsyncRetrying object is not iterable")
  138. def __aiter__(self) -> "AsyncRetrying":
  139. self.begin()
  140. self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
  141. return self
  142. async def __anext__(self) -> AttemptManager:
  143. while True:
  144. do = await self.iter(retry_state=self._retry_state)
  145. if do is None:
  146. raise StopAsyncIteration
  147. elif isinstance(do, DoAttempt):
  148. return AttemptManager(retry_state=self._retry_state)
  149. elif isinstance(do, DoSleep):
  150. self._retry_state.prepare_for_next_attempt()
  151. await self.sleep(do) # type: ignore[misc]
  152. else:
  153. raise StopAsyncIteration
  154. def wraps(self, fn: WrappedFn) -> WrappedFn:
  155. wrapped = super().wraps(fn)
  156. # Ensure wrapper is recognized as a coroutine function.
  157. @functools.wraps(
  158. fn, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
  159. )
  160. async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
  161. # Always create a copy to prevent overwriting the local contexts when
  162. # calling the same wrapped functions multiple times in the same stack
  163. copy = self.copy()
  164. async_wrapped.statistics = copy.statistics # type: ignore[attr-defined]
  165. return await copy(fn, *args, **kwargs)
  166. # Preserve attributes
  167. async_wrapped.retry = self # type: ignore[attr-defined]
  168. async_wrapped.retry_with = wrapped.retry_with # type: ignore[attr-defined]
  169. async_wrapped.statistics = {} # type: ignore[attr-defined]
  170. return async_wrapped # type: ignore[return-value]
# Public API of this module: the retry strategies re-exported from ``.retry``,
# the ``WrappedFn`` type variable and the ``AsyncRetrying`` class.
__all__ = [
    "retry_all",
    "retry_any",
    "retry_if_exception",
    "retry_if_result",
    "WrappedFn",
    "AsyncRetrying",
]