def testSemaphore(self):
    """Exercise ``defer.DeferredSemaphore`` via ``run``, ``acquire`` and ``release``.

    The test has two phases:

    1. ``sem.run`` is given a helper that returns a Deferred this test
       controls; the Deferred returned by ``run`` must not fire until that
       controlling Deferred is fired.
    2. ``N`` calls to ``acquire`` fire their callbacks immediately, the
       ``N + 1``-th stays pending until a ``release``, and further releases
       with nobody waiting fire no additional callbacks.

    NOTE(review): relies on ``self._incr`` and ``self.counter`` defined
    outside this block -- presumably ``_incr`` adds one to ``counter`` and
    ``counter`` starts at 0; confirm against the enclosing test case.
    """
    N = 13
    sem = defer.DeferredSemaphore(N)

    # The helper hands back this Deferred, so the test decides when the
    # work passed to sem.run() counts as finished.
    controlDeferred = defer.Deferred()

    def helper(self, arg):
        # Record the argument so the test can verify that run() forwarded
        # its keyword arguments to the callable.
        self.arg = arg
        return controlDeferred

    results = []
    uniqueObject = object()
    resultDeferred = sem.run(helper, self=self, arg=uniqueObject)
    resultDeferred.addCallback(results.append)
    resultDeferred.addCallback(self._incr)

    # helper has already been invoked, but its Deferred has not fired yet,
    # so no result may have been delivered.
    self.assertEqual(results, [])
    self.assertEqual(self.arg, uniqueObject)

    controlDeferred.callback(None)
    self.assertEqual(results.pop(), None)
    self.assertEqual(self.counter, 1)

    # Phase 2: start counting again for the acquire/release checks.
    self.counter = 0
    for i in range(1, 1 + N):
        # Each of the first N acquisitions must fire its callback at once.
        sem.acquire().addCallback(self._incr)
        self.assertEqual(self.counter, i)

    # One acquisition too many: its callback must stay pending.
    sem.acquire().addCallback(self._incr)
    self.assertEqual(self.counter, N)

    # A single release lets the pending acquisition through.
    sem.release()
    self.assertEqual(self.counter, N + 1)

    # Nobody is waiting any more, so further releases must not fire any
    # additional callbacks.
    for _ in range(1, 1 + N):
        sem.release()
        self.assertEqual(self.counter, N + 1)
评论列表
文章目录