concurrency.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. # Copyright (c) "Neo4j"
  2. # Neo4j Sweden AB [https://neo4j.com]
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # https://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from __future__ import annotations
  16. import asyncio
  17. import collections
  18. import re
  19. import threading
  20. import typing as t
  21. if t.TYPE_CHECKING:
  22. import typing_extensions as te
  23. from .shims import wait_for
# Names exported by this module: the asyncio primitives (``Async*``) and
# their identically shaped threaded counterparts (un-prefixed names).
__all__ = [
    "AsyncCondition",
    "AsyncCooperativeLock",
    "AsyncCooperativeRLock",
    "AsyncLock",
    "AsyncRLock",
    "Condition",
    "CooperativeLock",
    "CooperativeRLock",
    "Lock",
    "RLock",
]


# The plain (non-reentrant) async lock is asyncio's own lock, re-exported
# under the naming scheme used by this module.
AsyncLock = asyncio.Lock
  37. class AsyncRLock(asyncio.Lock):
  38. """
  39. Reentrant asyncio.lock.
  40. Inspired by Python's RLock implementation.
  41. .. warning::
  42. In async Python there are no threads. This implementation uses
  43. :meth:`asyncio.current_task` to determine the owner of the lock. This
  44. means that the owner changes when using :meth:`asyncio.wait_for` or
  45. any other method that wraps the work in a new :class:`asyncio.Task`.
  46. """
  47. _WAITERS_RE = re.compile(r"(?:\W|^)waiters[:=](\d+)(?:\W|$)")
  48. def __init__(self, *args, **kwargs):
  49. super().__init__(*args, **kwargs)
  50. self._owner = None
  51. self._count = 0
  52. def __repr__(self):
  53. res = object.__repr__(self)
  54. lock_repr = super().__repr__()
  55. extra = "locked" if self._count > 0 else "unlocked"
  56. extra += f" count={self._count}"
  57. waiters_match = self._WAITERS_RE.search(lock_repr)
  58. if waiters_match:
  59. extra += f" waiters={waiters_match.group(1)}"
  60. if self._owner:
  61. extra += f" owner={self._owner}"
  62. return f"<{res[1:-1]} [{extra}]>"
  63. def is_owner(self, task=None):
  64. if task is None:
  65. task = asyncio.current_task()
  66. return self._owner == task
  67. async def _acquire_non_blocking(self, me):
  68. if self.is_owner(task=me):
  69. self._count += 1
  70. return True
  71. acquire_coro = super().acquire()
  72. task = asyncio.ensure_future(acquire_coro)
  73. # yielding one cycle is as close to non-blocking as it gets
  74. # (at least without implementing the lock from the ground up)
  75. try:
  76. await asyncio.sleep(0)
  77. except asyncio.CancelledError:
  78. # This is emulating non-blocking. There is no cancelling this!
  79. # Still, we don't want to silently swallow the cancellation.
  80. # Hence, we flag this task as cancelled again, so that the next
  81. # `await` will raise the CancelledError.
  82. asyncio.current_task().cancel()
  83. if task.done():
  84. exception = task.exception()
  85. if exception is None:
  86. self._owner = me
  87. self._count = 1
  88. return True
  89. else:
  90. raise exception
  91. task.cancel()
  92. return False
  93. async def _acquire(self, me):
  94. if self.is_owner(task=me):
  95. self._count += 1
  96. return
  97. await super().acquire()
  98. self._owner = me
  99. self._count = 1
  100. async def acquire(self, blocking=True, timeout=-1):
  101. """Acquire the lock."""
  102. me = asyncio.current_task()
  103. if timeout < 0 and timeout != -1:
  104. raise ValueError("timeout value must be positive")
  105. if not blocking and timeout != -1:
  106. raise ValueError("can't specify a timeout for a non-blocking call")
  107. if not blocking:
  108. return await self._acquire_non_blocking(me)
  109. if blocking and timeout == -1:
  110. await self._acquire(me)
  111. return True
  112. try:
  113. fut = asyncio.ensure_future(self._acquire(me))
  114. try:
  115. await wait_for(fut, timeout)
  116. except asyncio.CancelledError:
  117. if fut.cancelled():
  118. raise
  119. already_finished = not fut.cancel()
  120. if already_finished:
  121. # Too late to cancel the acquisition.
  122. # This can only happen in Python 3.7's asyncio
  123. # as well as in our wait_for shim.
  124. self._release(me)
  125. raise
  126. return True
  127. except asyncio.TimeoutError:
  128. return False
  129. __aenter__ = acquire
  130. def _release(self, me):
  131. if not self.is_owner(task=me):
  132. if self._owner is None:
  133. raise RuntimeError("Cannot release un-acquired lock.")
  134. raise RuntimeError("Cannot release foreign lock.")
  135. self._count -= 1
  136. if not self._count:
  137. self._owner = None
  138. super().release()
  139. def release(self):
  140. """Release the lock."""
  141. me = asyncio.current_task()
  142. return self._release(me)
  143. async def __aexit__(self, t, v, tb):
  144. self.release()
  145. class AsyncCooperativeLock:
  146. """
  147. Lock placeholder for asyncio Python when working fully cooperatively.
  148. This lock doesn't do anything in async Python. Its threaded counterpart,
  149. however, is an ordinary :class:`threading.Lock`.
  150. The AsyncCooperativeLock only works if there is no await being used
  151. while the lock is held.
  152. """
  153. def __init__(self):
  154. self._locked = False
  155. def __repr__(self):
  156. res = super().__repr__()
  157. extra = "locked" if self._locked else "unlocked"
  158. return f"<{res[1:-1]} [{extra}]>"
  159. def locked(self):
  160. """Return True if lock is acquired."""
  161. return self._locked
  162. def acquire(self):
  163. """
  164. Acquire a lock.
  165. This method will raise a RuntimeError where an ordinary
  166. (non-placeholder) lock would need to block. I.e., when the lock is
  167. already taken.
  168. Returns True if the lock was successfully acquired.
  169. """
  170. if self._locked:
  171. raise RuntimeError("Cannot acquire a locked cooperative lock.")
  172. self._locked = True
  173. return True
  174. def release(self):
  175. """
  176. Release a lock.
  177. When the lock is locked, reset it to unlocked, and return.
  178. When invoked on an unlocked lock, a RuntimeError is raised.
  179. There is no return value.
  180. """
  181. if self._locked:
  182. self._locked = False
  183. else:
  184. raise RuntimeError("Lock is not acquired.")
  185. __enter__ = acquire
  186. def __exit__(self, t, v, tb):
  187. self.release()
  188. async def __aenter__(self):
  189. return self.__enter__()
  190. async def __aexit__(self, t, v, tb):
  191. self.__exit__(t, v, tb)
  192. class AsyncCooperativeRLock:
  193. """
  194. Reentrant lock placeholder for cooperative asyncio Python.
  195. This lock doesn't do anything in async Python. It's threaded counterpart,
  196. however, is an ordinary :class:`threading.Lock`.
  197. The AsyncCooperativeLock only works if there is no await being used
  198. while the lock is acquired.
  199. """
  200. def __init__(self):
  201. self._owner = None
  202. self._count = 0
  203. def __repr__(self):
  204. res = super().__repr__()
  205. if self._owner is not None:
  206. extra = f"locked {self._count} times by owner:{self._owner}"
  207. else:
  208. extra = "unlocked"
  209. return f"<{res[1:-1]} [{extra}]>"
  210. def locked(self):
  211. """Return True if lock is acquired."""
  212. return self._owner is not None
  213. def acquire(self):
  214. """
  215. Acquire a lock.
  216. This method will raise a RuntimeError where an ordinary
  217. (non-placeholder) lock would need to block. I.e., when the lock is
  218. already taken by another Task.
  219. Returns True if the lock was successfully acquired.
  220. """
  221. me = asyncio.current_task()
  222. if self._owner is None:
  223. self._owner = me
  224. self._count = 1
  225. return True
  226. if self._owner is me:
  227. self._count += 1
  228. return True
  229. raise RuntimeError("Cannot acquire a foreign locked cooperative lock.")
  230. def release(self):
  231. """
  232. Release a lock.
  233. When the lock is locked, reset it to unlocked, and return.
  234. When invoked on an unlocked or foreign lock, a RuntimeError is raised.
  235. There is no return value.
  236. """
  237. me = asyncio.current_task()
  238. if self._owner is None:
  239. raise RuntimeError("Lock is not acquired.")
  240. if self._owner is not me:
  241. raise RuntimeError("Cannot release a foreign lock.")
  242. self._count -= 1
  243. if not self._count:
  244. self._owner = None
  245. __enter__ = acquire
  246. def __exit__(self, t, v, tb):
  247. self.release()
