|
- from __future__ import annotations
- import array
- import math
- import os
- import socket
- import sys
- import types
- import weakref
- from collections.abc import (
- AsyncGenerator,
- AsyncIterator,
- Awaitable,
- Callable,
- Collection,
- Coroutine,
- Iterable,
- Sequence,
- )
- from concurrent.futures import Future
- from contextlib import AbstractContextManager
- from dataclasses import dataclass
- from functools import partial
- from io import IOBase
- from os import PathLike
- from signal import Signals
- from socket import AddressFamily, SocketKind
- from types import TracebackType
- from typing import (
- IO,
- TYPE_CHECKING,
- Any,
- Generic,
- NoReturn,
- TypeVar,
- cast,
- overload,
- )
- import trio.from_thread
- import trio.lowlevel
- from outcome import Error, Outcome, Value
- from trio.lowlevel import (
- current_root_task,
- current_task,
- wait_readable,
- wait_writable,
- )
- from trio.socket import SocketType as TrioSocketType
- from trio.to_thread import run_sync
- from .. import (
- CapacityLimiterStatistics,
- EventStatistics,
- LockStatistics,
- TaskInfo,
- WouldBlock,
- abc,
- )
- from .._core._eventloop import claim_worker_thread
- from .._core._exceptions import (
- BrokenResourceError,
- BusyResourceError,
- ClosedResourceError,
- EndOfStream,
- )
- from .._core._sockets import convert_ipv6_sockaddr
- from .._core._streams import create_memory_object_stream
- from .._core._synchronization import (
- CapacityLimiter as BaseCapacityLimiter,
- )
- from .._core._synchronization import Event as BaseEvent
- from .._core._synchronization import Lock as BaseLock
- from .._core._synchronization import (
- ResourceGuard,
- SemaphoreStatistics,
- )
- from .._core._synchronization import Semaphore as BaseSemaphore
- from .._core._tasks import CancelScope as BaseCancelScope
- from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType
- from ..abc._eventloop import AsyncBackend, StrOrBytesPath
- from ..streams.memory import MemoryObjectSendStream
- if TYPE_CHECKING:
- from _typeshed import HasFileno
- if sys.version_info >= (3, 10):
- from typing import ParamSpec
- else:
- from typing_extensions import ParamSpec
- if sys.version_info >= (3, 11):
- from typing import TypeVarTuple, Unpack
- else:
- from exceptiongroup import BaseExceptionGroup
- from typing_extensions import TypeVarTuple, Unpack
- T = TypeVar("T")
- T_Retval = TypeVar("T_Retval")
- T_SockAddr = TypeVar("T_SockAddr", str, IPSockAddrType)
- PosArgsT = TypeVarTuple("PosArgsT")
- P = ParamSpec("P")
- #
- # Event loop
- #
- RunVar = trio.lowlevel.RunVar
- #
- # Timeouts and cancellation
- #
- class CancelScope(BaseCancelScope):
- def __new__(
- cls, original: trio.CancelScope | None = None, **kwargs: object
- ) -> CancelScope:
- return object.__new__(cls)
- def __init__(self, original: trio.CancelScope | None = None, **kwargs: Any) -> None:
- self.__original = original or trio.CancelScope(**kwargs)
- def __enter__(self) -> CancelScope:
- self.__original.__enter__()
- return self
- def __exit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> bool:
- return self.__original.__exit__(exc_type, exc_val, exc_tb)
- def cancel(self) -> None:
- self.__original.cancel()
- @property
- def deadline(self) -> float:
- return self.__original.deadline
- @deadline.setter
- def deadline(self, value: float) -> None:
- self.__original.deadline = value
- @property
- def cancel_called(self) -> bool:
- return self.__original.cancel_called
- @property
- def cancelled_caught(self) -> bool:
- return self.__original.cancelled_caught
- @property
- def shield(self) -> bool:
- return self.__original.shield
- @shield.setter
- def shield(self, value: bool) -> None:
- self.__original.shield = value
- #
- # Task groups
- #
- class TaskGroup(abc.TaskGroup):
- def __init__(self) -> None:
- self._active = False
- self._nursery_manager = trio.open_nursery(strict_exception_groups=True)
- self.cancel_scope = None # type: ignore[assignment]
- async def __aenter__(self) -> TaskGroup:
- self._active = True
- self._nursery = await self._nursery_manager.__aenter__()
- self.cancel_scope = CancelScope(self._nursery.cancel_scope)
- return self
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> bool:
- try:
- # trio.Nursery.__exit__ returns bool; .open_nursery has wrong type
- return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb) # type: ignore[return-value]
- except BaseExceptionGroup as exc:
- if not exc.split(trio.Cancelled)[1]:
- raise trio.Cancelled._create() from exc
- raise
- finally:
- del exc_val, exc_tb
- self._active = False
- def start_soon(
- self,
- func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
- *args: Unpack[PosArgsT],
- name: object = None,
- ) -> None:
- if not self._active:
- raise RuntimeError(
- "This task group is not active; no new tasks can be started."
- )
- self._nursery.start_soon(func, *args, name=name)
- async def start(
- self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
- ) -> Any:
- if not self._active:
- raise RuntimeError(
- "This task group is not active; no new tasks can be started."
- )
- return await self._nursery.start(func, *args, name=name)
- #
- # Threads
- #
- class BlockingPortal(abc.BlockingPortal):
- def __new__(cls) -> BlockingPortal:
- return object.__new__(cls)
- def __init__(self) -> None:
- super().__init__()
- self._token = trio.lowlevel.current_trio_token()
- def _spawn_task_from_thread(
- self,
- func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
- args: tuple[Unpack[PosArgsT]],
- kwargs: dict[str, Any],
- name: object,
- future: Future[T_Retval],
- ) -> None:
- trio.from_thread.run_sync(
- partial(self._task_group.start_soon, name=name),
- self._call_func,
- func,
- args,
- kwargs,
- future,
- trio_token=self._token,
- )
- #
- # Subprocesses
- #
- @dataclass(eq=False)
- class ReceiveStreamWrapper(abc.ByteReceiveStream):
- _stream: trio.abc.ReceiveStream
- async def receive(self, max_bytes: int | None = None) -> bytes:
- try:
- data = await self._stream.receive_some(max_bytes)
- except trio.ClosedResourceError as exc:
- raise ClosedResourceError from exc.__cause__
- except trio.BrokenResourceError as exc:
- raise BrokenResourceError from exc.__cause__
- if data:
- return data
- else:
- raise EndOfStream
- async def aclose(self) -> None:
- await self._stream.aclose()
- @dataclass(eq=False)
- class SendStreamWrapper(abc.ByteSendStream):
- _stream: trio.abc.SendStream
- async def send(self, item: bytes) -> None:
- try:
- await self._stream.send_all(item)
- except trio.ClosedResourceError as exc:
- raise ClosedResourceError from exc.__cause__
- except trio.BrokenResourceError as exc:
- raise BrokenResourceError from exc.__cause__
- async def aclose(self) -> None:
- await self._stream.aclose()
- @dataclass(eq=False)
- class Process(abc.Process):
- _process: trio.Process
- _stdin: abc.ByteSendStream | None
- _stdout: abc.ByteReceiveStream | None
- _stderr: abc.ByteReceiveStream | None
- async def aclose(self) -> None:
- with CancelScope(shield=True):
- if self._stdin:
- await self._stdin.aclose()
- if self._stdout:
- await self._stdout.aclose()
- if self._stderr:
- await self._stderr.aclose()
- try:
- await self.wait()
- except BaseException:
- self.kill()
- with CancelScope(shield=True):
- await self.wait()
- raise
- async def wait(self) -> int:
- return await self._process.wait()
- def terminate(self) -> None:
- self._process.terminate()
- def kill(self) -> None:
- self._process.kill()
- def send_signal(self, signal: Signals) -> None:
- self._process.send_signal(signal)
- @property
- def pid(self) -> int:
- return self._process.pid
- @property
- def returncode(self) -> int | None:
- return self._process.returncode
- @property
- def stdin(self) -> abc.ByteSendStream | None:
- return self._stdin
- @property
- def stdout(self) -> abc.ByteReceiveStream | None:
- return self._stdout
- @property
- def stderr(self) -> abc.ByteReceiveStream | None:
- return self._stderr
- class _ProcessPoolShutdownInstrument(trio.abc.Instrument):
- def after_run(self) -> None:
- super().after_run()
- current_default_worker_process_limiter: trio.lowlevel.RunVar = RunVar(
- "current_default_worker_process_limiter"
- )
- async def _shutdown_process_pool(workers: set[abc.Process]) -> None:
- try:
- await trio.sleep(math.inf)
- except trio.Cancelled:
- for process in workers:
- if process.returncode is None:
- process.kill()
- with CancelScope(shield=True):
- for process in workers:
- await process.aclose()
- #
- # Sockets and networking
- #
- class _TrioSocketMixin(Generic[T_SockAddr]):
- def __init__(self, trio_socket: TrioSocketType) -> None:
- self._trio_socket = trio_socket
- self._closed = False
- def _check_closed(self) -> None:
- if self._closed:
- raise ClosedResourceError
- if self._trio_socket.fileno() < 0:
- raise BrokenResourceError
- @property
- def _raw_socket(self) -> socket.socket:
- return self._trio_socket._sock # type: ignore[attr-defined]
- async def aclose(self) -> None:
- if self._trio_socket.fileno() >= 0:
- self._closed = True
- self._trio_socket.close()
- def _convert_socket_error(self, exc: BaseException) -> NoReturn:
- if isinstance(exc, trio.ClosedResourceError):
- raise ClosedResourceError from exc
- elif self._trio_socket.fileno() < 0 and self._closed:
- raise ClosedResourceError from None
- elif isinstance(exc, OSError):
- raise BrokenResourceError from exc
- else:
- raise exc
- class SocketStream(_TrioSocketMixin, abc.SocketStream):
- def __init__(self, trio_socket: TrioSocketType) -> None:
- super().__init__(trio_socket)
- self._receive_guard = ResourceGuard("reading from")
- self._send_guard = ResourceGuard("writing to")
- async def receive(self, max_bytes: int = 65536) -> bytes:
- with self._receive_guard:
- try:
- data = await self._trio_socket.recv(max_bytes)
- except BaseException as exc:
- self._convert_socket_error(exc)
- if data:
- return data
- else:
- raise EndOfStream
- async def send(self, item: bytes) -> None:
- with self._send_guard:
- view = memoryview(item)
- while view:
- try:
- bytes_sent = await self._trio_socket.send(view)
- except BaseException as exc:
- self._convert_socket_error(exc)
- view = view[bytes_sent:]
- async def send_eof(self) -> None:
- self._trio_socket.shutdown(socket.SHUT_WR)
- class UNIXSocketStream(SocketStream, abc.UNIXSocketStream):
- async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
- if not isinstance(msglen, int) or msglen < 0:
- raise ValueError("msglen must be a non-negative integer")
- if not isinstance(maxfds, int) or maxfds < 1:
- raise ValueError("maxfds must be a positive integer")
- fds = array.array("i")
- await trio.lowlevel.checkpoint()
- with self._receive_guard:
- while True:
- try:
- message, ancdata, flags, addr = await self._trio_socket.recvmsg(
- msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
- )
- except BaseException as exc:
- self._convert_socket_error(exc)
- else:
- if not message and not ancdata:
- raise EndOfStream
- break
- for cmsg_level, cmsg_type, cmsg_data in ancdata:
- if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
- raise RuntimeError(
- f"Received unexpected ancillary data; message = {message!r}, "
- f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
- )
- fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
- return message, list(fds)
- async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
- if not message:
- raise ValueError("message must not be empty")
- if not fds:
- raise ValueError("fds must not be empty")
- filenos: list[int] = []
- for fd in fds:
- if isinstance(fd, int):
- filenos.append(fd)
- elif isinstance(fd, IOBase):
- filenos.append(fd.fileno())
- fdarray = array.array("i", filenos)
- await trio.lowlevel.checkpoint()
- with self._send_guard:
- while True:
- try:
- await self._trio_socket.sendmsg(
- [message],
- [
- (
- socket.SOL_SOCKET,
- socket.SCM_RIGHTS,
- fdarray,
- )
- ],
- )
- break
- except BaseException as exc:
- self._convert_socket_error(exc)
- class TCPSocketListener(_TrioSocketMixin, abc.SocketListener):
- def __init__(self, raw_socket: socket.socket):
- super().__init__(trio.socket.from_stdlib_socket(raw_socket))
- self._accept_guard = ResourceGuard("accepting connections from")
- async def accept(self) -> SocketStream:
- with self._accept_guard:
- try:
- trio_socket, _addr = await self._trio_socket.accept()
- except BaseException as exc:
- self._convert_socket_error(exc)
- trio_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- return SocketStream(trio_socket)
- class UNIXSocketListener(_TrioSocketMixin, abc.SocketListener):
- def __init__(self, raw_socket: socket.socket):
- super().__init__(trio.socket.from_stdlib_socket(raw_socket))
- self._accept_guard = ResourceGuard("accepting connections from")
- async def accept(self) -> UNIXSocketStream:
- with self._accept_guard:
- try:
- trio_socket, _addr = await self._trio_socket.accept()
- except BaseException as exc:
- self._convert_socket_error(exc)
- return UNIXSocketStream(trio_socket)
- class UDPSocket(_TrioSocketMixin[IPSockAddrType], abc.UDPSocket):
- def __init__(self, trio_socket: TrioSocketType) -> None:
- super().__init__(trio_socket)
- self._receive_guard = ResourceGuard("reading from")
- self._send_guard = ResourceGuard("writing to")
- async def receive(self) -> tuple[bytes, IPSockAddrType]:
- with self._receive_guard:
- try:
- data, addr = await self._trio_socket.recvfrom(65536)
- return data, convert_ipv6_sockaddr(addr)
- except BaseException as exc:
- self._convert_socket_error(exc)
- async def send(self, item: UDPPacketType) -> None:
- with self._send_guard:
- try:
- await self._trio_socket.sendto(*item)
- except BaseException as exc:
- self._convert_socket_error(exc)
- class ConnectedUDPSocket(_TrioSocketMixin[IPSockAddrType], abc.ConnectedUDPSocket):
- def __init__(self, trio_socket: TrioSocketType) -> None:
- super().__init__(trio_socket)
- self._receive_guard = ResourceGuard("reading from")
- self._send_guard = ResourceGuard("writing to")
- async def receive(self) -> bytes:
- with self._receive_guard:
- try:
- return await self._trio_socket.recv(65536)
- except BaseException as exc:
- self._convert_socket_error(exc)
- async def send(self, item: bytes) -> None:
- with self._send_guard:
- try:
- await self._trio_socket.send(item)
- except BaseException as exc:
- self._convert_socket_error(exc)
- class UNIXDatagramSocket(_TrioSocketMixin[str], abc.UNIXDatagramSocket):
- def __init__(self, trio_socket: TrioSocketType) -> None:
- super().__init__(trio_socket)
- self._receive_guard = ResourceGuard("reading from")
- self._send_guard = ResourceGuard("writing to")
- async def receive(self) -> UNIXDatagramPacketType:
- with self._receive_guard:
- try:
- data, addr = await self._trio_socket.recvfrom(65536)
- return data, addr
- except BaseException as exc:
- self._convert_socket_error(exc)
- async def send(self, item: UNIXDatagramPacketType) -> None:
- with self._send_guard:
- try:
- await self._trio_socket.sendto(*item)
- except BaseException as exc:
- self._convert_socket_error(exc)
- class ConnectedUNIXDatagramSocket(
- _TrioSocketMixin[str], abc.ConnectedUNIXDatagramSocket
- ):
- def __init__(self, trio_socket: TrioSocketType) -> None:
- super().__init__(trio_socket)
- self._receive_guard = ResourceGuard("reading from")
- self._send_guard = ResourceGuard("writing to")
- async def receive(self) -> bytes:
- with self._receive_guard:
- try:
- return await self._trio_socket.recv(65536)
- except BaseException as exc:
- self._convert_socket_error(exc)
- async def send(self, item: bytes) -> None:
- with self._send_guard:
- try:
- await self._trio_socket.send(item)
- except BaseException as exc:
- self._convert_socket_error(exc)
- #
- # Synchronization
- #
- class Event(BaseEvent):
- def __new__(cls) -> Event:
- return object.__new__(cls)
- def __init__(self) -> None:
- self.__original = trio.Event()
- def is_set(self) -> bool:
- return self.__original.is_set()
- async def wait(self) -> None:
- return await self.__original.wait()
- def statistics(self) -> EventStatistics:
- orig_statistics = self.__original.statistics()
- return EventStatistics(tasks_waiting=orig_statistics.tasks_waiting)
- def set(self) -> None:
- self.__original.set()
- class Lock(BaseLock):
- def __new__(cls, *, fast_acquire: bool = False) -> Lock:
- return object.__new__(cls)
- def __init__(self, *, fast_acquire: bool = False) -> None:
- self._fast_acquire = fast_acquire
- self.__original = trio.Lock()
- @staticmethod
- def _convert_runtime_error_msg(exc: RuntimeError) -> None:
- if exc.args == ("attempt to re-acquire an already held Lock",):
- exc.args = ("Attempted to acquire an already held Lock",)
- async def acquire(self) -> None:
- if not self._fast_acquire:
- try:
- await self.__original.acquire()
- except RuntimeError as exc:
- self._convert_runtime_error_msg(exc)
- raise
- return
- # This is the "fast path" where we don't let other tasks run
- await trio.lowlevel.checkpoint_if_cancelled()
- try:
- self.__original.acquire_nowait()
- except trio.WouldBlock:
- await self.__original._lot.park()
- except RuntimeError as exc:
- self._convert_runtime_error_msg(exc)
- raise
- def acquire_nowait(self) -> None:
- try:
- self.__original.acquire_nowait()
- except trio.WouldBlock:
- raise WouldBlock from None
- except RuntimeError as exc:
- self._convert_runtime_error_msg(exc)
- raise
- def locked(self) -> bool:
- return self.__original.locked()
- def release(self) -> None:
- self.__original.release()
- def statistics(self) -> LockStatistics:
- orig_statistics = self.__original.statistics()
- owner = TrioTaskInfo(orig_statistics.owner) if orig_statistics.owner else None
- return LockStatistics(
- orig_statistics.locked, owner, orig_statistics.tasks_waiting
- )
- class Semaphore(BaseSemaphore):
- def __new__(
- cls,
- initial_value: int,
- *,
- max_value: int | None = None,
- fast_acquire: bool = False,
- ) -> Semaphore:
- return object.__new__(cls)
- def __init__(
- self,
- initial_value: int,
- *,
- max_value: int | None = None,
- fast_acquire: bool = False,
- ) -> None:
- super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
- self.__original = trio.Semaphore(initial_value, max_value=max_value)
- async def acquire(self) -> None:
- if not self._fast_acquire:
- await self.__original.acquire()
- return
- # This is the "fast path" where we don't let other tasks run
- await trio.lowlevel.checkpoint_if_cancelled()
- try:
- self.__original.acquire_nowait()
- except trio.WouldBlock:
- await self.__original._lot.park()
- def acquire_nowait(self) -> None:
- try:
- self.__original.acquire_nowait()
- except trio.WouldBlock:
- raise WouldBlock from None
- @property
- def max_value(self) -> int | None:
- return self.__original.max_value
- @property
- def value(self) -> int:
- return self.__original.value
- def release(self) -> None:
- self.__original.release()
- def statistics(self) -> SemaphoreStatistics:
- orig_statistics = self.__original.statistics()
- return SemaphoreStatistics(orig_statistics.tasks_waiting)
- class CapacityLimiter(BaseCapacityLimiter):
- def __new__(
- cls,
- total_tokens: float | None = None,
- *,
- original: trio.CapacityLimiter | None = None,
- ) -> CapacityLimiter:
- return object.__new__(cls)
- def __init__(
- self,
- total_tokens: float | None = None,
- *,
- original: trio.CapacityLimiter | None = None,
- ) -> None:
- if original is not None:
- self.__original = original
- else:
- assert total_tokens is not None
- self.__original = trio.CapacityLimiter(total_tokens)
- async def __aenter__(self) -> None:
- return await self.__original.__aenter__()
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> None:
- await self.__original.__aexit__(exc_type, exc_val, exc_tb)
- @property
- def total_tokens(self) -> float:
- return self.__original.total_tokens
- @total_tokens.setter
- def total_tokens(self, value: float) -> None:
- self.__original.total_tokens = value
- @property
- def borrowed_tokens(self) -> int:
- return self.__original.borrowed_tokens
- @property
- def available_tokens(self) -> float:
- return self.__original.available_tokens
- def acquire_nowait(self) -> None:
- self.__original.acquire_nowait()
- def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
- self.__original.acquire_on_behalf_of_nowait(borrower)
- async def acquire(self) -> None:
- await self.__original.acquire()
- async def acquire_on_behalf_of(self, borrower: object) -> None:
- await self.__original.acquire_on_behalf_of(borrower)
- def release(self) -> None:
- return self.__original.release()
- def release_on_behalf_of(self, borrower: object) -> None:
- return self.__original.release_on_behalf_of(borrower)
- def statistics(self) -> CapacityLimiterStatistics:
- orig = self.__original.statistics()
- return CapacityLimiterStatistics(
- borrowed_tokens=orig.borrowed_tokens,
- total_tokens=orig.total_tokens,
- borrowers=tuple(orig.borrowers),
- tasks_waiting=orig.tasks_waiting,
- )
- _capacity_limiter_wrapper: trio.lowlevel.RunVar = RunVar("_capacity_limiter_wrapper")
- #
- # Signal handling
- #
- class _SignalReceiver:
- _iterator: AsyncIterator[int]
- def __init__(self, signals: tuple[Signals, ...]):
- self._signals = signals
- def __enter__(self) -> _SignalReceiver:
- self._cm = trio.open_signal_receiver(*self._signals)
- self._iterator = self._cm.__enter__()
- return self
- def __exit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> bool | None:
- return self._cm.__exit__(exc_type, exc_val, exc_tb)
- def __aiter__(self) -> _SignalReceiver:
- return self
- async def __anext__(self) -> Signals:
- signum = await self._iterator.__anext__()
- return Signals(signum)
- #
- # Testing and debugging
- #
- class TestRunner(abc.TestRunner):
- def __init__(self, **options: Any) -> None:
- from queue import Queue
- self._call_queue: Queue[Callable[[], object]] = Queue()
- self._send_stream: MemoryObjectSendStream | None = None
- self._options = options
- def __exit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: types.TracebackType | None,
- ) -> None:
- if self._send_stream:
- self._send_stream.close()
- while self._send_stream is not None:
- self._call_queue.get()()
- async def _run_tests_and_fixtures(self) -> None:
- self._send_stream, receive_stream = create_memory_object_stream(1)
- with receive_stream:
- async for coro, outcome_holder in receive_stream:
- try:
- retval = await coro
- except BaseException as exc:
- outcome_holder.append(Error(exc))
- else:
- outcome_holder.append(Value(retval))
- def _main_task_finished(self, outcome: object) -> None:
- self._send_stream = None
- def _call_in_runner_task(
- self,
- func: Callable[P, Awaitable[T_Retval]],
- *args: P.args,
- **kwargs: P.kwargs,
- ) -> T_Retval:
- if self._send_stream is None:
- trio.lowlevel.start_guest_run(
- self._run_tests_and_fixtures,
- run_sync_soon_threadsafe=self._call_queue.put,
- done_callback=self._main_task_finished,
- **self._options,
- )
- while self._send_stream is None:
- self._call_queue.get()()
- outcome_holder: list[Outcome] = []
- self._send_stream.send_nowait((func(*args, **kwargs), outcome_holder))
- while not outcome_holder:
- self._call_queue.get()()
- return outcome_holder[0].unwrap()
- def run_asyncgen_fixture(
- self,
- fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
- kwargs: dict[str, Any],
- ) -> Iterable[T_Retval]:
- asyncgen = fixture_func(**kwargs)
- fixturevalue: T_Retval = self._call_in_runner_task(asyncgen.asend, None)
- yield fixturevalue
- try:
- self._call_in_runner_task(asyncgen.asend, None)
- except StopAsyncIteration:
- pass
- else:
- self._call_in_runner_task(asyncgen.aclose)
- raise RuntimeError("Async generator fixture did not stop")
- def run_fixture(
- self,
- fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
- kwargs: dict[str, Any],
- ) -> T_Retval:
- return self._call_in_runner_task(fixture_func, **kwargs)
- def run_test(
- self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
- ) -> None:
- self._call_in_runner_task(test_func, **kwargs)
- class TrioTaskInfo(TaskInfo):
- def __init__(self, task: trio.lowlevel.Task):
- parent_id = None
- if task.parent_nursery and task.parent_nursery.parent_task:
- parent_id = id(task.parent_nursery.parent_task)
- super().__init__(id(task), parent_id, task.name, task.coro)
- self._task = weakref.proxy(task)
- def has_pending_cancellation(self) -> bool:
- try:
- return self._task._cancel_status.effectively_cancelled
- except ReferenceError:
- # If the task is no longer around, it surely doesn't have a cancellation
- # pending
- return False
- class TrioBackend(AsyncBackend):
- @classmethod
- def run(
- cls,
- func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
- args: tuple[Unpack[PosArgsT]],
- kwargs: dict[str, Any],
- options: dict[str, Any],
- ) -> T_Retval:
- return trio.run(func, *args)
- @classmethod
- def current_token(cls) -> object:
- return trio.lowlevel.current_trio_token()
- @classmethod
- def current_time(cls) -> float:
- return trio.current_time()
- @classmethod
- def cancelled_exception_class(cls) -> type[BaseException]:
- return trio.Cancelled
- @classmethod
- async def checkpoint(cls) -> None:
- await trio.lowlevel.checkpoint()
- @classmethod
- async def checkpoint_if_cancelled(cls) -> None:
- await trio.lowlevel.checkpoint_if_cancelled()
- @classmethod
- async def cancel_shielded_checkpoint(cls) -> None:
- await trio.lowlevel.cancel_shielded_checkpoint()
- @classmethod
- async def sleep(cls, delay: float) -> None:
- await trio.sleep(delay)
- @classmethod
- def create_cancel_scope(
- cls, *, deadline: float = math.inf, shield: bool = False
- ) -> abc.CancelScope:
- return CancelScope(deadline=deadline, shield=shield)
- @classmethod
- def current_effective_deadline(cls) -> float:
- return trio.current_effective_deadline()
- @classmethod
- def create_task_group(cls) -> abc.TaskGroup:
- return TaskGroup()
- @classmethod
- def create_event(cls) -> abc.Event:
- return Event()
- @classmethod
- def create_lock(cls, *, fast_acquire: bool) -> Lock:
- return Lock(fast_acquire=fast_acquire)
- @classmethod
- def create_semaphore(
- cls,
- initial_value: int,
- *,
- max_value: int | None = None,
- fast_acquire: bool = False,
- ) -> abc.Semaphore:
- return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)
- @classmethod
- def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
- return CapacityLimiter(total_tokens)
- @classmethod
- async def run_sync_in_worker_thread(
- cls,
- func: Callable[[Unpack[PosArgsT]], T_Retval],
- args: tuple[Unpack[PosArgsT]],
- abandon_on_cancel: bool = False,
- limiter: abc.CapacityLimiter | None = None,
- ) -> T_Retval:
- def wrapper() -> T_Retval:
- with claim_worker_thread(TrioBackend, token):
- return func(*args)
- token = TrioBackend.current_token()
- return await run_sync(
- wrapper,
- abandon_on_cancel=abandon_on_cancel,
- limiter=cast(trio.CapacityLimiter, limiter),
- )
- @classmethod
- def check_cancelled(cls) -> None:
- trio.from_thread.check_cancelled()
- @classmethod
- def run_async_from_thread(
- cls,
- func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
- args: tuple[Unpack[PosArgsT]],
- token: object,
- ) -> T_Retval:
- return trio.from_thread.run(func, *args)
- @classmethod
- def run_sync_from_thread(
- cls,
- func: Callable[[Unpack[PosArgsT]], T_Retval],
- args: tuple[Unpack[PosArgsT]],
- token: object,
- ) -> T_Retval:
- return trio.from_thread.run_sync(func, *args)
- @classmethod
- def create_blocking_portal(cls) -> abc.BlockingPortal:
- return BlockingPortal()
- @classmethod
- async def open_process(
- cls,
- command: StrOrBytesPath | Sequence[StrOrBytesPath],
- *,
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- **kwargs: Any,
- ) -> Process:
- def convert_item(item: StrOrBytesPath) -> str:
- str_or_bytes = os.fspath(item)
- if isinstance(str_or_bytes, str):
- return str_or_bytes
- else:
- return os.fsdecode(str_or_bytes)
- if isinstance(command, (str, bytes, PathLike)):
- process = await trio.lowlevel.open_process(
- convert_item(command),
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- shell=True,
- **kwargs,
- )
- else:
- process = await trio.lowlevel.open_process(
- [convert_item(item) for item in command],
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- shell=False,
- **kwargs,
- )
- stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None
- stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None
- stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None
- return Process(process, stdin_stream, stdout_stream, stderr_stream)
- @classmethod
- def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
- trio.lowlevel.spawn_system_task(_shutdown_process_pool, workers)
- @classmethod
- async def connect_tcp(
- cls, host: str, port: int, local_address: IPSockAddrType | None = None
- ) -> SocketStream:
- family = socket.AF_INET6 if ":" in host else socket.AF_INET
- trio_socket = trio.socket.socket(family)
- trio_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- if local_address:
- await trio_socket.bind(local_address)
- try:
- await trio_socket.connect((host, port))
- except BaseException:
- trio_socket.close()
- raise
- return SocketStream(trio_socket)
- @classmethod
- async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
- trio_socket = trio.socket.socket(socket.AF_UNIX)
- try:
- await trio_socket.connect(path)
- except BaseException:
- trio_socket.close()
- raise
- return UNIXSocketStream(trio_socket)
- @classmethod
- def create_tcp_listener(cls, sock: socket.socket) -> abc.SocketListener:
- return TCPSocketListener(sock)
- @classmethod
- def create_unix_listener(cls, sock: socket.socket) -> abc.SocketListener:
- return UNIXSocketListener(sock)
- @classmethod
- async def create_udp_socket(
- cls,
- family: socket.AddressFamily,
- local_address: IPSockAddrType | None,
- remote_address: IPSockAddrType | None,
- reuse_port: bool,
- ) -> UDPSocket | ConnectedUDPSocket:
- trio_socket = trio.socket.socket(family=family, type=socket.SOCK_DGRAM)
- if reuse_port:
- trio_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- if local_address:
- await trio_socket.bind(local_address)
- if remote_address:
- await trio_socket.connect(remote_address)
- return ConnectedUDPSocket(trio_socket)
- else:
- return UDPSocket(trio_socket)
- @classmethod
- @overload
- async def create_unix_datagram_socket(
- cls, raw_socket: socket.socket, remote_path: None
- ) -> abc.UNIXDatagramSocket: ...
- @classmethod
- @overload
- async def create_unix_datagram_socket(
- cls, raw_socket: socket.socket, remote_path: str | bytes
- ) -> abc.ConnectedUNIXDatagramSocket: ...
- @classmethod
- async def create_unix_datagram_socket(
- cls, raw_socket: socket.socket, remote_path: str | bytes | None
- ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
- trio_socket = trio.socket.from_stdlib_socket(raw_socket)
- if remote_path:
- await trio_socket.connect(remote_path)
- return ConnectedUNIXDatagramSocket(trio_socket)
- else:
- return UNIXDatagramSocket(trio_socket)
- @classmethod
- async def getaddrinfo(
- cls,
- host: bytes | str | None,
- port: str | int | None,
- *,
- family: int | AddressFamily = 0,
- type: int | SocketKind = 0,
- proto: int = 0,
- flags: int = 0,
- ) -> Sequence[
- tuple[
- AddressFamily,
- SocketKind,
- int,
- str,
- tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
- ]
- ]:
- return await trio.socket.getaddrinfo(host, port, family, type, proto, flags)
- @classmethod
- async def getnameinfo(
- cls, sockaddr: IPSockAddrType, flags: int = 0
- ) -> tuple[str, str]:
- return await trio.socket.getnameinfo(sockaddr, flags)
- @classmethod
- async def wait_readable(cls, obj: HasFileno | int) -> None:
- try:
- await wait_readable(obj)
- except trio.ClosedResourceError as exc:
- raise ClosedResourceError().with_traceback(exc.__traceback__) from None
- except trio.BusyResourceError:
- raise BusyResourceError("reading from") from None
- @classmethod
- async def wait_writable(cls, obj: HasFileno | int) -> None:
- try:
- await wait_writable(obj)
- except trio.ClosedResourceError as exc:
- raise ClosedResourceError().with_traceback(exc.__traceback__) from None
- except trio.BusyResourceError:
- raise BusyResourceError("writing to") from None
- @classmethod
- def current_default_thread_limiter(cls) -> CapacityLimiter:
- try:
- return _capacity_limiter_wrapper.get()
- except LookupError:
- limiter = CapacityLimiter(
- original=trio.to_thread.current_default_thread_limiter()
- )
- _capacity_limiter_wrapper.set(limiter)
- return limiter
- @classmethod
- def open_signal_receiver(
- cls, *signals: Signals
- ) -> AbstractContextManager[AsyncIterator[Signals]]:
- return _SignalReceiver(signals)
- @classmethod
- def get_current_task(cls) -> TaskInfo:
- task = current_task()
- return TrioTaskInfo(task)
- @classmethod
- def get_running_tasks(cls) -> Sequence[TaskInfo]:
- root_task = current_root_task()
- assert root_task
- task_infos = [TrioTaskInfo(root_task)]
- nurseries = root_task.child_nurseries
- while nurseries:
- new_nurseries: list[trio.Nursery] = []
- for nursery in nurseries:
- for task in nursery.child_tasks:
- task_infos.append(TrioTaskInfo(task))
- new_nurseries.extend(task.child_nurseries)
- nurseries = new_nurseries
- return task_infos
- @classmethod
- async def wait_all_tasks_blocked(cls) -> None:
- from trio.testing import wait_all_tasks_blocked
- await wait_all_tasks_blocked()
- @classmethod
- def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
- return TestRunner(**options)
- backend_class = TrioBackend
|