123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- # -*- coding: utf-8 -*-
- """
- Testing scenarios that may have leaked.
- """
- from __future__ import print_function, absolute_import, division
- import sys
- import gc
- import time
- import weakref
- import threading
- import greenlet
- from . import TestCase
- from .leakcheck import fails_leakcheck
- from .leakcheck import ignores_leakcheck
- from .leakcheck import RUNNING_ON_MANYLINUX
- # pylint:disable=protected-access
# Sanity check at import time: the option to disable this was removed in 1.0,
# so the flag is expected to be truthy.
assert greenlet.GREENLET_USE_GC
class HasFinalizerTracksInstances(object):
    """
    Helper object whose live instances are tracked in a class-level set.

    Each instance adds its ``id()`` to ``EXTANT_INSTANCES`` when it is
    created and takes it out again when it is finalized, so a test can
    check which instances have not been finalized yet.
    """

    # ``id()`` values of the instances that have been created and not yet
    # finalized (or forgotten through ``reset()``).
    EXTANT_INSTANCES = set()

    def __init__(self, msg):
        """Record the (interned) string *msg* and register this instance."""
        self.msg = sys.intern(msg)
        self.EXTANT_INSTANCES.add(id(self))

    def __del__(self):
        # ``discard`` rather than ``remove``: the id may legitimately be
        # missing already, either because ``reset()`` was called while this
        # instance was still alive or because ``__init__`` failed before
        # registering it. Raising ``KeyError`` from a finalizer would only
        # produce an "exception ignored" report.
        self.EXTANT_INSTANCES.discard(id(self))

    def __repr__(self):
        return "<HasFinalizerTracksInstances at 0x%x %r>" % (
            id(self), self.msg
        )

    @classmethod
    def reset(cls):
        """Forget every tracked instance, including ones still alive."""
        cls.EXTANT_INSTANCES.clear()
