def test_lock_out_of_context_pair(self):
with raises(GIPCLocked):
with pipe(True) as (h1, h2):
# Write more to pipe than pipe buffer can hold
# (makes `put` block when there is no reader).
# Buffer is quite large on Windows.
gw = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h1)
gevent.sleep(SHORTTIME)
# Context manager tries to close h2 reader, h2 writer, and
# h1 writer first. Fails upon latter, must still close
# h1 reader after that.
assert not h1._writer._closed
assert h1._reader._closed
assert h2._writer._closed
assert h2._reader._closed
# Kill greenlet (free lock on h1 writer), close h1 writer.
gw.kill(block=False)
gevent.sleep(-1)
h1.close()
assert h1._writer._closed
评论列表
文章目录