def test_schedule(self):
    # A callback scheduled on the hub must run and must be able to abort
    # the runloop, leaving hub.running false afterwards.
    hub = hubs.get_hub()
    if hub.running:
        # Reset a runloop left running by an earlier test on this thread so
        # it cannot influence this one.
        hub.abort()
        eventlet.sleep(0)

    called = []

    def record_and_abort():
        called.append(True)
        hub.abort()

    # Scheduling this far in the future (to check abort() while a timer is
    # still pending) is only handled by the pyevent hub, so the callback is
    # scheduled to fire immediately.
    hubs.get_hub().schedule_call_global(0, record_and_abort)
    hub.default_sleep = lambda: 0.0
    hub.switch()

    assert called
    assert not hub.running
# Example source code for the Python class hubs()
def test_sleep(self):
    # Even if a scheduled call raised inside the mainloop, the hub should
    # keep working: sleep() must still last (roughly) as long as requested.
    def check_sleep_duration():
        began = time.time()
        eventlet.sleep(DELAY)
        delay = time.time() - began
        # Allow 10% slack below the requested duration.
        assert delay >= DELAY * 0.9, (
            'sleep returned after %s seconds (was scheduled for %s)' % (
                delay, DELAY))

    def fail():
        # Deliberately raises ZeroDivisionError when the hub runs it.
        1 // 0

    # Baseline: a plain sleep on a healthy hub.
    check_sleep_duration()

    # Same check again with a failing callback queued on the hub.
    hubs.get_hub().schedule_call_global(0, fail)
    check_sleep_duration()
def test_exceptionleaks(self):
    # Expected behaviour with all versions of greenlet: an exception being
    # handled in one greenthread must not be visible through sys.exc_info()
    # in another.
    def hold_exception(gate):
        try:
            raise KeyError()
        except KeyError:
            # Still inside the handler: wake the waiter, then park in the
            # hub while the KeyError is the active exception here.
            gate.release()
            hubs.get_hub().switch()

    # The semaphore controls execution order: it starts out taken, so the
    # second acquire() below waits until hold_exception releases it.
    gate = eventlet.Semaphore()
    gate.acquire()
    worker = eventlet.spawn(hold_exception, gate)
    try:
        gate.acquire()
        assert sys.exc_info()[0] is None
    finally:
        worker.kill()
def test_kill(self):
    """ Checks that killing a process after the hub runloop dies does
    not immediately return to hub greenlet's parent and schedule a
    redundant timer. """
    hub = hubs.get_hub()

    def park_in_hub():
        hub.switch()

    victim = eventlet.spawn(park_in_hub)
    eventlet.sleep(0)  # give park_in_hub a chance to run
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()

    # Throw KeyboardInterrupt into the hub greenlet; it is expected to
    # come back out here.
    interrupt = KeyboardInterrupt()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, interrupt)

    # Killing the greenthread schedules a timer that returns execution to
    # this greenlet before the exception is thrown into park_in_hub.
    # Control must come back through that timer, not through propagation
    # from the terminating greenlet.
    victim.kill()

    with eventlet.Timeout(0.5, self.CustomException()):
        # No timer that switches back to this greenlet should remain, so
        # hub.switch() is expected to block until the timeout raises.
        self.assertRaises(self.CustomException, hub.switch)
def test_parent(self):
    """ Checks that a terminating greenthread whose parent
    was a previous, now-defunct hub greenlet returns execution to
    the hub runloop and not the hub greenlet's parent. """
    hub = hubs.get_hub()

    def noop_proc():
        pass

    spawned = eventlet.spawn(noop_proc)
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()

    # Throw KeyboardInterrupt into the hub greenlet; it is expected to
    # come back out here.
    interrupt = KeyboardInterrupt()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, interrupt)

    assert not spawned.dead  # noop_proc must not have completed yet

    with eventlet.Timeout(0.5, self.CustomException()):
        # Switching to the hub lets noop_proc finish. Its termination
        # should hand control to the runloop rather than to this greenlet,
        # so hub.switch() blocks until the timeout raises.
        self.assertRaises(self.CustomException, hub.switch)

    assert spawned.dead  # sanity check: noop_proc has completed
