12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722 |
- # Copyright 2016-2018 Julien Danjou
- # Copyright 2017 Elisey Zanko
- # Copyright 2016 Étienne Bersac
- # Copyright 2016 Joshua Harlow
- # Copyright 2013-2014 Ray Holder
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import dataclasses
- import functools
- import sys
- import threading
- import time
- import typing as t
- import warnings
- from abc import ABC, abstractmethod
- from concurrent import futures
- from . import _utils
- # Import all built-in retry strategies for easier usage.
- from .retry import retry_base # noqa
- from .retry import retry_all # noqa
- from .retry import retry_always # noqa
- from .retry import retry_any # noqa
- from .retry import retry_if_exception # noqa
- from .retry import retry_if_exception_type # noqa
- from .retry import retry_if_exception_cause_type # noqa
- from .retry import retry_if_not_exception_type # noqa
- from .retry import retry_if_not_result # noqa
- from .retry import retry_if_result # noqa
- from .retry import retry_never # noqa
- from .retry import retry_unless_exception_type # noqa
- from .retry import retry_if_exception_message # noqa
- from .retry import retry_if_not_exception_message # noqa
- # Import all nap strategies for easier usage.
- from .nap import sleep # noqa
- from .nap import sleep_using_event # noqa
- # Import all built-in stop strategies for easier usage.
- from .stop import stop_after_attempt # noqa
- from .stop import stop_after_delay # noqa
- from .stop import stop_before_delay # noqa
- from .stop import stop_all # noqa
- from .stop import stop_any # noqa
- from .stop import stop_never # noqa
- from .stop import stop_when_event_set # noqa
- # Import all built-in wait strategies for easier usage.
- from .wait import wait_chain # noqa
- from .wait import wait_combine # noqa
- from .wait import wait_exponential # noqa
- from .wait import wait_fixed # noqa
- from .wait import wait_incrementing # noqa
- from .wait import wait_none # noqa
- from .wait import wait_random # noqa
- from .wait import wait_random_exponential # noqa
- from .wait import wait_random_exponential as wait_full_jitter # noqa
- from .wait import wait_exponential_jitter # noqa
- # Import all built-in before strategies for easier usage.
- from .before import before_log # noqa
- from .before import before_nothing # noqa
- # Import all built-in after strategies for easier usage.
- from .after import after_log # noqa
- from .after import after_nothing # noqa
- # Import all built-in before sleep strategies for easier usage.
- from .before_sleep import before_sleep_log # noqa
- from .before_sleep import before_sleep_nothing # noqa
- try:
- import tornado
- except ImportError:
- tornado = None
- if t.TYPE_CHECKING:
- import types
- from typing_extensions import Self
- from . import asyncio as tasyncio
- from .retry import RetryBaseT
- from .stop import StopBaseT
- from .wait import WaitBaseT
# Return type of the callable handed to ``BaseRetrying.__call__``; the call
# is annotated as returning the same type as the wrapped callable.
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
# Any callable. ``BaseRetrying.wraps`` and ``retry`` are annotated as returning
# the same callable type they receive.
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Any])
# Extra keyword arguments for ``dataclasses.dataclass``. ``slots`` is only
# requested when running on Python 3.10 or newer.
dataclass_kwargs = {}
if sys.version_info >= (3, 10):
    dataclass_kwargs["slots"] = True


@dataclasses.dataclass(**dataclass_kwargs)
class IterState:
    """Scratch state for one pass of ``BaseRetrying.iter``.

    Holds the queue of callables to run for the current pass together with
    the flags those callables record along the way.
    """

    # Callables executed in order, each receiving the ``RetryCallState``.
    actions: t.List[t.Callable[["RetryCallState"], t.Any]] = dataclasses.field(
        default_factory=list
    )
    # Outcome of the retry predicate for this pass.
    retry_run_result: bool = False
    delay_since_first_attempt: int = 0
    # Outcome of the stop predicate for this pass.
    stop_run_result: bool = False
    # True when the last attempt failed with ``TryAgain``.
    is_explicit_retry: bool = False

    def reset(self) -> None:
        """Put every field back to its initial value, using a fresh list."""
        self.actions = []
        self.delay_since_first_attempt = 0
        self.retry_run_result = self.stop_run_result = self.is_explicit_retry = False
