def test_semaphore(self, inspect):
slot = mock.Mock()
slot.side_effect = lambda **k: time.sleep(.3)
signal = Signal('tost')
signal.connect(slot)
x = Task.get_or_create(signal, dict(some_kwarg='foo'),
logger=logging.getLogger('TaskX'))
y = Task.get_or_create(signal, dict(some_kwarg='foo'),
logger=logging.getLogger('TaskY'))
eventlet.spawn(x)
time.sleep(.1)
eventlet.spawn(y)
time.sleep(.1)
assert slot.call_count == 1
time.sleep(.4)
assert slot.call_count == 2
评论列表
文章目录