_eventloop.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. from __future__ import annotations
  2. import math
  3. import sys
  4. from abc import ABCMeta, abstractmethod
  5. from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
  6. from contextlib import AbstractContextManager
  7. from os import PathLike
  8. from signal import Signals
  9. from socket import AddressFamily, SocketKind, socket
  10. from typing import (
  11. IO,
  12. TYPE_CHECKING,
  13. Any,
  14. TypeVar,
  15. Union,
  16. overload,
  17. )
  18. if sys.version_info >= (3, 11):
  19. from typing import TypeVarTuple, Unpack
  20. else:
  21. from typing_extensions import TypeVarTuple, Unpack
  22. if sys.version_info >= (3, 10):
  23. from typing import TypeAlias
  24. else:
  25. from typing_extensions import TypeAlias
  26. if TYPE_CHECKING:
  27. from _typeshed import HasFileno
  28. from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
  29. from .._core._tasks import CancelScope
  30. from .._core._testing import TaskInfo
  31. from ..from_thread import BlockingPortal
  32. from ._sockets import (
  33. ConnectedUDPSocket,
  34. ConnectedUNIXDatagramSocket,
  35. IPSockAddrType,
  36. SocketListener,
  37. SocketStream,
  38. UDPSocket,
  39. UNIXDatagramSocket,
  40. UNIXSocketStream,
  41. )
  42. from ._subprocesses import Process
  43. from ._tasks import TaskGroup
  44. from ._testing import TestRunner
  45. T_Retval = TypeVar("T_Retval")
  46. PosArgsT = TypeVarTuple("PosArgsT")
  47. StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
  48. class AsyncBackend(metaclass=ABCMeta):
  49. @classmethod
  50. @abstractmethod
  51. def run(
  52. cls,
  53. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  54. args: tuple[Unpack[PosArgsT]],
  55. kwargs: dict[str, Any],
  56. options: dict[str, Any],
  57. ) -> T_Retval:
  58. """
  59. Run the given coroutine function in an asynchronous event loop.
  60. The current thread must not be already running an event loop.
  61. :param func: a coroutine function
  62. :param args: positional arguments to ``func``
  63. :param kwargs: positional arguments to ``func``
  64. :param options: keyword arguments to call the backend ``run()`` implementation
  65. with
  66. :return: the return value of the coroutine function
  67. """
  68. @classmethod
  69. @abstractmethod
  70. def current_token(cls) -> object:
  71. """
  72. :return:
  73. """
  74. @classmethod
  75. @abstractmethod
  76. def current_time(cls) -> float:
  77. """
  78. Return the current value of the event loop's internal clock.
  79. :return: the clock value (seconds)
  80. """
  81. @classmethod
  82. @abstractmethod
  83. def cancelled_exception_class(cls) -> type[BaseException]:
  84. """Return the exception class that is raised in a task if it's cancelled."""
  85. @classmethod
  86. @abstractmethod
  87. async def checkpoint(cls) -> None:
  88. """
  89. Check if the task has been cancelled, and allow rescheduling of other tasks.
  90. This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
  91. :meth:`cancel_shielded_checkpoint`.
  92. """
  93. @classmethod
  94. async def checkpoint_if_cancelled(cls) -> None:
  95. """
  96. Check if the current task group has been cancelled.
  97. This will check if the task has been cancelled, but will not allow other tasks
  98. to be scheduled if not.
  99. """
  100. if cls.current_effective_deadline() == -math.inf:
  101. await cls.checkpoint()
  102. @classmethod
  103. async def cancel_shielded_checkpoint(cls) -> None:
  104. """
  105. Allow the rescheduling of other tasks.
  106. This will give other tasks the opportunity to run, but without checking if the
  107. current task group has been cancelled, unlike with :meth:`checkpoint`.
  108. """
  109. with cls.create_cancel_scope(shield=True):
  110. await cls.sleep(0)
  111. @classmethod
  112. @abstractmethod
  113. async def sleep(cls, delay: float) -> None:
  114. """
  115. Pause the current task for the specified duration.
  116. :param delay: the duration, in seconds
  117. """
  118. @classmethod
  119. @abstractmethod
  120. def create_cancel_scope(
  121. cls, *, deadline: float = math.inf, shield: bool = False
  122. ) -> CancelScope:
  123. pass
  124. @classmethod
  125. @abstractmethod
  126. def current_effective_deadline(cls) -> float:
  127. """
  128. Return the nearest deadline among all the cancel scopes effective for the
  129. current task.
  130. :return:
  131. - a clock value from the event loop's internal clock
  132. - ``inf`` if there is no deadline in effect
  133. - ``-inf`` if the current scope has been cancelled
  134. :rtype: float
  135. """
  136. @classmethod
  137. @abstractmethod
  138. def create_task_group(cls) -> TaskGroup:
  139. pass
  140. @classmethod
  141. @abstractmethod
  142. def create_event(cls) -> Event:
  143. pass
  144. @classmethod
  145. @abstractmethod
  146. def create_lock(cls, *, fast_acquire: bool) -> Lock:
  147. pass
  148. @classmethod
  149. @abstractmethod
  150. def create_semaphore(
  151. cls,
  152. initial_value: int,
  153. *,
  154. max_value: int | None = None,
  155. fast_acquire: bool = False,
  156. ) -> Semaphore:
  157. pass
  158. @classmethod
  159. @abstractmethod
  160. def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
  161. pass
  162. @classmethod
  163. @abstractmethod
  164. async def run_sync_in_worker_thread(
  165. cls,
  166. func: Callable[[Unpack[PosArgsT]], T_Retval],
  167. args: tuple[Unpack[PosArgsT]],
  168. abandon_on_cancel: bool = False,
  169. limiter: CapacityLimiter | None = None,
  170. ) -> T_Retval:
  171. pass
  172. @classmethod
  173. @abstractmethod
  174. def check_cancelled(cls) -> None:
  175. pass
  176. @classmethod
  177. @abstractmethod
  178. def run_async_from_thread(
  179. cls,
  180. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  181. args: tuple[Unpack[PosArgsT]],
  182. token: object,
  183. ) -> T_Retval:
  184. pass
  185. @classmethod
  186. @abstractmethod
  187. def run_sync_from_thread(
  188. cls,
  189. func: Callable[[Unpack[PosArgsT]], T_Retval],
  190. args: tuple[Unpack[PosArgsT]],
  191. token: object,
  192. ) -> T_Retval:
  193. pass
  194. @classmethod
  195. @abstractmethod
  196. def create_blocking_portal(cls) -> BlockingPortal:
  197. pass
  198. @classmethod
  199. @abstractmethod
  200. async def open_process(
  201. cls,
  202. command: StrOrBytesPath | Sequence[StrOrBytesPath],
  203. *,
  204. stdin: int | IO[Any] | None,
  205. stdout: int | IO[Any] | None,
  206. stderr: int | IO[Any] | None,
  207. **kwargs: Any,
  208. ) -> Process:
  209. pass
  210. @classmethod
  211. @abstractmethod
  212. def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
  213. pass
  214. @classmethod
  215. @abstractmethod
  216. async def connect_tcp(
  217. cls, host: str, port: int, local_address: IPSockAddrType | None = None
  218. ) -> SocketStream:
  219. pass
  220. @classmethod
  221. @abstractmethod
  222. async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
  223. pass
  224. @classmethod
  225. @abstractmethod
  226. def create_tcp_listener(cls, sock: socket) -> SocketListener:
  227. pass
  228. @classmethod
  229. @abstractmethod
  230. def create_unix_listener(cls, sock: socket) -> SocketListener:
  231. pass
  232. @classmethod
  233. @abstractmethod
  234. async def create_udp_socket(
  235. cls,
  236. family: AddressFamily,
  237. local_address: IPSockAddrType | None,
  238. remote_address: IPSockAddrType | None,
  239. reuse_port: bool,
  240. ) -> UDPSocket | ConnectedUDPSocket:
  241. pass
  242. @classmethod
  243. @overload
  244. async def create_unix_datagram_socket(
  245. cls, raw_socket: socket, remote_path: None
  246. ) -> UNIXDatagramSocket: ...
  247. @classmethod
  248. @overload
  249. async def create_unix_datagram_socket(
  250. cls, raw_socket: socket, remote_path: str | bytes
  251. ) -> ConnectedUNIXDatagramSocket: ...
  252. @classmethod
  253. @abstractmethod
  254. async def create_unix_datagram_socket(
  255. cls, raw_socket: socket, remote_path: str | bytes | None
  256. ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
  257. pass
  258. @classmethod
  259. @abstractmethod
  260. async def getaddrinfo(
  261. cls,
  262. host: bytes | str | None,
  263. port: str | int | None,
  264. *,
  265. family: int | AddressFamily = 0,
  266. type: int | SocketKind = 0,
  267. proto: int = 0,
  268. flags: int = 0,
  269. ) -> Sequence[
  270. tuple[
  271. AddressFamily,
  272. SocketKind,
  273. int,
  274. str,
  275. tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
  276. ]
  277. ]:
  278. pass
  279. @classmethod
  280. @abstractmethod
  281. async def getnameinfo(
  282. cls, sockaddr: IPSockAddrType, flags: int = 0
  283. ) -> tuple[str, str]:
  284. pass
  285. @classmethod
  286. @abstractmethod
  287. async def wait_readable(cls, obj: HasFileno | int) -> None:
  288. pass
  289. @classmethod
  290. @abstractmethod
  291. async def wait_writable(cls, obj: HasFileno | int) -> None:
  292. pass
  293. @classmethod
  294. @abstractmethod
  295. def current_default_thread_limiter(cls) -> CapacityLimiter:
  296. pass
  297. @classmethod
  298. @abstractmethod
  299. def open_signal_receiver(
  300. cls, *signals: Signals
  301. ) -> AbstractContextManager[AsyncIterator[Signals]]:
  302. pass
  303. @classmethod
  304. @abstractmethod
  305. def get_current_task(cls) -> TaskInfo:
  306. pass
  307. @classmethod
  308. @abstractmethod
  309. def get_running_tasks(cls) -> Sequence[TaskInfo]:
  310. pass
  311. @classmethod
  312. @abstractmethod
  313. async def wait_all_tasks_blocked(cls) -> None:
  314. pass
  315. @classmethod
  316. @abstractmethod
  317. def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
  318. pass