def testQueue(self):
N, M = 2, 2
queue = defer.DeferredQueue(N, M)
gotten = []
for i in range(M):
queue.get().addCallback(gotten.append)
self.assertRaises(defer.QueueUnderflow, queue.get)
for i in range(M):
queue.put(i)
self.assertEqual(gotten, list(range(i + 1)))
for i in range(N):
queue.put(N + i)
self.assertEqual(gotten, list(range(M)))
self.assertRaises(defer.QueueOverflow, queue.put, None)
gotten = []
for i in range(N):
queue.get().addCallback(gotten.append)
self.assertEqual(gotten, list(range(N, N + i + 1)))
queue = defer.DeferredQueue()
gotten = []
for i in range(N):
queue.get().addCallback(gotten.append)
for i in range(N):
queue.put(i)
self.assertEqual(gotten, list(range(N)))
queue = defer.DeferredQueue(size=0)
self.assertRaises(defer.QueueOverflow, queue.put, None)
queue = defer.DeferredQueue(backlog=0)
self.assertRaises(defer.QueueUnderflow, queue.get)
评论列表
文章目录