def test_whatever_1(self):
    """
    Read messages from a writing child in the main greenlet and in a
    spawned greenlet, then terminate the child.

    A child process running ``usecase_child_a`` is started with the write
    end of a pipe. The parent waits for a first message, then spawns a
    greenlet that reads from the same read end under a timeout of
    ``SHORTTIME * 5`` while the main greenlet performs a read as well.
    The value obtained by the spawned greenlet must be ``"SPLASH"``.

    The child is always terminated and joined, even if a read or an
    assertion fails, so that a failing test does not leave a writer
    process behind. After termination the exit code is expected to be
    ``-signal.SIGTERM``.
    """
    def readgreenlet(reader):
        # The second argument (False) makes the `with` statement swallow
        # the Timeout on expiry instead of propagating it; in that case
        # control falls through and this function returns None.
        with gevent.Timeout(SHORTTIME * 5, False) as t:
            return reader.get(timeout=t)

    with pipe() as (r, w):
        p = start_process(usecase_child_a, args=(w, ))
        try:
            # Wait for process to send first message:
            r.get()
            # Second message must be available immediately now.
            g = gevent.spawn(readgreenlet, r)
            # The main greenlet reads too; the value itself is not needed.
            r.get()
            assert g.get() == "SPLASH"
        finally:
            # Stop the child regardless of the outcome above.
            p.terminate()
            p.join()
        assert p.exitcode == -signal.SIGTERM
评论列表
文章目录