def test_exitcode_previous_to_join(self):
    # Check that the child's exit code becomes observable on the handle
    # *before* `join()` is ever called, i.e. purely through the child
    # watcher mechanism.
    child = start_process(lambda: gevent.sleep(SHORTTIME))
    # Right after start-up the child is expected to still be running.
    # This is not guaranteed (the child could, in principle, already
    # have exited), but it is very likely.
    assert child.exitcode is None
    # Poll for up to two seconds. Each `gevent.sleep()` yields to the
    # event loop; per the original author's notes, two libev loop
    # iterations are needed once the OS has delivered SIGCHLD to the
    # parent: the first runs the child reap loop, the second runs the
    # libev callback registered for the termination event.
    give_up_at = time.time() + 2
    while time.time() < give_up_at:
        if child.exitcode is None:
            # Termination not picked up yet: let the loop iterate.
            gevent.sleep(ALMOSTZERO)
            continue
        # Termination was detected without joining; the child must have
        # exited cleanly, and joining afterwards must still work.
        assert child.exitcode == 0
        child.join()
        return
    # The deadline passed without the exit code ever being set.
    raise Exception('Child termination not detected')
评论列表
文章目录