# to_interpreter.py

from __future__ import annotations

import atexit
import os
import pickle
import sys
from collections import deque
from collections.abc import Callable
from textwrap import dedent
from typing import Any, Final, TypeVar

from . import current_time, to_thread
from ._core._exceptions import BrokenWorkerIntepreter
from ._core._synchronization import CapacityLimiter
from .lowlevel import RunVar

if sys.version_info >= (3, 11):
    from typing import TypeVarTuple, Unpack
else:
    from typing_extensions import TypeVarTuple, Unpack

UNBOUND: Final = 2  # I have no clue how this works, but it was used in the stdlib
FMT_UNPICKLED: Final = 0
FMT_PICKLED: Final = 1
DEFAULT_CPU_COUNT: Final = 8  # this is just an arbitrarily selected value
MAX_WORKER_IDLE_TIME = (
    30  # seconds a subinterpreter can be idle before becoming eligible for pruning
)

T_Retval = TypeVar("T_Retval")
PosArgsT = TypeVarTuple("PosArgsT")

_idle_workers = RunVar[deque["Worker"]]("_available_workers")
_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")


class Worker:
    _run_func = compile(
        dedent("""
        import _interpqueues as queues
        import _interpreters as interpreters
        from pickle import loads, dumps, HIGHEST_PROTOCOL

        item = queues.get(queue_id)[0]
        try:
            func, args = loads(item)
            retval = func(*args)
        except BaseException as exc:
            is_exception = True
            retval = exc
        else:
            is_exception = False

        try:
            queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND)
        except interpreters.NotShareableError:
            retval = dumps(retval, HIGHEST_PROTOCOL)
            queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND)
        """),
        "<string>",
        "exec",
    )
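
    # The compiled script above is the worker-side half of the protocol: run
    # inside the subinterpreter, it pops one pickled (func, args) item off the
    # shared queue, invokes func(*args), and puts back a (retval, is_exception)
    # pair, sending the result unpickled when it is directly shareable between
    # interpreters and pickled otherwise.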

    last_used: float = 0
    _initialized: bool = False
    _interpreter_id: int
    _queue_id: int

    def initialize(self) -> None:
        import _interpqueues as queues
        import _interpreters as interpreters

        self._interpreter_id = interpreters.create()
        self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND)
        self._initialized = True
        interpreters.set___main___attrs(
            self._interpreter_id,
            {
                "queue_id": self._queue_id,
                "FMT_PICKLED": FMT_PICKLED,
                "FMT_UNPICKLED": FMT_UNPICKLED,
                "UNBOUND": UNBOUND,
            },
        )

    def destroy(self) -> None:
        import _interpqueues as queues
        import _interpreters as interpreters

        if self._initialized:
            interpreters.destroy(self._interpreter_id)
            queues.destroy(self._queue_id)

    def _call(
        self,
        func: Callable[..., T_Retval],
        args: tuple[Any, ...],
    ) -> tuple[Any, bool]:
        import _interpqueues as queues
        import _interpreters as interpreters

        if not self._initialized:
            self.initialize()

        payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
        queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND)

        res: Any
        is_exception: bool
        if exc_info := interpreters.exec(self._interpreter_id, self._run_func):
            raise BrokenWorkerIntepreter(exc_info)

        (res, is_exception), fmt = queues.get(self._queue_id)[:2]
        if fmt == FMT_PICKLED:
            res = pickle.loads(res)

        return res, is_exception

    async def call(
        self,
        func: Callable[..., T_Retval],
        args: tuple[Any, ...],
        limiter: CapacityLimiter,
    ) -> T_Retval:
        result, is_exception = await to_thread.run_sync(
            self._call,
            func,
            args,
            limiter=limiter,
        )
        if is_exception:
            raise result

        return result
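
    # Note: the blocking _call() above is offloaded via to_thread.run_sync() so
    # the event loop is never blocked while the subinterpreter runs the target
    # function; the limiter caps how many such worker threads (and therefore
    # subinterpreters) may run at once.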


def _stop_workers(workers: deque[Worker]) -> None:
    for worker in workers:
        worker.destroy()

    workers.clear()


async def run_sync(
    func: Callable[[Unpack[PosArgsT]], T_Retval],
    *args: Unpack[PosArgsT],
    limiter: CapacityLimiter | None = None,
) -> T_Retval:
    """
    Call the given function with the given arguments in a subinterpreter.

    If the task waiting for its completion is cancelled, the call will still run its
    course, but its return value (or any raised exception) will be ignored.

    .. warning:: This feature is **experimental**. The upstream interpreter API has not
        yet been finalized or thoroughly tested, so don't rely on this for anything
        mission critical.

    :param func: a callable
    :param args: positional arguments for the callable
    :param limiter: capacity limiter to use to limit the total number of
        subinterpreters running (if omitted, the default limiter is used)
    :return: the result of the call
    :raises BrokenWorkerIntepreter: if there's an internal error in a subinterpreter

    """
    if sys.version_info < (3, 13):
        raise RuntimeError("subinterpreters require at least Python 3.13")

    if limiter is None:
        limiter = current_default_interpreter_limiter()

    try:
        idle_workers = _idle_workers.get()
    except LookupError:
        idle_workers = deque()
        _idle_workers.set(idle_workers)
        atexit.register(_stop_workers, idle_workers)

    async with limiter:
        try:
            worker = idle_workers.pop()
        except IndexError:
            worker = Worker()

        try:
            return await worker.call(func, args, limiter)
        finally:
            # Prune workers that have been idle for too long
            now = current_time()
            while idle_workers:
                if now - idle_workers[0].last_used <= MAX_WORKER_IDLE_TIME:
                    break

                await to_thread.run_sync(
                    idle_workers.popleft().destroy, limiter=limiter
                )

            worker.last_used = current_time()
            idle_workers.append(worker)
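

# Minimal usage sketch (illustrative, not part of this module; assumes Python
# 3.13+ and the public anyio API, where this module is exposed as
# anyio.to_interpreter):
#
#     import math
#
#     import anyio
#     from anyio import to_interpreter
#
#     async def main() -> None:
#         # The callable is pickled by reference, so it must be importable in
#         # the subinterpreter; math.factorial qualifies.
#         print(await to_interpreter.run_sync(math.factorial, 10))  # 3628800
#
#     anyio.run(main)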


def current_default_interpreter_limiter() -> CapacityLimiter:
    """
    Return the capacity limiter that is used by default to limit the number of
    concurrently running subinterpreters.

    Defaults to the number of CPU cores.

    :return: a capacity limiter object

    """
    try:
        return _default_interpreter_limiter.get()
    except LookupError:
        limiter = CapacityLimiter(os.cpu_count() or DEFAULT_CPU_COUNT)
        _default_interpreter_limiter.set(limiter)
        return limiter
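

# Sketch of using an explicit limiter (illustrative; some_func and some_arg are
# hypothetical placeholders): passing a CapacityLimiter to run_sync() caps the
# number of concurrent subinterpreters for those calls without touching the
# run-wide default returned by current_default_interpreter_limiter().
#
#     from anyio import CapacityLimiter
#
#     limiter = CapacityLimiter(2)  # at most two subinterpreters at a time
#     result = await run_sync(some_func, some_arg, limiter=limiter)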