def testQueue(self):
N, M = 2, 2
queue = defer.DeferredQueue(N, M)
gotten = []
for i in range(M):
queue.get().addCallback(gotten.append)
self.assertRaises(defer.QueueUnderflow, queue.get)
for i in range(M):
queue.put(i)
self.assertEquals(gotten, range(i + 1))
for i in range(N):
queue.put(N + i)
self.assertEquals(gotten, range(M))
self.assertRaises(defer.QueueOverflow, queue.put, None)
gotten = []
for i in range(N):
queue.get().addCallback(gotten.append)
self.assertEquals(gotten, range(N, N + i + 1))
queue = defer.DeferredQueue()
gotten = []
for i in range(N):
queue.get().addCallback(gotten.append)
for i in range(N):
queue.put(i)
self.assertEquals(gotten, range(N))
queue = defer.DeferredQueue(size=0)
self.assertRaises(defer.QueueOverflow, queue.put, None)
queue = defer.DeferredQueue(backlog=0)
self.assertRaises(defer.QueueUnderflow, queue.get)
评论列表
文章目录