def test_lock_out_of_context_pair_4(self):
    """Leave the pipe context while both handles are busy with a blocked `put`.

    Each handle of the pair gets a greenlet that writes more data than the
    pipe buffer can hold, so both `put` calls are still blocked when the
    context exits. Exiting the context is expected to raise `GIPCLocked`.
    Afterwards both readers must be closed while both writers must still
    be open.
    """
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            # Write more to pipe than pipe buffer can hold
            # (makes `put` block when there is no reader).
            # Buffer is quite large on Windows.
            gw1 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h1)
            gw2 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h2)
            # Give both writer greenlets time to start and block in `put`.
            gevent.sleep(SHORTTIME)
            # On context exit: closing the h2 writer fails, closing the h2
            # reader succeeds. The context then proceeds with h1: closing
            # the writer fails, closing the reader succeeds.

    # These checks must run after the `raises` block: the exception is
    # raised by the context exit, so nothing placed after the inner `with`
    # inside the `raises` block would execute.
    assert h2._reader._closed
    assert h1._reader._closed
    assert not h2._writer._closed
    assert not h1._writer._closed

    # Stop the blocked writers without waiting, then yield once so the
    # kills can take effect before the handles are closed.
    gw1.kill(block=False)
    gw2.kill(block=False)
    gevent.sleep(-1)

    # Close the remaining (writer) sides explicitly for cleanup.
    h2.close()
    h1.close()
评论列表
文章目录