def test_task_done(self):
q = self.queue_class()
for i in range(100):
q.put_nowait(i)
self.accumulator = 0
@gen.coroutine
def worker():
while True:
item = yield q.get()
self.accumulator += item
q.task_done()
yield gen.sleep(random() * 0.01)
# Two coroutines share work.
worker()
worker()
yield q.join()
self.assertEqual(sum(range(100)), self.accumulator)
queues_test.py 文件源码
python
阅读 21
收藏 0
点赞 0
评论 0
评论列表
文章目录