def _threadpoolTest(self, method):
# This is a schizophrenic test: it seems to be trying to test
# both the dispatch() behavior of the ThreadPool as well as
# the serialization behavior of threadable.synchronize(). It
# would probably make more sense as two much simpler tests.
N = 10
tp = threadpool.ThreadPool()
tp.start()
try:
waiting = threading.Lock()
waiting.acquire()
actor = Synchronization(N, waiting)
for i in xrange(N):
tp.dispatch(actor, actor.run)
self._waitForLock(waiting)
self.failIf(actor.failures, "run() re-entered %d times" % (actor.failures,))
finally:
tp.stop()
评论列表
文章目录