test_leaks.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. # -*- coding: utf-8 -*-
  2. """
  3. Testing scenarios that may have leaked.
  4. """
  5. from __future__ import print_function, absolute_import, division
  6. import sys
  7. import gc
  8. import time
  9. import weakref
  10. import threading
  11. import greenlet
  12. from . import TestCase
  13. from .leakcheck import fails_leakcheck
  14. from .leakcheck import ignores_leakcheck
  15. from .leakcheck import RUNNING_ON_MANYLINUX
  16. # pylint:disable=protected-access
  17. assert greenlet.GREENLET_USE_GC # Option to disable this was removed in 1.0
  18. class HasFinalizerTracksInstances(object):
  19. EXTANT_INSTANCES = set()
  20. def __init__(self, msg):
  21. self.msg = sys.intern(msg)
  22. self.EXTANT_INSTANCES.add(id(self))
  23. def __del__(self):
  24. self.EXTANT_INSTANCES.remove(id(self))
  25. def __repr__(self):
  26. return "<HasFinalizerTracksInstances at 0x%x %r>" % (
  27. id(self), self.msg
  28. )
  29. @classmethod
  30. def reset(cls):
  31. cls.EXTANT_INSTANCES.clear()
  32. class TestLeaks(TestCase):
  33. def test_arg_refs(self):
  34. args = ('a', 'b', 'c')
  35. refcount_before = sys.getrefcount(args)
  36. # pylint:disable=unnecessary-lambda
  37. g = greenlet.greenlet(
  38. lambda *args: greenlet.getcurrent().parent.switch(*args))
  39. for _ in range(100):
  40. g.switch(*args)
  41. self.assertEqual(sys.getrefcount(args), refcount_before)
  42. def test_kwarg_refs(self):
  43. kwargs = {}
  44. # pylint:disable=unnecessary-lambda
  45. g = greenlet.greenlet(
  46. lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs))
  47. for _ in range(100):
  48. g.switch(**kwargs)
  49. self.assertEqual(sys.getrefcount(kwargs), 2)
  50. @staticmethod
  51. def __recycle_threads():
  52. # By introducing a thread that does sleep we allow other threads,
  53. # that have triggered their __block condition, but did not have a
  54. # chance to deallocate their thread state yet, to finally do so.
  55. # The way it works is by requiring a GIL switch (different thread),
  56. # which does a GIL release (sleep), which might do a GIL switch
  57. # to finished threads and allow them to clean up.
  58. def worker():
  59. time.sleep(0.001)
  60. t = threading.Thread(target=worker)
  61. t.start()
  62. time.sleep(0.001)
  63. t.join(10)
  64. def test_threaded_leak(self):
  65. gg = []
  66. def worker():
  67. # only main greenlet present
  68. gg.append(weakref.ref(greenlet.getcurrent()))
  69. for _ in range(2):
  70. t = threading.Thread(target=worker)
  71. t.start()
  72. t.join(10)
  73. del t
  74. greenlet.getcurrent() # update ts_current
  75. self.__recycle_threads()
  76. greenlet.getcurrent() # update ts_current
  77. gc.collect()
  78. greenlet.getcurrent() # update ts_current
  79. for g in gg:
  80. self.assertIsNone(g())
  81. def test_threaded_adv_leak(self):
  82. gg = []
  83. def worker():
  84. # main and additional *finished* greenlets
  85. ll = greenlet.getcurrent().ll = []
  86. def additional():
  87. ll.append(greenlet.getcurrent())
  88. for _ in range(2):
  89. greenlet.greenlet(additional).switch()
  90. gg.append(weakref.ref(greenlet.getcurrent()))
  91. for _ in range(2):
  92. t = threading.Thread(target=worker)
  93. t.start()
  94. t.join(10)
  95. del t
  96. greenlet.getcurrent() # update ts_current
  97. self.__recycle_threads()
  98. greenlet.getcurrent() # update ts_current
  99. gc.collect()
  100. greenlet.getcurrent() # update ts_current
  101. for g in gg:
  102. self.assertIsNone(g())
  103. def assertClocksUsed(self):
  104. used = greenlet._greenlet.get_clocks_used_doing_optional_cleanup()
  105. self.assertGreaterEqual(used, 0)
  106. # we don't lose the value
  107. greenlet._greenlet.enable_optional_cleanup(True)
  108. used2 = greenlet._greenlet.get_clocks_used_doing_optional_cleanup()
  109. self.assertEqual(used, used2)
  110. self.assertGreater(greenlet._greenlet.CLOCKS_PER_SEC, 1)
  111. def _check_issue251(self,
  112. manually_collect_background=True,
  113. explicit_reference_to_switch=False):
  114. # See https://github.com/python-greenlet/greenlet/issues/251
  115. # Killing a greenlet (probably not the main one)
  116. # in one thread from another thread would
  117. # result in leaking a list (the ts_delkey list).
  118. # We no longer use lists to hold that stuff, though.
  119. # For the test to be valid, even empty lists have to be tracked by the
  120. # GC
  121. assert gc.is_tracked([])
  122. HasFinalizerTracksInstances.reset()
  123. greenlet.getcurrent()
  124. greenlets_before = self.count_objects(greenlet.greenlet, exact_kind=False)
  125. background_glet_running = threading.Event()
  126. background_glet_killed = threading.Event()
  127. background_greenlets = []
  128. # XXX: Switching this to a greenlet subclass that overrides
  129. # run results in all callers failing the leaktest; that
  130. # greenlet instance is leaked. There's a bound method for
  131. # run() living on the stack of the greenlet in g_initialstub,
  132. # and since we don't manually switch back to the background
  133. # greenlet to let it "fall off the end" and exit the
  134. # g_initialstub function, it never gets cleaned up. Making the
  135. # garbage collector aware of this bound method (making it an
  136. # attribute of the greenlet structure and traversing into it)
  137. # doesn't help, for some reason.
  138. def background_greenlet():
  139. # Throw control back to the main greenlet.
  140. jd = HasFinalizerTracksInstances("DELETING STACK OBJECT")
  141. greenlet._greenlet.set_thread_local(
  142. 'test_leaks_key',
  143. HasFinalizerTracksInstances("DELETING THREAD STATE"))
  144. # Explicitly keeping 'switch' in a local variable
  145. # breaks this test in all versions
  146. if explicit_reference_to_switch:
  147. s = greenlet.getcurrent().parent.switch
  148. s([jd])
  149. else:
  150. greenlet.getcurrent().parent.switch([jd])
  151. bg_main_wrefs = []
  152. def background_thread():
  153. glet = greenlet.greenlet(background_greenlet)
  154. bg_main_wrefs.append(weakref.ref(glet.parent))
  155. background_greenlets.append(glet)
  156. glet.switch() # Be sure it's active.
  157. # Control is ours again.
  158. del glet # Delete one reference from the thread it runs in.
  159. background_glet_running.set()
  160. background_glet_killed.wait(10)
  161. # To trigger the background collection of the dead
  162. # greenlet, thus clearing out the contents of the list, we
  163. # need to run some APIs. See issue 252.
  164. if manually_collect_background:
  165. greenlet.getcurrent()
  166. t = threading.Thread(target=background_thread)
  167. t.start()
  168. background_glet_running.wait(10)
  169. greenlet.getcurrent()
  170. lists_before = self.count_objects(list, exact_kind=True)
  171. assert len(background_greenlets) == 1
  172. self.assertFalse(background_greenlets[0].dead)
  173. # Delete the last reference to the background greenlet
  174. # from a different thread. This puts it in the background thread's
  175. # ts_delkey list.
  176. del background_greenlets[:]
  177. background_glet_killed.set()
  178. # Now wait for the background thread to die.
  179. t.join(10)
  180. del t
  181. # As part of the fix for 252, we need to cycle the ceval.c
  182. # interpreter loop to be sure it has had a chance to process
  183. # the pending call.
  184. self.wait_for_pending_cleanups()
  185. lists_after = self.count_objects(list, exact_kind=True)
  186. greenlets_after = self.count_objects(greenlet.greenlet, exact_kind=False)
  187. # On 2.7, we observe that lists_after is smaller than
  188. # lists_before. No idea what lists got cleaned up. All the
  189. # Python 3 versions match exactly.
  190. self.assertLessEqual(lists_after, lists_before)
  191. # On versions after 3.6, we've successfully cleaned up the
  192. # greenlet references thanks to the internal "vectorcall"
  193. # protocol; prior to that, there is a reference path through
  194. # the ``greenlet.switch`` method still on the stack that we
  195. # can't reach to clean up. The C code goes through terrific
  196. # lengths to clean that up.
  197. if not explicit_reference_to_switch \
  198. and greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None:
  199. # If cleanup was disabled, though, we may not find it.
  200. self.assertEqual(greenlets_after, greenlets_before)
  201. if manually_collect_background:
  202. # TODO: Figure out how to make this work!
  203. # The one on the stack is still leaking somehow
  204. # in the non-manually-collect state.
  205. self.assertEqual(HasFinalizerTracksInstances.EXTANT_INSTANCES, set())
  206. else:
  207. # The explicit reference prevents us from collecting it
  208. # and it isn't always found by the GC either for some
  209. # reason. The entire frame is leaked somehow, on some
  210. # platforms (e.g., MacPorts builds of Python (all
  211. # versions!)), but not on other platforms (the linux and
  212. # windows builds on GitHub actions and Appveyor). So we'd
  213. # like to write a test that proves that the main greenlet
  214. # sticks around, and we can on my machine (macOS 11.6,
  215. # MacPorts builds of everything) but we can't write that
  216. # same test on other platforms. However, hopefully iteration
  217. # done by leakcheck will find it.
  218. pass
  219. if greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None:
  220. self.assertClocksUsed()
  221. def test_issue251_killing_cross_thread_leaks_list(self):
  222. self._check_issue251()
  223. def test_issue251_with_cleanup_disabled(self):
  224. greenlet._greenlet.enable_optional_cleanup(False)
  225. try:
  226. self._check_issue251()
  227. finally:
  228. greenlet._greenlet.enable_optional_cleanup(True)
  229. @fails_leakcheck
  230. def test_issue251_issue252_need_to_collect_in_background(self):
  231. # Between greenlet 1.1.2 and the next version, this was still
  232. # failing because the leak of the list still exists when we
  233. # don't call a greenlet API before exiting the thread. The
  234. # proximate cause is that neither of the two greenlets from
  235. # the background thread are actually being destroyed, even
  236. # though the GC is in fact visiting both objects. It's not
  237. # clear where that leak is? For some reason the thread-local
  238. # dict holding it isn't being cleaned up.
  239. #
  240. # The leak, I think, is in the CPYthon internal function that
  241. # calls into green_switch(). The argument tuple is still on
  242. # the C stack somewhere and can't be reached? That doesn't
  243. # make sense, because the tuple should be collectable when
  244. # this object goes away.
  245. #
  246. # Note that this test sometimes spuriously passes on Linux,
  247. # for some reason, but I've never seen it pass on macOS.
  248. self._check_issue251(manually_collect_background=False)
  249. @fails_leakcheck
  250. def test_issue251_issue252_need_to_collect_in_background_cleanup_disabled(self):
  251. self.expect_greenlet_leak = True
  252. greenlet._greenlet.enable_optional_cleanup(False)
  253. try:
  254. self._check_issue251(manually_collect_background=False)
  255. finally:
  256. greenlet._greenlet.enable_optional_cleanup(True)
  257. @fails_leakcheck
  258. def test_issue251_issue252_explicit_reference_not_collectable(self):
  259. self._check_issue251(
  260. manually_collect_background=False,
  261. explicit_reference_to_switch=True)
  262. UNTRACK_ATTEMPTS = 100
  263. def _only_test_some_versions(self):
  264. # We're only looking for this problem specifically on 3.11,
  265. # and this set of tests is relatively fragile, depending on
  266. # OS and memory management details. So we want to run it on 3.11+
  267. # (obviously) but not every older 3.x version in order to reduce
  268. # false negatives. At the moment, those false results seem to have
  269. # resolved, so we are actually running this on 3.8+
  270. assert sys.version_info[0] >= 3
  271. if sys.version_info[:2] < (3, 8):
  272. self.skipTest('Only observed on 3.11')
  273. if RUNNING_ON_MANYLINUX:
  274. self.skipTest("Slow and not worth repeating here")
  275. @ignores_leakcheck
  276. # Because we're just trying to track raw memory, not objects, and running
  277. # the leakcheck makes an already slow test slower.
  278. def test_untracked_memory_doesnt_increase(self):
  279. # See https://github.com/gevent/gevent/issues/1924
  280. # and https://github.com/python-greenlet/greenlet/issues/328
  281. self._only_test_some_versions()
  282. def f():
  283. return 1
  284. ITER = 10000
  285. def run_it():
  286. for _ in range(ITER):
  287. greenlet.greenlet(f).switch()
  288. # Establish baseline
  289. for _ in range(3):
  290. run_it()
  291. # uss: (Linux, macOS, Windows): aka "Unique Set Size", this is
  292. # the memory which is unique to a process and which would be
  293. # freed if the process was terminated right now.
  294. uss_before = self.get_process_uss()
  295. for count in range(self.UNTRACK_ATTEMPTS):
  296. uss_before = max(uss_before, self.get_process_uss())
  297. run_it()
  298. uss_after = self.get_process_uss()
  299. if uss_after <= uss_before and count > 1:
  300. break
  301. self.assertLessEqual(uss_after, uss_before)
  302. def _check_untracked_memory_thread(self, deallocate_in_thread=True):
  303. self._only_test_some_versions()
  304. # Like the above test, but what if there are a bunch of
  305. # unfinished greenlets in a thread that dies?
  306. # Does it matter if we deallocate in the thread or not?
  307. EXIT_COUNT = [0]
  308. def f():
  309. try:
  310. greenlet.getcurrent().parent.switch()
  311. except greenlet.GreenletExit:
  312. EXIT_COUNT[0] += 1
  313. raise
  314. return 1
  315. ITER = 10000
  316. def run_it():
  317. glets = []
  318. for _ in range(ITER):
  319. # Greenlet starts, switches back to us.
  320. # We keep a strong reference to the greenlet though so it doesn't
  321. # get a GreenletExit exception.
  322. g = greenlet.greenlet(f)
  323. glets.append(g)
  324. g.switch()
  325. return glets
  326. test = self
  327. class ThreadFunc:
  328. uss_before = uss_after = 0
  329. glets = ()
  330. ITER = 2
  331. def __call__(self):
  332. self.uss_before = test.get_process_uss()
  333. for _ in range(self.ITER):
  334. self.glets += tuple(run_it())
  335. for g in self.glets:
  336. test.assertIn('suspended active', str(g))
  337. # Drop them.
  338. if deallocate_in_thread:
  339. self.glets = ()
  340. self.uss_after = test.get_process_uss()
  341. # Establish baseline
  342. uss_before = uss_after = None
  343. for count in range(self.UNTRACK_ATTEMPTS):
  344. EXIT_COUNT[0] = 0
  345. thread_func = ThreadFunc()
  346. t = threading.Thread(target=thread_func)
  347. t.start()
  348. t.join(30)
  349. self.assertFalse(t.is_alive())
  350. if uss_before is None:
  351. uss_before = thread_func.uss_before
  352. uss_before = max(uss_before, thread_func.uss_before)
  353. if deallocate_in_thread:
  354. self.assertEqual(thread_func.glets, ())
  355. self.assertEqual(EXIT_COUNT[0], ITER * thread_func.ITER)
  356. del thread_func # Deallocate the greenlets; but this won't raise into them
  357. del t
  358. if not deallocate_in_thread:
  359. self.assertEqual(EXIT_COUNT[0], 0)
  360. if deallocate_in_thread:
  361. self.wait_for_pending_cleanups()
  362. uss_after = self.get_process_uss()
  363. # See if we achieve a non-growth state at some point. Break when we do.
  364. if uss_after <= uss_before and count > 1:
  365. break
  366. self.wait_for_pending_cleanups()
  367. uss_after = self.get_process_uss()
  368. self.assertLessEqual(uss_after, uss_before, "after attempts %d" % (count,))
  369. @ignores_leakcheck
  370. # Because we're just trying to track raw memory, not objects, and running
  371. # the leakcheck makes an already slow test slower.
  372. def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_thread(self):
  373. self._check_untracked_memory_thread(deallocate_in_thread=True)
  374. @ignores_leakcheck
  375. # Because the main greenlets from the background threads do not exit in a timely fashion,
  376. # we fail the object-based leakchecks.
  377. def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_main(self):
  378. self._check_untracked_memory_thread(deallocate_in_thread=False)
  379. if __name__ == '__main__':
  380. __import__('unittest').main()