def test_no_leaking(self):
    """Check that values stored on a ``corolocal.local`` are not kept alive.

    Spawns 100 green threads in a ``GreenPool``. Each one creates a fresh
    ``X`` instance, registers it as a key in a ``WeakKeyDictionary`` and
    stores it on a shared ``corolocal.local`` object. Once the pool has
    been waited on and deleted, garbage is collected, control is handed
    over once via ``eventlet.sleep(0)``, and garbage is collected again.
    The test then asserts that exactly one weak key is still present.
    """
    # Weak keys only: an entry disappears as soon as its X instance is freed.
    tracked = weakref.WeakKeyDictionary()
    greenlet_local = corolocal.local()

    class X(object):
        pass

    def do_something(i):
        # ``i`` is accepted because the spawner passes it, but it is unused.
        obj = X()
        tracked[obj] = True
        greenlet_local.foo = obj

    pool = eventlet.GreenPool()
    for index in six.moves.range(100):
        pool.spawn(do_something, index)
    pool.waitall()

    # Drop the only local reference to the pool before collecting.
    del pool
    gc.collect()
    eventlet.sleep(0)
    gc.collect()

    # Every spawned coroutine has finished by now.
    # NOTE(review): the expected count is 1 rather than 0 -- presumably one
    # instance is still reachable after the final collection; confirm why.
    self.assertEqual(len(tracked), 1)
评论列表
文章目录