class AsyncCondition:
    """
    Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    Unless a lock is passed to the constructor, a new AsyncLock object is
    created and used as the underlying lock.
    """

    # copied and modified from Python 3.11's asyncio package
    # to add support for `.wait(timeout)` and cooperative locks
    # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
    # 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022
    # Python Software Foundation;
    # All Rights Reserved

    def __init__(self, lock=None):
        """
        Create a condition on top of `lock`.

        :param lock: the underlying lock; a fresh AsyncLock when omitted.
        """
        if lock is None:
            lock = AsyncLock()

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of the coroutines currently waiting, oldest first.
        self._waiters = collections.deque()

    # Event loop this condition is bound to (set on first use per instance).
    _loop = None
    # Class-level lock, shared by all instances, guarding the binding.
    _loop_lock = threading.Lock()

    def _get_loop(self):
        """
        Return the running event loop, binding this condition to it.

        The first loop seen is remembered in `self._loop`; if a later call
        finds a different running loop (or none), a RuntimeError is raised.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if self._loop is None:
            with self._loop_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
            raise RuntimeError(f"{self!r} is bound to a different event loop")
        return loop

    async def __aenter__(self):
        # Cooperative locks have a synchronous acquire(); all other locks
        # are awaited.
        if isinstance(
            self._lock, (AsyncCooperativeLock, AsyncCooperativeRLock)
        ):
            self._lock.acquire()
        else:
            await self.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

    def __repr__(self):
        res = super().__repr__()
        extra = "locked" if self.locked() else "unlocked"
        if self._waiters:
            extra = f"{extra}, waiters:{len(self._waiters)}"
        return f"<{res[1:-1]} [{extra}]>"

    async def _wait(self, timeout=None, me=None):
        """
        Wait until notified.

        If the underlying lock is not locked when this method is called,
        a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine. Once
        awakened, it re-acquires the lock and returns True.

        :param timeout: passed on to `wait_for`; if waiting ends with an
            :class:`asyncio.TimeoutError`, the lock is re-acquired and
            False is returned instead.
        :param me: the task on whose behalf an AsyncRLock is released and
            re-acquired (ignored for other lock types).
        """
        if not self.locked():
            raise RuntimeError("cannot wait on un-acquired lock")

        cancelled = False
        # NOTE(review): for an AsyncRLock this undoes a single acquisition
        # only. If `me` holds the lock re-entrantly, the lock stays held
        # while waiting -- confirm callers never wait in that state.
        if isinstance(self._lock, AsyncRLock):
            self._lock._release(me)
        else:
            self._lock.release()
        try:
            fut = self._get_loop().create_future()
            self._waiters.append(fut)
            try:
                await wait_for(fut, timeout)
                return True
            except asyncio.TimeoutError:
                return False
            except asyncio.CancelledError:
                # Remember the cancellation; it is re-raised only after the
                # lock has been re-acquired below.
                cancelled = True
                raise

            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire lock even if wait is cancelled
            if isinstance(
                self._lock, (AsyncCooperativeLock, AsyncCooperativeRLock)
            ):
                self._lock.acquire()
            else:
                # Keep retrying through cancellations: returning without
                # holding the lock is not an option.
                while True:
                    try:
                        if isinstance(self._lock, AsyncRLock):
                            await self._lock._acquire(me)
                        else:
                            await self._lock.acquire()
                        break
                    except asyncio.CancelledError:
                        cancelled = True

            if cancelled:
                raise asyncio.CancelledError

    async def wait(self, timeout=None):
        """
        Wait until notified or until `timeout` is exceeded.

        Calls :meth:`_wait` on behalf of the current task and returns its
        result.
        """
        me = asyncio.current_task()
        return await self._wait(timeout=timeout, me=me)

    async def wait_for(self, predicate):
        """
        Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value. The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            await self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """
        Wake up coroutines waiting on this condition.

        By default, wake up one coroutine waiting on this condition, if any.
        If the underlying lock is not locked when this method is called,
        a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock. Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError("cannot notify on un-acquired lock")

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Futures that are already done do not count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """
        Wake up all coroutines waiting on this condition.

        This method acts like notify(), but wakes up all waiting coroutines
        instead of one. If the underlying lock is not locked when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
# Threaded counterparts of the async primitives defined in this module:
# plain aliases of the standard library's threading classes.
# `te` is only imported under `t.TYPE_CHECKING`; the annotations are still
# safe at runtime because `from __future__ import annotations` keeps them
# from being evaluated.
Condition: te.TypeAlias = threading.Condition
CooperativeLock: te.TypeAlias = threading.Lock
Lock: te.TypeAlias = threading.Lock
CooperativeRLock: te.TypeAlias = threading.RLock
RLock: te.TypeAlias = threading.RLock