def test_wsgi_scenario(self):
from gevent.wsgi import WSGIServer
def serve(http_server):
http_server.serve_forever()
def hello_world(environ, start_response):
# Generate response in child process.
with pipe() as (reader, writer):
start_response('200 OK', [('Content-Type', 'text/html')])
rg = start_process(
target=complchild_test_wsgi_scenario_respgen,
args=(writer, ))
response = reader.get()
rg.join()
assert rg.exitcode == 0
return [response]
http_server = WSGIServer(('localhost', 0), hello_world)
servelet = gevent.spawn(serve, http_server)
# Wait for server being bound to socket.
while True:
if http_server.address[1] != 0:
break
gevent.sleep(0.05)
client = start_process(
target=complchild_test_wsgi_scenario_client,
args=(http_server.address, ))
client.join()
assert client.exitcode == 0
servelet.kill()
servelet.get() # get() is join and re-raises Exception.
评论列表
文章目录