def test_putting_to_queue(self):
    # Scenario: check an item out of the pool one more time than the pool's
    # max_size. Each checked-out item is handed to a greenthread that puts
    # it back into the pool and then reports its index on a queue. The test
    # passes when every index has been reported.
    capacity = 2
    rounds = capacity + 1

    # The timeout covers the whole scenario; leaving the ``with`` block
    # cancels it, whether the body finishes normally or raises.
    with eventlet.Timeout(0.1):
        self.pool = IntPool(min_size=0, max_size=capacity)
        done = Queue()

        def return_then_report(item, position):
            # Give the item back to the pool first, then signal completion.
            self.pool.put(item)
            done.put(position)

        for position in six.moves.range(rounds):
            # NOTE(review): the final get() presumably has to wait for a
            # greenthread to return an item, since max_size < rounds.
            eventlet.spawn(return_then_report, self.pool.get(), position)

        # Collect exactly one report per spawned greenthread.
        reported = [done.get() for _ in six.moves.range(rounds)]
        self.assertEqual(sorted(reported), list(six.moves.range(rounds)))
评论列表
文章目录