def test_lock_out_of_context_pair_3(self):
    """Exiting the pipe context while both handles are in use must fail.

    Two greenlets are spawned inside the ``pipe(True)`` context, one
    calling ``get()`` on each handle. Leaving the context is expected to
    raise ``GIPCLocked``. Afterwards the test checks that both writer
    sides were closed while both reader sides were left open, then stops
    the greenlets and closes the handles explicitly.
    """
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            gr1 = gevent.spawn(lambda h: h.get(), h1)
            gr2 = gevent.spawn(lambda h: h.get(), h2)
            # Give the spawned greenlets a chance to run before the
            # context exits.
            gevent.sleep(SHORTTIME)
            # Context succeeds closing h2 writer, fails upon closing h2
            # reader. Proceeds closing h1 writer, succeeds, closes h1
            # reader and fails.
    # These checks look at the state left behind by the failed context
    # exit, so they have to run after the ``raises`` block.
    assert not h2._reader._closed
    assert not h1._reader._closed
    assert h2._writer._closed
    assert h1._writer._closed
    # Stop the greenlets that are still calling get() on the handles.
    gr1.kill(block=False)
    gr2.kill(block=False)
    # Non-blocking kills were requested above; yield control here,
    # presumably so they take effect before the handles are closed.
    gevent.sleep(-1)
    h2.close()
    h1.close()
# (Web-page residue, not part of the test code: "comment list" /
# "article table of contents".)