def verify_hub_empty():
    """Assert that the current eventlet hub has no readers and no writers.

    Raises:
        AssertionError: if any reader or writer is registered; the message
            lists every listener and the two counts.
    """
    from eventlet import hubs

    def describe(entry):
        owner = entry.greenlet
        return 'Listener %r for greenlet %r with run callback %r' % (
            entry, owner, getattr(owner, 'run', None))

    hub = hubs.get_hub()
    readers = hub.get_readers()
    writers = hub.get_writers()
    reader_total = len(readers)
    writer_total = len(writers)
    # Timer count is fetched but not checked; only listeners are asserted on.
    hub.get_timers_count()

    assert not (reader_total or writer_total), (
        "Readers: %s (%d) Writers: %s (%d)" % (
            ', '.join(describe(item) for item in readers), reader_total,
            ', '.join(describe(item) for item in writers), writer_total,
        ))
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()
def test_cancel_immediate(self):
    # Timers canceled right after being scheduled must not pile up: the
    # canceled count has to stay within one of the hub's timer count.
    hub = hubs.get_hub()
    timers_before = hub.get_timers_count()
    canceled_before = hub.timers_canceled  # captured but not used below

    for _ in six.moves.range(2000):
        hubs.get_hub().schedule_call_global(60, noop).cancel()
        self.assert_less_than_equal(
            hub.timers_canceled, hub.get_timers_count() + 1)

    # After 2000 schedule/cancel rounds, no more than 1000 new timers and
    # no more than 1000 canceled timers may remain.
    self.assert_less_than_equal(
        hub.get_timers_count(), 1000 + timers_before)
    self.assert_less_than_equal(hub.timers_canceled, 1000)
def test_cancel_accumulated(self):
    # Like the immediate-cancel case, but the hub gets to run between
    # scheduling a timer and canceling it.
    hub = hubs.get_hub()
    timers_before = hub.get_timers_count()
    canceled_before = hub.timers_canceled  # captured but not used below

    for _ in six.moves.range(2000):
        pending = hubs.get_hub().schedule_call_global(60, noop)
        eventlet.sleep()
        self.assert_less_than_equal(
            hub.timers_canceled, hub.get_timers_count() + 1)
        pending.cancel()
        # hub.timers is handed over as the third argument of the check.
        self.assert_less_than_equal(
            hub.timers_canceled, hub.get_timers_count() + 1, hub.timers)

    # After 2000 rounds, no more than 1000 new timers and no more than
    # 1000 canceled timers may remain.
    self.assert_less_than_equal(
        hub.get_timers_count(), 1000 + timers_before)
    self.assert_less_than_equal(hub.timers_canceled, 1000)
def test_cancel_proportion(self):
    # If fewer than half of the pending timers are canceled, the hub
    # should not clean them out.
    hub = hubs.get_hub()
    timers_before = hub.get_timers_count()
    canceled_before = hub.timers_canceled  # captured but not used below
    survivors = []

    for _ in six.moves.range(1000):
        # Three timers per round; only the first is canceled, so two
        # thirds of the new timers stay live.
        doomed = hubs.get_hub().schedule_call_global(60, noop)
        kept_a = hubs.get_hub().schedule_call_global(60, noop)
        kept_b = hubs.get_hub().schedule_call_global(60, noop)
        eventlet.sleep()
        self.assert_less_than_equal(
            hub.timers_canceled, hub.get_timers_count() + 1)
        doomed.cancel()
        self.assert_less_than_equal(
            hub.timers_canceled, hub.get_timers_count() + 1)
        survivors.append(kept_a)
        survivors.append(kept_b)

    # All 3000 new timers (plus a few extras) must still be in the hub,
    # with exactly the 1000 canceled ones counted.
    self.assert_less_than_equal(
        timers_before + 3000, timers_before + hub.get_timers_count())
    self.assertEqual(hub.timers_canceled, 1000)

    # Cancel the remainder; the canceled count may never exceed the
    # hub's timer count while doing so.
    for timer in survivors:
        timer.cancel()
        self.assert_less_than_equal(
            hub.timers_canceled, hub.get_timers_count())
    eventlet.sleep()
