_test_multiprocessing.py 文件源码

python
阅读 32 收藏 0 点赞 0 评论 0

项目:ouroboros 作者: pybee 项目源码 文件源码
def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=10))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertTrue(success.value)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号