class TryAgain(Exception):
    """Raise from the wrapped function to request another attempt.

    When the last attempt failed with this exception, ``BaseRetrying`` marks
    the pass as an explicit retry and does not consult the retry predicate.
    """
# Module-level sentinel object, exported through ``__all__``. Nothing in the
# visible part of this module reads it.
NO_RESULT = object()
class DoAttempt:
    """Marker returned by ``BaseRetrying.iter`` when the target must be called.

    Callers recognise it with ``isinstance`` and then run one attempt.
    """
class DoSleep(float):
    """Float subclass returned by ``BaseRetrying.iter`` when a sleep is due.

    The float value is the duration that is handed to the controller's
    ``sleep`` callable.
    """
class BaseAction:
    """Common base for the actions a retry object can decide to take.

    Subclasses are expected to provide:

    - ``__init__``: sets every attribute the action carries
    - ``REPR_FIELDS``: names of the attributes shown by ``repr(self)``
    - ``NAME``: identifier used by retry object methods and callbacks
    """

    REPR_FIELDS: t.Sequence[str] = ()
    NAME: t.Optional[str] = None

    def __repr__(self) -> str:
        """Return ``ClassName(field=value, ...)`` built from ``REPR_FIELDS``."""
        rendered = [f"{field}={getattr(self, field)!r}" for field in self.REPR_FIELDS]
        state_str = ", ".join(rendered)
        return f"{self.__class__.__name__}({state_str})"

    def __str__(self) -> str:
        """Return the same text as ``repr(self)``."""
        return repr(self)
class RetryAction(BaseAction):
    """Action stating that another attempt follows after sleeping."""

    NAME = "retry"
    REPR_FIELDS = ("sleep",)

    def __init__(self, sleep: t.SupportsFloat) -> None:
        """Store ``sleep`` coerced to ``float``.

        :param sleep: sleep duration; anything ``float()`` accepts.
        """
        duration = float(sleep)
        self.sleep = duration
- _unset = object()
- def _first_set(first: t.Union[t.Any, object], second: t.Any) -> t.Any:
- return second if first is _unset else first
class RetryError(Exception):
    """Raised when retrying gives up; carries the final attempt."""

    def __init__(self, last_attempt: "Future") -> None:
        """Remember ``last_attempt`` and pass it on as the exception argument."""
        self.last_attempt = last_attempt
        super().__init__(last_attempt)

    def reraise(self) -> t.NoReturn:
        """Raise the last attempt's exception, or this error if it did not fail."""
        attempt = self.last_attempt
        if not attempt.failed:
            raise self
        # ``result()`` raises the stored exception itself for a failed attempt.
        raise attempt.result()

    def __str__(self) -> str:
        """Return ``ClassName[<last attempt>]``."""
        return f"{self.__class__.__name__}[{self.last_attempt}]"
class AttemptManager:
    """Context manager that records one attempt's outcome on the retry state."""

    def __init__(self, retry_state: "RetryCallState"):
        """Keep a reference to the ``retry_state`` this attempt belongs to."""
        self.retry_state = retry_state

    def __enter__(self) -> None:
        """Enter the attempt; there is nothing to set up."""
        return None

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        traceback: t.Optional["types.TracebackType"],
    ) -> t.Optional[bool]:
        """Record the outcome of the ``with`` body.

        An exception is stored on the retry state and suppressed (returns
        ``True``). Otherwise a ``None`` result is stored and ``None`` is
        returned.
        """
        if exc_type is None or exc_value is None:
            # The body's actual value is not visible here, so record None.
            self.retry_state.set_result(None)
            return None
        self.retry_state.set_exception((exc_type, exc_value, traceback))
        return True  # Swallow exception.
