fork_wait.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码
def test_wait(self):
        for i in range(NUM_THREADS):
            _thread.start_new(self.f, (i,))

        time.sleep(LONGSLEEP)

        a = sorted(self.alive.keys())
        self.assertEqual(a, list(range(NUM_THREADS)))

        prefork_lives = self.alive.copy()

        if sys.platform in ['unixware7']:
            cpid = os.fork1()
        else:
            cpid = os.fork()

        if cpid == 0:
            # Child
            time.sleep(LONGSLEEP)
            n = 0
            for key in self.alive:
                if self.alive[key] != prefork_lives[key]:
                    n += 1
            os._exit(n)
        else:
            # Parent
            try:
                self.wait_impl(cpid)
            finally:
                # Tell threads to die
                self.stop = 1
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号