to_thread.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from __future__ import annotations
  2. import sys
  3. from collections.abc import Callable
  4. from typing import TypeVar
  5. from warnings import warn
  6. from ._core._eventloop import get_async_backend
  7. from .abc import CapacityLimiter
  8. if sys.version_info >= (3, 11):
  9. from typing import TypeVarTuple, Unpack
  10. else:
  11. from typing_extensions import TypeVarTuple, Unpack
  12. T_Retval = TypeVar("T_Retval")
  13. PosArgsT = TypeVarTuple("PosArgsT")
  14. async def run_sync(
  15. func: Callable[[Unpack[PosArgsT]], T_Retval],
  16. *args: Unpack[PosArgsT],
  17. abandon_on_cancel: bool = False,
  18. cancellable: bool | None = None,
  19. limiter: CapacityLimiter | None = None,
  20. ) -> T_Retval:
  21. """
  22. Call the given function with the given arguments in a worker thread.
  23. If the ``cancellable`` option is enabled and the task waiting for its completion is
  24. cancelled, the thread will still run its course but its return value (or any raised
  25. exception) will be ignored.
  26. :param func: a callable
  27. :param args: positional arguments for the callable
  28. :param abandon_on_cancel: ``True`` to abandon the thread (leaving it to run
  29. unchecked on own) if the host task is cancelled, ``False`` to ignore
  30. cancellations in the host task until the operation has completed in the worker
  31. thread
  32. :param cancellable: deprecated alias of ``abandon_on_cancel``; will override
  33. ``abandon_on_cancel`` if both parameters are passed
  34. :param limiter: capacity limiter to use to limit the total amount of threads running
  35. (if omitted, the default limiter is used)
  36. :return: an awaitable that yields the return value of the function.
  37. """
  38. if cancellable is not None:
  39. abandon_on_cancel = cancellable
  40. warn(
  41. "The `cancellable=` keyword argument to `anyio.to_thread.run_sync` is "
  42. "deprecated since AnyIO 4.1.0; use `abandon_on_cancel=` instead",
  43. DeprecationWarning,
  44. stacklevel=2,
  45. )
  46. return await get_async_backend().run_sync_in_worker_thread(
  47. func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
  48. )
  49. def current_default_thread_limiter() -> CapacityLimiter:
  50. """
  51. Return the capacity limiter that is used by default to limit the number of
  52. concurrent threads.
  53. :return: a capacity limiter object
  54. """
  55. return get_async_backend().current_default_thread_limiter()