class BaseRetrying(ABC):
    """Abstract base of retry controllers.

    Stores the strategy callables and implements the per-pass decision logic
    (``iter``). Subclasses supply ``__call__``, which drives ``iter`` for a
    concrete target function.
    """

    def __init__(
        self,
        sleep: t.Callable[[t.Union[int, float]], None] = sleep,
        stop: "StopBaseT" = stop_never,
        wait: "WaitBaseT" = wait_none(),
        retry: "RetryBaseT" = retry_if_exception_type(),
        before: t.Callable[["RetryCallState"], None] = before_nothing,
        after: t.Callable[["RetryCallState"], None] = after_nothing,
        before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
        reraise: bool = False,
        retry_error_cls: t.Type[RetryError] = RetryError,
        retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
    ):
        """Store the retry configuration.

        :param sleep: called with the ``DoSleep`` value between attempts.
        :param stop: called with the retry state; its result decides whether
            to give up.
        :param wait: called with the retry state; its result becomes the
            upcoming sleep (``0.0`` is used when ``wait`` is falsy).
        :param retry: called with the retry state; its result decides whether
            the last outcome warrants another attempt.
        :param before: queued before each attempt when not ``None``.
        :param after: queued once a retry has been requested, when not
            ``None``.
        :param before_sleep: queued right before sleeping when not ``None``.
        :param reraise: when giving up, call ``reraise()`` on the error
            object instead of raising it chained to the last exception.
        :param retry_error_cls: exception class instantiated with the last
            outcome when giving up.
        :param retry_error_callback: when set, called on giving up instead of
            raising; its return value is what ``iter`` returns.
        """
        self.sleep = sleep
        self.stop = stop
        self.wait = wait
        self.retry = retry
        self.before = before
        self.after = after
        self.before_sleep = before_sleep
        self.reraise = reraise
        # Thread-local holder for ``statistics`` and ``iter_state``.
        self._local = threading.local()
        self.retry_error_cls = retry_error_cls
        self.retry_error_callback = retry_error_callback

    def copy(
        self,
        sleep: t.Union[t.Callable[[t.Union[int, float]], None], object] = _unset,
        stop: t.Union["StopBaseT", object] = _unset,
        wait: t.Union["WaitBaseT", object] = _unset,
        retry: t.Union[retry_base, object] = _unset,
        before: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        after: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        before_sleep: t.Union[
            t.Optional[t.Callable[["RetryCallState"], None]], object
        ] = _unset,
        reraise: t.Union[bool, object] = _unset,
        retry_error_cls: t.Union[t.Type[RetryError], object] = _unset,
        retry_error_callback: t.Union[
            t.Optional[t.Callable[["RetryCallState"], t.Any]], object
        ] = _unset,
    ) -> "Self":
        """Copy this object with some parameters changed if needed.

        Every argument left at ``_unset`` keeps this object's current value;
        passing ``None`` explicitly overrides it with ``None``. The copy is a
        new instance of this object's class, so it gets its own thread-local
        storage.
        """
        return self.__class__(
            sleep=_first_set(sleep, self.sleep),
            stop=_first_set(stop, self.stop),
            wait=_first_set(wait, self.wait),
            retry=_first_set(retry, self.retry),
            before=_first_set(before, self.before),
            after=_first_set(after, self.after),
            before_sleep=_first_set(before_sleep, self.before_sleep),
            reraise=_first_set(reraise, self.reraise),
            retry_error_cls=_first_set(retry_error_cls, self.retry_error_cls),
            retry_error_callback=_first_set(
                retry_error_callback, self.retry_error_callback
            ),
        )

    def __repr__(self) -> str:
        """Return a debug string with the object id and main strategies."""
        return (
            f"<{self.__class__.__name__} object at 0x{id(self):x} ("
            f"stop={self.stop}, "
            f"wait={self.wait}, "
            f"sleep={self.sleep}, "
            f"retry={self.retry}, "
            f"before={self.before}, "
            f"after={self.after})>"
        )

    @property
    def statistics(self) -> t.Dict[str, t.Any]:
        """Return a dictionary of runtime statistics.

        This dictionary will be empty when the controller has never been
        run. When it is running or has run previously it should have (but
        may not have) useful and/or informational keys and values when
        running is underway and/or completed.

        .. warning:: The keys in this dictionary **should** be somewhat
                     stable (not changing), but their existence **may**
                     change between major releases as new statistics are
                     gathered or removed so before accessing keys ensure that
                     they actually exist and handle when they do not.

        .. note:: The values in this dictionary are local to the thread
                  running call (so if multiple threads share the same retrying
                  object - either directly or indirectly) they will each have
                  their own view of statistics they have collected (in the
                  future we may provide a way to aggregate the various
                  statistics from each thread).
        """
        try:
            return self._local.statistics  # type: ignore[no-any-return]
        except AttributeError:
            # First access on this thread: create the dict lazily.
            self._local.statistics = t.cast(t.Dict[str, t.Any], {})
            return self._local.statistics

    @property
    def iter_state(self) -> IterState:
        """Return this thread's ``IterState``, creating it on first access."""
        try:
            return self._local.iter_state  # type: ignore[no-any-return]
        except AttributeError:
            self._local.iter_state = IterState()
            return self._local.iter_state

    def wraps(self, f: WrappedFn) -> WrappedFn:
        """Wrap a function for retrying.

        The returned wrapper carries three extra attributes: ``retry`` (this
        object), ``retry_with`` (re-wraps ``f`` using a modified copy of this
        object) and ``statistics`` (replaced on every call by the statistics
        dict of the copy that serves that call).

        :param f: A function to wrap for retrying.
        """

        @functools.wraps(
            f, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
        )
        def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
            # Always create a copy to prevent overwriting the local contexts when
            # calling the same wrapped functions multiple times in the same stack
            copy = self.copy()
            wrapped_f.statistics = copy.statistics  # type: ignore[attr-defined]
            return copy(f, *args, **kw)

        def retry_with(*args: t.Any, **kwargs: t.Any) -> WrappedFn:
            # Arguments are forwarded unchanged to ``copy``.
            return self.copy(*args, **kwargs).wraps(f)

        # Preserve attributes
        wrapped_f.retry = self  # type: ignore[attr-defined]
        wrapped_f.retry_with = retry_with  # type: ignore[attr-defined]
        wrapped_f.statistics = {}  # type: ignore[attr-defined]

        return wrapped_f  # type: ignore[return-value]

    def begin(self) -> None:
        """Reset this thread's statistics for a new retrying run."""
        self.statistics.clear()
        self.statistics["start_time"] = time.monotonic()
        self.statistics["attempt_number"] = 1
        self.statistics["idle_for"] = 0

    def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
        """Append ``fn`` to the action queue of the current pass."""
        self.iter_state.actions.append(fn)

    def _run_retry(self, retry_state: "RetryCallState") -> None:
        """Evaluate the retry predicate and record its result."""
        self.iter_state.retry_run_result = self.retry(retry_state)

    def _run_wait(self, retry_state: "RetryCallState") -> None:
        """Compute the upcoming sleep and store it on ``retry_state``."""
        if self.wait:
            sleep = self.wait(retry_state)
        else:
            sleep = 0.0

        retry_state.upcoming_sleep = sleep

    def _run_stop(self, retry_state: "RetryCallState") -> None:
        """Update the elapsed-time statistic, then evaluate the stop predicate."""
        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
        self.iter_state.stop_run_result = self.stop(retry_state)

    def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa
        """Run one decision pass and return what the caller must do next.

        The return value is whatever the last executed action returned: a
        ``DoAttempt``, a ``DoSleep``, or a final value. An action may also
        raise instead of returning.
        """
        self._begin_iter(retry_state)
        result = None
        # The list is iterated live: actions append follow-up actions to it
        # while the loop runs, and the loop picks those up as well.
        for action in self.iter_state.actions:
            result = action(retry_state)
        return result

    def _begin_iter(self, retry_state: "RetryCallState") -> None:  # noqa
        """Reset the pass state and queue the first actions."""
        self.iter_state.reset()

        fut = retry_state.outcome
        if fut is None:
            # No outcome recorded: an attempt has to be made.
            if self.before is not None:
                self._add_action_func(self.before)
            self._add_action_func(lambda rs: DoAttempt())
            return

        # A TryAgain failure forces a retry, so the predicate is skipped.
        self.iter_state.is_explicit_retry = fut.failed and isinstance(
            fut.exception(), TryAgain
        )
        if not self.iter_state.is_explicit_retry:
            self._add_action_func(self._run_retry)
        self._add_action_func(self._post_retry_check_actions)

    def _post_retry_check_actions(self, retry_state: "RetryCallState") -> None:
        """Queue follow-up actions once the retry decision is known."""
        if not (self.iter_state.is_explicit_retry or self.iter_state.retry_run_result):
            # No retry wanted: hand back the outcome (``result()`` returns the
            # value or raises the stored exception).
            self._add_action_func(lambda rs: rs.outcome.result())
            return

        if self.after is not None:
            self._add_action_func(self.after)

        self._add_action_func(self._run_wait)
        self._add_action_func(self._run_stop)
        self._add_action_func(self._post_stop_check_actions)

    def _post_stop_check_actions(self, retry_state: "RetryCallState") -> None:
        """Queue either the give-up actions or the sleep-and-retry actions."""
        if self.iter_state.stop_run_result:
            if self.retry_error_callback:
                # The callback's return value becomes the result of ``iter``.
                self._add_action_func(self.retry_error_callback)
                return

            def exc_check(rs: "RetryCallState") -> None:
                fut = t.cast(Future, rs.outcome)
                retry_exc = self.retry_error_cls(fut)
                if self.reraise:
                    # ``reraise()`` itself raises; it never returns a value.
                    raise retry_exc.reraise()
                raise retry_exc from fut.exception()

            self._add_action_func(exc_check)
            return

        def next_action(rs: "RetryCallState") -> None:
            # Record the planned sleep on the state and in the statistics.
            sleep = rs.upcoming_sleep
            rs.next_action = RetryAction(sleep)
            rs.idle_for += sleep
            self.statistics["idle_for"] += sleep
            self.statistics["attempt_number"] += 1

        self._add_action_func(next_action)

        if self.before_sleep is not None:
            self._add_action_func(self.before_sleep)

        self._add_action_func(lambda rs: DoSleep(rs.upcoming_sleep))

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Yield one ``AttemptManager`` per attempt.

        The loop sleeps between attempts and ends when ``iter`` returns
        anything other than ``DoAttempt`` or ``DoSleep``.
        """
        self.begin()

        retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        while True:
            do = self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                yield AttemptManager(retry_state=retry_state)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                self.sleep(do)
            else:
                break

    @abstractmethod
    def __call__(
        self,
        fn: t.Callable[..., WrappedFnReturnT],
        *args: t.Any,
        **kwargs: t.Any,
    ) -> WrappedFnReturnT:
        """Call ``fn(*args, **kwargs)`` under this retry policy (abstract)."""
        pass
class Retrying(BaseRetrying):
    """Retry controller that runs the target in the calling thread."""

    def __call__(
        self,
        fn: t.Callable[..., WrappedFnReturnT],
        *args: t.Any,
        **kwargs: t.Any,
    ) -> WrappedFnReturnT:
        """Call ``fn(*args, **kwargs)`` repeatedly until ``iter`` settles.

        Each pass asks ``self.iter`` what to do: run an attempt, sleep before
        the next attempt, or finish. Anything that is neither ``DoAttempt``
        nor ``DoSleep`` is returned to the caller as the final value.
        """
        self.begin()

        state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        while True:
            decision = self.iter(retry_state=state)

            if isinstance(decision, DoAttempt):
                try:
                    outcome = fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    # Every failure is recorded; ``iter`` decides what to do.
                    state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    state.set_result(outcome)
                continue

            if isinstance(decision, DoSleep):
                state.prepare_for_next_attempt()
                self.sleep(decision)
                continue

            return decision  # type: ignore[no-any-return]
# ``futures.Future`` is only subscripted on Python 3.9 and newer.
if sys.version_info < (3, 9):
    FutureGenericT = futures.Future
else:
    FutureGenericT = futures.Future[t.Any]


class Future(FutureGenericT):
    """Outcome holder for one attempted call, tagged with its attempt number."""

    def __init__(self, attempt_number: int) -> None:
        """Create an unresolved future for attempt ``attempt_number``."""
        super().__init__()
        self.attempt_number = attempt_number

    @property
    def failed(self) -> bool:
        """Return whether this future holds an exception."""
        held = self.exception()
        return held is not None

    @classmethod
    def construct(
        cls, attempt_number: int, value: t.Any, has_exception: bool
    ) -> "Future":
        """Build a resolved future.

        :param attempt_number: attempt the outcome belongs to.
        :param value: the result, or the exception when ``has_exception``.
        :param has_exception: store ``value`` as an exception, not a result.
        """
        fut = cls(attempt_number)
        resolve = fut.set_exception if has_exception else fut.set_result
        resolve(value)
        return fut
class RetryCallState:
    """Mutable bookkeeping for one retried call."""

    def __init__(
        self,
        retry_object: BaseRetrying,
        fn: t.Optional[WrappedFn],
        args: t.Any,
        kwargs: t.Any,
    ) -> None:
        """Initialise the state for attempt number 1 with no outcome yet."""
        #: Retry call start timestamp
        self.start_time = time.monotonic()

        #: Retry manager object
        self.retry_object = retry_object
        #: Function wrapped by this retry call
        self.fn = fn
        #: Arguments of the function wrapped by this retry call
        self.args = args
        #: Keyword arguments of the function wrapped by this retry call
        self.kwargs = kwargs

        #: The number of the current attempt
        self.attempt_number: int = 1
        #: Last outcome (result or exception) produced by the function
        self.outcome: t.Optional[Future] = None
        #: Timestamp of the last outcome
        self.outcome_timestamp: t.Optional[float] = None
        #: Time spent sleeping in retries
        self.idle_for: float = 0.0
        #: Next action as decided by the retry manager
        self.next_action: t.Optional[RetryAction] = None
        #: Next sleep time as decided by the retry manager.
        self.upcoming_sleep: float = 0.0

    @property
    def seconds_since_start(self) -> t.Optional[float]:
        """Return last-outcome time minus start time, or ``None`` without outcome."""
        finished_at = self.outcome_timestamp
        return None if finished_at is None else finished_at - self.start_time

    def prepare_for_next_attempt(self) -> None:
        """Drop the last outcome and planned action; advance the attempt counter."""
        self.outcome = None
        self.outcome_timestamp = None
        self.next_action = None
        self.attempt_number += 1

    def set_result(self, val: t.Any) -> None:
        """Record ``val`` as the successful outcome of the current attempt."""
        # Take the timestamp before the future is built.
        stamp = time.monotonic()
        self.outcome = Future.construct(self.attempt_number, val, False)
        self.outcome_timestamp = stamp

    def set_exception(
        self,
        exc_info: t.Tuple[
            t.Type[BaseException], BaseException, "types.TracebackType| None"
        ],
    ) -> None:
        """Record a failure; only the exception instance ``exc_info[1]`` is kept."""
        stamp = time.monotonic()
        self.outcome = Future.construct(self.attempt_number, exc_info[1], True)
        self.outcome_timestamp = stamp

    def __repr__(self) -> str:
        """Return a summary with attempt number, time slept and last outcome."""
        outcome = self.outcome
        if outcome is None:
            result = "none yet"
        elif not outcome.failed:
            result = f"returned {outcome.result()}"
        else:
            exception = outcome.exception()
            result = f"failed ({exception.__class__.__name__} {exception})"

        slept = float(round(self.idle_for, 2))
        clsname = self.__class__.__name__
        return f"<{clsname} {id(self)}: attempt #{self.attempt_number}; slept for {slept}; last result: {result}>"
@t.overload
def retry(func: WrappedFn) -> WrappedFn: ...


@t.overload
def retry(
    sleep: t.Callable[[t.Union[int, float]], t.Union[None, t.Awaitable[None]]] = sleep,
    stop: "StopBaseT" = stop_never,
    wait: "WaitBaseT" = wait_none(),
    retry: "t.Union[RetryBaseT, tasyncio.retry.RetryBaseT]" = retry_if_exception_type(),
    before: t.Callable[
        ["RetryCallState"], t.Union[None, t.Awaitable[None]]
    ] = before_nothing,
    after: t.Callable[
        ["RetryCallState"], t.Union[None, t.Awaitable[None]]
    ] = after_nothing,
    before_sleep: t.Optional[
        t.Callable[["RetryCallState"], t.Union[None, t.Awaitable[None]]]
    ] = None,
    reraise: bool = False,
    retry_error_cls: t.Type["RetryError"] = RetryError,
    retry_error_callback: t.Optional[
        t.Callable[["RetryCallState"], t.Union[t.Any, t.Awaitable[t.Any]]]
    ] = None,
) -> t.Callable[[WrappedFn], WrappedFn]: ...


def retry(*dargs: t.Any, **dkw: t.Any) -> t.Any:
    """Decorate a function so calls to it go through a retrying controller.

    Usable bare (``@retry``) or with arguments (``@retry(...)``). The
    controller class is picked per decorated function: ``AsyncRetrying`` for
    coroutine callables, ``TornadoRetrying`` for tornado coroutine functions
    when tornado is importable, and ``Retrying`` otherwise.

    :param dargs: positional arguments passed to the retrying object
    :param dkw: keyword arguments passed to the retrying object
    """
    # support both @retry and @retry() as valid syntax
    if len(dargs) == 1 and callable(dargs[0]):
        return retry()(dargs[0])

    def wrap(f: WrappedFn) -> WrappedFn:
        if isinstance(f, retry_base):
            warnings.warn(
                f"Got retry_base instance ({f.__class__.__name__}) as callable argument, "
                f"this will probably hang indefinitely (did you mean retry={f.__class__.__name__}(...)?)"
            )

        # Pick the controller class first, then build it once.
        controller_cls: t.Type["BaseRetrying"]
        if _utils.is_coroutine_callable(f):
            controller_cls = AsyncRetrying
        elif (
            tornado
            and hasattr(tornado.gen, "is_coroutine_function")
            and tornado.gen.is_coroutine_function(f)
        ):
            controller_cls = TornadoRetrying
        else:
            controller_cls = Retrying

        controller: "BaseRetrying" = controller_cls(*dargs, **dkw)
        return controller.wraps(f)

    return wrap
- from tenacity.asyncio import AsyncRetrying # noqa:E402,I100
- if tornado:
- from tenacity.tornadoweb import TornadoRetrying
# Public names exported by ``from <package> import *``: the re-exported
# retry/nap/stop/wait/before/after/before_sleep strategies first, then the
# names defined in this module.
# NOTE(review): ``TornadoRetrying`` is imported above only when tornado is
# available and is not listed here - confirm that omission is intended.
__all__ = [
    "retry_base",
    "retry_all",
    "retry_always",
    "retry_any",
    "retry_if_exception",
    "retry_if_exception_type",
    "retry_if_exception_cause_type",
    "retry_if_not_exception_type",
    "retry_if_not_result",
    "retry_if_result",
    "retry_never",
    "retry_unless_exception_type",
    "retry_if_exception_message",
    "retry_if_not_exception_message",
    "sleep",
    "sleep_using_event",
    "stop_after_attempt",
    "stop_after_delay",
    "stop_before_delay",
    "stop_all",
    "stop_any",
    "stop_never",
    "stop_when_event_set",
    "wait_chain",
    "wait_combine",
    "wait_exponential",
    "wait_fixed",
    "wait_incrementing",
    "wait_none",
    "wait_random",
    "wait_random_exponential",
    "wait_full_jitter",
    "wait_exponential_jitter",
    "before_log",
    "before_nothing",
    "after_log",
    "after_nothing",
    "before_sleep_log",
    "before_sleep_nothing",
    "retry",
    "WrappedFn",
    "TryAgain",
    "NO_RESULT",
    "DoAttempt",
    "DoSleep",
    "BaseAction",
    "RetryAction",
    "RetryError",
    "AttemptManager",
    "BaseRetrying",
    "Retrying",
    "Future",
    "RetryCallState",
    "AsyncRetrying",
]
|