test_threading.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:oil 作者: oilshell 项目源码 文件源码
def test_various_ops(self):
        # Starts NUMTASKS TestThread instances that share one semaphore, one
        # re-entrant lock and one counter, then joins each of them and checks
        # its state before start and after completion.
        #
        # Runtime is roughly n/3 seconds: about n/3 clumps of tasks at about
        # one second per clump.
        NUMTASKS = 10

        # Expected repr() shapes: before start(), and once the thread is done.
        initial_repr = r'^<TestThread\(.*, initial\)>$'
        finished_repr = r'^<TestThread\(.*, \w+ -?\d+\)>$'

        # The semaphore is created with value 3, so at most 3 of the 10 tasks
        # are meant to be running at any one time.
        limiter = threading.BoundedSemaphore(value=3)
        lock = threading.RLock()
        running = Counter()

        workers = []

        for index in range(NUMTASKS):
            name = "<thread %d>" % index
            worker = TestThread(name, self, limiter, lock, running)
            workers.append(worker)
            # A thread that has not been started yet has no identifier and
            # reports itself as "initial".
            self.assertIsNone(worker.ident)
            self.assertRegexpMatches(repr(worker), initial_repr)
            worker.start()

        if verbose:
            print('waiting for all tasks to complete')
        for worker in workers:
            # NUMTASKS doubles as the join timeout, in seconds.
            worker.join(NUMTASKS)
            self.assertFalse(worker.is_alive())
            self.assertNotEqual(worker.ident, 0)
            self.assertIsNotNone(worker.ident)
            self.assertRegexpMatches(repr(worker), finished_repr)
        if verbose:
            print('all tasks done')
        # The shared counter must be back at zero once every thread is done.
        self.assertEqual(running.get(), 0)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号