test_termination.py 文件源码

python
阅读 32 收藏 0 点赞 0 评论 0

项目:execnet 作者: pytest-dev 项目源码 文件源码
def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel):
    if execmodel.backend != "thread":
        pytest.xfail("test and execnet not compatible to greenlets yet")
    gw = makegateway('popen')
    q = execmodel.queue.Queue()
    channel = gw.remote_exec(source='''
        import os, time
        channel.send(os.getpid())
        time.sleep(100)
    ''')
    pid = channel.receive()
    py.process.kill(pid)
    channel.setcallback(q.put, endmarker=999)
    val = q.get(TESTTIMEOUT)
    assert val == 999
    err = channel._getremoteerror()
    assert isinstance(err, EOFError)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号