class TestLeaks(TestCase):
    """
    Regression tests for reference, object and memory leaks.

    Covers arguments passed through ``switch``, greenlets that belonged
    to finished threads, greenlets destroyed from a different thread
    (greenlet issues 251 and 252) and growth of process memory (greenlet
    issue 328 / gevent issue 1924).
    """

    def test_arg_refs(self):
        # Switching positional arguments through a greenlet many times
        # must leave the refcount of the argument tuple unchanged.
        args = ('a', 'b', 'c')
        refcount_before = sys.getrefcount(args)
        # pylint:disable=unnecessary-lambda
        g = greenlet.greenlet(
            lambda *args: greenlet.getcurrent().parent.switch(*args))
        for _ in range(100):
            g.switch(*args)
        self.assertEqual(sys.getrefcount(args), refcount_before)

    def test_kwarg_refs(self):
        # Same check as ``test_arg_refs`` but for keyword arguments. The
        # refcount is compared with a baseline taken before any switch
        # instead of a hard-coded number, because the absolute value that
        # ``sys.getrefcount`` reports is an interpreter implementation
        # detail.
        kwargs = {}
        refcount_before = sys.getrefcount(kwargs)
        # pylint:disable=unnecessary-lambda
        g = greenlet.greenlet(
            lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs))
        for _ in range(100):
            g.switch(**kwargs)
        self.assertEqual(sys.getrefcount(kwargs), refcount_before)

    @staticmethod
    def __recycle_threads():
        """Start a short-lived sleeping thread, sleep briefly, and join it."""
        # By introducing a thread that does sleep we allow other threads,
        # that have triggered their __block condition, but did not have a
        # chance to deallocate their thread state yet, to finally do so.
        # The way it works is by requiring a GIL switch (different thread),
        # which does a GIL release (sleep), which might do a GIL switch
        # to finished threads and allow them to clean up.
        def worker():
            time.sleep(0.001)
        t = threading.Thread(target=worker)
        t.start()
        time.sleep(0.001)
        t.join(10)

    def test_threaded_leak(self):
        # The main greenlet of a finished thread must become collectable:
        # every weak reference gathered by the workers has to be dead.
        gg = []
        def worker():
            # only main greenlet present
            gg.append(weakref.ref(greenlet.getcurrent()))
        for _ in range(2):
            t = threading.Thread(target=worker)
            t.start()
            t.join(10)
            del t
        greenlet.getcurrent() # update ts_current
        self.__recycle_threads()
        greenlet.getcurrent() # update ts_current
        gc.collect()
        greenlet.getcurrent() # update ts_current
        for g in gg:
            self.assertIsNone(g())

    def test_threaded_adv_leak(self):
        # Like ``test_threaded_leak``, but each worker thread also runs two
        # greenlets to completion and stores them in a list that is an
        # attribute of the thread's main greenlet.
        gg = []
        def worker():
            # main and additional *finished* greenlets
            ll = greenlet.getcurrent().ll = []
            def additional():
                ll.append(greenlet.getcurrent())
            for _ in range(2):
                greenlet.greenlet(additional).switch()
            gg.append(weakref.ref(greenlet.getcurrent()))
        for _ in range(2):
            t = threading.Thread(target=worker)
            t.start()
            t.join(10)
            del t
        greenlet.getcurrent() # update ts_current
        self.__recycle_threads()
        greenlet.getcurrent() # update ts_current
        gc.collect()
        greenlet.getcurrent() # update ts_current
        for g in gg:
            self.assertIsNone(g())

    def assertClocksUsed(self):
        """
        Check the counter of clocks used doing optional cleanup.

        Asserts that the counter is non-negative, that it keeps its value
        when optional cleanup is enabled again, and that
        ``CLOCKS_PER_SEC`` is greater than one. Note that this leaves
        optional cleanup enabled.
        """
        used = greenlet._greenlet.get_clocks_used_doing_optional_cleanup()
        self.assertGreaterEqual(used, 0)
        # we don't lose the value
        greenlet._greenlet.enable_optional_cleanup(True)
        used2 = greenlet._greenlet.get_clocks_used_doing_optional_cleanup()
        self.assertEqual(used, used2)
        self.assertGreater(greenlet._greenlet.CLOCKS_PER_SEC, 1)

    def _check_issue251(self,
                        manually_collect_background=True,
                        explicit_reference_to_switch=False):
        """
        Drop the last reference to a background thread's greenlet from
        the calling thread and check that nothing is leaked.

        :param manually_collect_background: When true, the background
            thread calls ``greenlet.getcurrent()`` once more right before
            it finishes.
        :param explicit_reference_to_switch: When true, the background
            greenlet binds ``parent.switch`` to a local variable before
            calling it instead of calling it directly.
        """
        # See https://github.com/python-greenlet/greenlet/issues/251
        # Killing a greenlet (probably not the main one)
        # in one thread from another thread would
        # result in leaking a list (the ts_delkey list).
        # We no longer use lists to hold that stuff, though.

        # For the test to be valid, even empty lists have to be tracked by the
        # GC
        assert gc.is_tracked([])
        HasFinalizerTracksInstances.reset()
        greenlet.getcurrent()
        greenlets_before = self.count_objects(greenlet.greenlet, exact_kind=False)

        background_glet_running = threading.Event()
        background_glet_killed = threading.Event()
        background_greenlets = []

        # XXX: Switching this to a greenlet subclass that overrides
        # run results in all callers failing the leaktest; that
        # greenlet instance is leaked. There's a bound method for
        # run() living on the stack of the greenlet in g_initialstub,
        # and since we don't manually switch back to the background
        # greenlet to let it "fall off the end" and exit the
        # g_initialstub function, it never gets cleaned up. Making the
        # garbage collector aware of this bound method (making it an
        # attribute of the greenlet structure and traversing into it)
        # doesn't help, for some reason.
        def background_greenlet():
            # Throw control back to the main greenlet.
            jd = HasFinalizerTracksInstances("DELETING STACK OBJECT")
            greenlet._greenlet.set_thread_local(
                'test_leaks_key',
                HasFinalizerTracksInstances("DELETING THREAD STATE"))
            # Explicitly keeping 'switch' in a local variable
            # breaks this test in all versions
            if explicit_reference_to_switch:
                s = greenlet.getcurrent().parent.switch
                s([jd])
            else:
                greenlet.getcurrent().parent.switch([jd])

        bg_main_wrefs = []

        def background_thread():
            glet = greenlet.greenlet(background_greenlet)
            bg_main_wrefs.append(weakref.ref(glet.parent))

            background_greenlets.append(glet)
            glet.switch() # Be sure it's active.
            # Control is ours again.
            del glet # Delete one reference from the thread it runs in.
            background_glet_running.set()
            background_glet_killed.wait(10)

            # To trigger the background collection of the dead
            # greenlet, thus clearing out the contents of the list, we
            # need to run some APIs. See issue 252.
            if manually_collect_background:
                greenlet.getcurrent()

        t = threading.Thread(target=background_thread)
        t.start()
        background_glet_running.wait(10)
        greenlet.getcurrent()
        lists_before = self.count_objects(list, exact_kind=True)

        assert len(background_greenlets) == 1
        self.assertFalse(background_greenlets[0].dead)
        # Delete the last reference to the background greenlet
        # from a different thread. This puts it in the background thread's
        # ts_delkey list.
        del background_greenlets[:]
        background_glet_killed.set()

        # Now wait for the background thread to die.
        t.join(10)
        del t
        # As part of the fix for 252, we need to cycle the ceval.c
        # interpreter loop to be sure it has had a chance to process
        # the pending call.
        self.wait_for_pending_cleanups()

        lists_after = self.count_objects(list, exact_kind=True)
        greenlets_after = self.count_objects(greenlet.greenlet, exact_kind=False)

        # On 2.7, we observe that lists_after is smaller than
        # lists_before. No idea what lists got cleaned up. All the
        # Python 3 versions match exactly.
        self.assertLessEqual(lists_after, lists_before)
        # On versions after 3.6, we've successfully cleaned up the
        # greenlet references thanks to the internal "vectorcall"
        # protocol; prior to that, there is a reference path through
        # the ``greenlet.switch`` method still on the stack that we
        # can't reach to clean up. The C code goes through terrific
        # lengths to clean that up.
        if (not explicit_reference_to_switch
                and greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None):
            # If cleanup was disabled, though, we may not find it.
            self.assertEqual(greenlets_after, greenlets_before)
            if manually_collect_background:
                # TODO: Figure out how to make this work!
                # The one on the stack is still leaking somehow
                # in the non-manually-collect state.
                self.assertEqual(HasFinalizerTracksInstances.EXTANT_INSTANCES, set())
        else:
            # The explicit reference prevents us from collecting it
            # and it isn't always found by the GC either for some
            # reason. The entire frame is leaked somehow, on some
            # platforms (e.g., MacPorts builds of Python (all
            # versions!)), but not on other platforms (the linux and
            # windows builds on GitHub actions and Appveyor). So we'd
            # like to write a test that proves that the main greenlet
            # sticks around, and we can on my machine (macOS 11.6,
            # MacPorts builds of everything) but we can't write that
            # same test on other platforms. However, hopefully iteration
            # done by leakcheck will find it.
            pass

        if greenlet._greenlet.get_clocks_used_doing_optional_cleanup() is not None:
            self.assertClocksUsed()

    def test_issue251_killing_cross_thread_leaks_list(self):
        self._check_issue251()

    def test_issue251_with_cleanup_disabled(self):
        # Run the default scenario with optional cleanup switched off, and
        # always switch it back on afterwards.
        greenlet._greenlet.enable_optional_cleanup(False)
        try:
            self._check_issue251()
        finally:
            greenlet._greenlet.enable_optional_cleanup(True)

    @fails_leakcheck
    def test_issue251_issue252_need_to_collect_in_background(self):
        # Between greenlet 1.1.2 and the next version, this was still
        # failing because the leak of the list still exists when we
        # don't call a greenlet API before exiting the thread. The
        # proximate cause is that neither of the two greenlets from
        # the background thread are actually being destroyed, even
        # though the GC is in fact visiting both objects. It's not
        # clear where that leak is? For some reason the thread-local
        # dict holding it isn't being cleaned up.
        #
        # The leak, I think, is in the CPython internal function that
        # calls into green_switch(). The argument tuple is still on
        # the C stack somewhere and can't be reached? That doesn't
        # make sense, because the tuple should be collectable when
        # this object goes away.
        #
        # Note that this test sometimes spuriously passes on Linux,
        # for some reason, but I've never seen it pass on macOS.
        self._check_issue251(manually_collect_background=False)

    @fails_leakcheck
    def test_issue251_issue252_need_to_collect_in_background_cleanup_disabled(self):
        self.expect_greenlet_leak = True
        greenlet._greenlet.enable_optional_cleanup(False)
        try:
            self._check_issue251(manually_collect_background=False)
        finally:
            greenlet._greenlet.enable_optional_cleanup(True)

    @fails_leakcheck
    def test_issue251_issue252_explicit_reference_not_collectable(self):
        self._check_issue251(
            manually_collect_background=False,
            explicit_reference_to_switch=True)

    # Upper bound on the number of rounds the untracked-memory tests run
    # while waiting for memory use to stop growing.
    UNTRACK_ATTEMPTS = 100

    def _only_test_some_versions(self):
        """
        Skip the calling test on Python older than 3.8 or when
        ``RUNNING_ON_MANYLINUX`` is true.
        """
        # We're only looking for this problem specifically on 3.11,
        # and this set of tests is relatively fragile, depending on
        # OS and memory management details. So we want to run it on 3.11+
        # (obviously) but not every older 3.x version in order to reduce
        # false negatives. At the moment, those false results seem to have
        # resolved, so we are actually running this on 3.8+
        assert sys.version_info[0] >= 3
        if sys.version_info[:2] < (3, 8):
            self.skipTest('Only observed on 3.11')
        if RUNNING_ON_MANYLINUX:
            self.skipTest("Slow and not worth repeating here")

    @ignores_leakcheck
    # Because we're just trying to track raw memory, not objects, and running
    # the leakcheck makes an already slow test slower.
    def test_untracked_memory_doesnt_increase(self):
        # See https://github.com/gevent/gevent/issues/1924
        # and https://github.com/python-greenlet/greenlet/issues/328
        self._only_test_some_versions()
        def f():
            return 1

        ITER = 10000
        def run_it():
            for _ in range(ITER):
                greenlet.greenlet(f).switch()

        # Establish baseline
        for _ in range(3):
            run_it()

        # uss: (Linux, macOS, Windows): aka "Unique Set Size", this is
        # the memory which is unique to a process and which would be
        # freed if the process was terminated right now.
        uss_before = self.get_process_uss()

        for count in range(self.UNTRACK_ATTEMPTS):
            uss_before = max(uss_before, self.get_process_uss())
            run_it()

            uss_after = self.get_process_uss()
            # Stop as soon as a round (after the first two) shows no growth.
            if uss_after <= uss_before and count > 1:
                break

        self.assertLessEqual(uss_after, uss_before)

    def _check_untracked_memory_thread(self, deallocate_in_thread=True):
        """
        Repeatedly create many unfinished greenlets in a background
        thread and check that process memory stops growing.

        :param deallocate_in_thread: When true, the background thread
            drops its references to the greenlets itself before it
            finishes; otherwise they are released by the calling thread
            when the thread's callable is deleted.
        """
        self._only_test_some_versions()
        # Like the above test, but what if there are a bunch of
        # unfinished greenlets in a thread that dies?
        # Does it matter if we deallocate in the thread or not?
        EXIT_COUNT = [0]

        def f():
            try:
                greenlet.getcurrent().parent.switch()
            except greenlet.GreenletExit:
                EXIT_COUNT[0] += 1
                raise
            return 1

        ITER = 10000
        def run_it():
            glets = []
            for _ in range(ITER):
                # Greenlet starts, switches back to us.
                # We keep a strong reference to the greenlet though so it doesn't
                # get a GreenletExit exception.
                g = greenlet.greenlet(f)
                glets.append(g)
                g.switch()

            return glets

        test = self

        class ThreadFunc:
            uss_before = uss_after = 0
            glets = ()
            ITER = 2
            def __call__(self):
                self.uss_before = test.get_process_uss()

                for _ in range(self.ITER):
                    self.glets += tuple(run_it())

                for g in self.glets:
                    test.assertIn('suspended active', str(g))
                # Drop them.
                if deallocate_in_thread:
                    self.glets = ()
                self.uss_after = test.get_process_uss()

        # Establish baseline
        uss_before = uss_after = None
        for count in range(self.UNTRACK_ATTEMPTS):
            EXIT_COUNT[0] = 0
            thread_func = ThreadFunc()
            t = threading.Thread(target=thread_func)
            t.start()
            t.join(30)
            self.assertFalse(t.is_alive())

            if uss_before is None:
                uss_before = thread_func.uss_before

            uss_before = max(uss_before, thread_func.uss_before)
            if deallocate_in_thread:
                self.assertEqual(thread_func.glets, ())
                self.assertEqual(EXIT_COUNT[0], ITER * thread_func.ITER)

            del thread_func # Deallocate the greenlets; but this won't raise into them
            del t
            if not deallocate_in_thread:
                self.assertEqual(EXIT_COUNT[0], 0)
            if deallocate_in_thread:
                self.wait_for_pending_cleanups()

            uss_after = self.get_process_uss()
            # See if we achieve a non-growth state at some point. Break when we do.
            if uss_after <= uss_before and count > 1:
                break

        self.wait_for_pending_cleanups()
        uss_after = self.get_process_uss()
        self.assertLessEqual(uss_after, uss_before, "after attempts %d" % (count,))

    @ignores_leakcheck
    # Because we're just trying to track raw memory, not objects, and running
    # the leakcheck makes an already slow test slower.
    def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_thread(self):
        self._check_untracked_memory_thread(deallocate_in_thread=True)

    @ignores_leakcheck
    # Because the main greenlets from the background threads do not exit in a timely fashion,
    # we fail the object-based leakchecks.
    def test_untracked_memory_doesnt_increase_unfinished_thread_dealloc_in_main(self):
        self._check_untracked_memory_thread(deallocate_in_thread=False)
if __name__ == '__main__':
    # Executed as a script: hand control to the unittest command-line runner.
    unittest_module = __import__('unittest')
    unittest_module.main()
|