test_gipc.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:gipc 作者: jgehrcke 项目源码 文件源码
def test_lock_out_of_context_pair_4(self):
        with raises(GIPCLocked):
            with pipe(True) as (h1, h2):
                # Write more to pipe than pipe buffer can hold
                # (makes `put` block when there is no reader).
                # Buffer is quite large on Windows.
                gw1 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h1)
                gw2 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h2)
                gevent.sleep(SHORTTIME)
                # Context fails closing h2 writer, succeeds upon closing h2
                # reader. Proceeds closing h1 writer, fails, closes h1
                # reader and succeeds.
        assert h2._reader._closed
        assert h1._reader._closed
        assert not h2._writer._closed
        assert not h1._writer._closed
        gw1.kill(block=False)
        gw2.kill(block=False)
        gevent.sleep(-1)
        h2.close()
        h1.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号