test_gipc.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:gipc 作者: jgehrcke 项目源码 文件源码
def test_whatever_1(self):
        """
        From a writing child, fire into the pipe. In a greenlet in the parent,
        receive one of these messages and return it to the main greenlet.
        Expect message retrieval (child process creation) within a certain
        timeout interval. Terminate the child process after retrieval.
        """
        with pipe() as (r, w):
            def readgreenlet(reader):
                with gevent.Timeout(SHORTTIME * 5, False) as t:
                    m = reader.get(timeout=t)
                    return m
            p = start_process(usecase_child_a, args=(w, ))
            # Wait for process to send first message:
            r.get()
            # Second message must be available immediately now.
            g = gevent.spawn(readgreenlet, r)
            m = r.get()
            assert g.get() == "SPLASH"
            p.terminate()
            p.join()
            assert p.exitcode == -signal.SIGTERM
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号