test_gipc.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:gipc 作者: jgehrcke 项目源码 文件源码
def test_lock_out_of_context_single(self):
        h1, h2 = pipe(True)
        g = gevent.spawn(lambda h: h.get(), h1)
        gevent.sleep(SHORTTIME)
        with raises(GIPCLocked):
            with h1:
                pass
                # Can't close h1 reader on exit, as it is locked in `g`.
        g.kill(block=False)
        # Ensure killing via 'context switch', i.e. yield control to other
        # coroutines (otherwise the subsequent close attempt may fail with
        # `GIPCLocked` error).
        gevent.sleep(-1)
        h2.close()  # Closes read and write handles of h2.
        assert h1._writer._closed
        assert not h1._reader._closed
        h1.close()  # Closes read handle, ignore that writer is already closed.
        assert h1._reader._closed
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号