def test_kill(self):
""" Checks that killing a process after the hub runloop dies does
not immediately return to hub greenlet's parent and schedule a
redundant timer. """
hub = hubs.get_hub()
def dummyproc():
hub.switch()
g = eventlet.spawn(dummyproc)
eventlet.sleep(0) # let dummyproc run
assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
KeyboardInterrupt())
# kill dummyproc, this schedules a timer to return execution to
# this greenlet before throwing an exception in dummyproc.
# it is from this timer that execution should be returned to this
# greenlet, and not by propogating of the terminating greenlet.
g.kill()
with eventlet.Timeout(0.5, self.CustomException()):
# we now switch to the hub, there should be no existing timers
# that switch back to this greenlet and so this hub.switch()
# call should block indefinitely.
self.assertRaises(self.CustomException, hub.switch)
评论列表
文章目录