def test_local(self):
    # A schedule_call_local() timer created from a spawned greenthread is
    # expected never to fire here: the list must be untouched afterwards.
    # NOTE(review): presumably because the spawning greenthread has already
    # exited when the timer comes due -- confirm against the hub docs.
    items = [1]
    eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, items.pop)
    eventlet.sleep(0)  # let the spawned greenthread run
    eventlet.sleep(DELAY * 2)  # wait past the timer's due time
    assert items == [1], items
def test_ordering(self):
    # Timers fire in due-time order, and timers with equal delays fire in
    # the order they were scheduled.
    fired = []
    schedule = (
        (DELAY * 2, 3),
        (DELAY, 1),
        (DELAY, 2),
    )
    for wait, value in schedule:
        hubs.get_hub().schedule_call_global(wait, fired.append, value)

    # Keep yielding to the hub until all three callbacks have run.
    while len(fired) < 3:
        eventlet.sleep(DELAY)
    self.assertEqual(fired, [1, 2, 3])
def test_debug_listeners(self):
    # Smoke test: switching listener debugging on and then off again must
    # not raise.
    for enabled in (True, False):
        hubs.get_hub().set_debug_listeners(enabled)
def test_timer_exceptions(self):
    # Smoke test: switching timer-exception reporting on and then off
    # again must not raise.
    for enabled in (True, False):
        hubs.get_hub().set_timer_exceptions(enabled)
def test_kqueue_unsupported(self):
    # https://github.com/eventlet/eventlet/issues/38
    # get_hub on windows broken by kqueue
    #
    # Runs a small script in a subprocess that removes select.kqueue,
    # makes any import whose name contains 'epoll' fail, and prints a
    # marker when an import whose name contains 'kqueue' is attempted.
    # The expected output shows that the kqueue import was tried and that
    # get_default_hub() still completed.
    #
    # The script is a raw string starting at column 0; the indentation
    # inside it is significant, since it is executed as a Python module.
    module_source = r'''
from __future__ import print_function
# Simulate absence of kqueue even on platforms that support it.
import select
try:
    del select.kqueue
except AttributeError:
    pass
from eventlet.support.six.moves import builtins
original_import = builtins.__import__
def fail_import(name, *args, **kwargs):
    if 'epoll' in name:
        raise ImportError('disabled for test')
    if 'kqueue' in name:
        print('kqueue tried')
    return original_import(name, *args, **kwargs)
builtins.__import__ = fail_import
import eventlet.hubs
eventlet.hubs.get_default_hub()
print('ok')
'''
    self.write_to_tempfile('newmod', module_source)
    output, _ = self.launch_subprocess('newmod.py')
    self.assertEqual(output, 'kqueue tried\nok\n')
def using_pyevent(_f):
    """Return True when the current hub's class lives in a 'pyevent' module.

    The check is a substring test on the ``__module__`` of the type of the
    object returned by ``eventlet.hubs.get_hub()``. The ``_f`` argument is
    accepted but not used.
    """
    from eventlet.hubs import get_hub

    hub_module = type(get_hub()).__module__
    return 'pyevent' in hub_module
def patch(self):
    """Install eventlet's hub and monkey-patching, then patch sendfile."""
    # Statement order is preserved: hub first, then monkey-patching, then
    # the sendfile patch.
    hubs.use_hub()  # no argument: the hub choice is left to eventlet
    eventlet.monkey_patch(os=False)  # os=False is passed for the `os` module
    # NOTE(review): patch_sendfile is defined elsewhere; presumably it swaps
    # in an eventlet-friendly sendfile -- confirm against its definition.
    patch_sendfile()