def test_lock_out_of_context_single(self):
    """Check that a handle locked by another greenlet cannot be closed.

    A greenlet is spawned that calls ``r.get()`` on the read end of a
    fresh pipe. While that greenlet holds the handle, using ``r`` as a
    context manager is expected to raise ``GIPCLocked``. After the
    greenlet has been killed, both pipe ends are closed explicitly.
    """
    r, w = pipe()
    g = gevent.spawn(lambda r: r.get(), r)
    # Give `g` time to start and enter `r.get()`.
    gevent.sleep(SHORTTIME)
    # The context manager can't close `r`, as it is locked in `g`.
    with raises(GIPCLocked):
        with r:
            pass
    g.kill(block=False)
    # Ensure killing via 'context switch', i.e. yield control to other
    # coroutines (otherwise the subsequent close attempt will fail with
    # `GIPCLocked` error).
    gevent.sleep(-1)
    # Close the writer first; otherwise, `os.close(r._fd)` would block on
    # Windows.
    w.close()
    r.close()
评论列表
文章目录