def test_exhaustion(self):
waiter = Queue(0)
def consumer():
gotten = None
try:
gotten = self.pool.get()
finally:
waiter.put(gotten)
eventlet.spawn(consumer)
one, two, three, four = (
self.pool.get(), self.pool.get(), self.pool.get(), self.pool.get())
self.assertEqual(self.pool.free(), 0)
# Let consumer run; nothing will be in the pool, so he will wait
eventlet.sleep(0)
# Wake consumer
self.pool.put(one)
# wait for the consumer
self.assertEqual(waiter.get(), one)
评论列表
文章目录