def test_lock_out_of_context_single(self):
h1, h2 = pipe(True)
g = gevent.spawn(lambda h: h.get(), h1)
gevent.sleep(SHORTTIME)
with raises(GIPCLocked):
with h1:
pass
# Can't close h1 reader on exit, as it is locked in `g`.
g.kill(block=False)
# Ensure killing via 'context switch', i.e. yield control to other
# coroutines (otherwise the subsequent close attempt may fail with
# `GIPCLocked` error).
gevent.sleep(-1)
h2.close() # Closes read and write handles of h2.
assert h1._writer._closed
assert not h1._reader._closed
h1.close() # Closes read handle, ignore that writer is already closed.
assert h1._reader._closed
评论列表
文章目录