def test_wsgi_scenario(self):
    """Serve a WSGI app whose response body is produced by a child process.

    A WSGIServer is bound to an ephemeral port on localhost and run in a
    greenlet. Each request spawns a child process that sends the response
    body back through a pipe. A second child process acts as the HTTP
    client; the test passes when both children exit with code 0.
    """
    # gevent.wsgi was a deprecated alias that newer gevent releases no
    # longer ship; prefer gevent.pywsgi and only fall back for old versions.
    try:
        from gevent.pywsgi import WSGIServer
    except ImportError:
        from gevent.wsgi import WSGIServer

    def serve(http_server):
        http_server.serve_forever()

    def hello_world(environ, start_response):
        # Generate response in child process.
        with pipe() as (reader, writer):
            start_response('200 OK', [('Content-Type', 'text/html')])
            rg = start_process(
                target=complchild_test_wsgi_scenario_respgen,
                args=(writer, ))
            response = reader.get()
            rg.join()
            assert rg.exitcode == 0
            return [response]

    http_server = WSGIServer(('localhost', 0), hello_world)
    servelet = gevent.spawn(serve, http_server)
    try:
        # Wait for server being bound to socket (port 0 means "not yet").
        while http_server.address[1] == 0:
            gevent.sleep(0.05)
        client = start_process(
            target=complchild_test_wsgi_scenario_client,
            args=(http_server.address, ))
        client.join()
        assert client.exitcode == 0
    finally:
        # Always stop the server greenlet, even when the client failed.
        servelet.kill()
    servelet.get()  # get() is join and re-raises Exception.
python类wsgi()的实例源码
def run_wsgi_app(address, app):
    """Serve ``app`` on ``address``, using gunicorn when it is importable.

    With gunicorn available, the app is run through a WSGIApplication
    subclass configured for 2 gevent workers bound to ``address``.
    Otherwise a gevent WSGIServer serves the app in this process.

    :param address: (host, port) pair; the port is converted with int()
    :param app: WSGI application callable to serve
    """
    # Only the gunicorn import decides which server is used; an ImportError
    # raised later while serving must not silently start a second server.
    try:
        from gunicorn.app.wsgiapp import WSGIApplication
    except ImportError:
        # gevent.wsgi is absent from newer gevent releases; prefer pywsgi.
        try:
            from gevent.pywsgi import WSGIServer
        except ImportError:
            from gevent.wsgi import WSGIServer
        WSGIServer(address, app).serve_forever()
        return

    class GunicornApplication(WSGIApplication):
        def init(self, parser, opts, args):
            return {'bind': '%s:%d' % (address[0], int(address[1])),
                    'workers': 2,
                    'worker_class': 'gevent'}

        def load(self):
            # Return the app passed to run_wsgi_app (was an undefined name).
            return app

    GunicornApplication().run()
def prepare_env(self):
    """Build the WSGI environ dict for the current request.

    Starts from ``self.server.get_environ()``, adds the request line and
    peer information, then folds the request headers in as CGI-style keys.

    :return: the populated environ dict
    """
    request = self.request
    environ = self.server.get_environ()

    # Everything after the first '?' is the query string; the path part
    # is passed through unquote().
    raw_path, _, query_string = request.uri.partition('?')
    environ.update({
        'REQUEST_METHOD': request.typestr,
        'PATH_INFO': unquote(raw_path),
        'QUERY_STRING': query_string,
        'SERVER_PROTOCOL': 'HTTP/%d.%d' % request.version,
        'REMOTE_ADDR': request.remote_host,
        'REMOTE_PORT': str(request.remote_port),
        'REQUEST_URI': request.uri,
        'wsgi.input': request.input_buffer,
    })

    for name, value in request.get_input_headers():
        key = name.replace('-', '_').upper()
        # Content-Length / Content-Type keep their bare names; every other
        # header gets the HTTP_ prefix.
        if key not in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
            key = 'HTTP_' + key
        if key not in environ:
            environ[key] = value
            continue
        # Repeated header: cookies are joined with '; ', the rest with ','.
        separator = '; ' if 'COOKIE' in key else ','
        environ[key] += separator + value
    return environ
def set_environ(self, environ=None):
    """Rebuild ``self.environ`` as a copy of ``self.base_env`` plus overrides.

    :param environ: optional mapping of overrides; when None, the value
        currently stored in ``self.environ`` (if any) is used instead
    """
    if environ is not None:
        self.environ = environ
    # Whatever is stored now (explicit argument or an earlier value) is
    # layered on top of a fresh copy of the base environment.
    overrides = getattr(self, 'environ', None)
    merged = self.environ = self.base_env.copy()
    if overrides is not None:
        merged.update(overrides)
    # Fall back to stderr when no error stream is configured.
    if merged.get('wsgi.errors') is None:
        merged['wsgi.errors'] = sys.stderr
def prepare_env(self):
    """Build the WSGI environ dict for the current request.

    Starts from ``self.server.get_environ()``, adds the request line and
    peer information, then folds the request headers in as CGI-style keys.

    :return: the populated environ dict
    """
    request = self.request
    environ = self.server.get_environ()

    # Everything after the first '?' is the query string; the path part
    # is passed through unquote().
    raw_path, _, query_string = request.uri.partition('?')
    environ.update({
        'REQUEST_METHOD': request.typestr,
        'PATH_INFO': unquote(raw_path),
        'QUERY_STRING': query_string,
        'SERVER_PROTOCOL': 'HTTP/%d.%d' % request.version,
        'REMOTE_ADDR': request.remote_host,
        'REMOTE_PORT': str(request.remote_port),
        'REQUEST_URI': request.uri,
        'wsgi.input': request.input_buffer,
    })

    for name, value in request.get_input_headers():
        key = name.replace('-', '_').upper()
        # Content-Length / Content-Type keep their bare names; every other
        # header gets the HTTP_ prefix.
        if key not in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
            key = 'HTTP_' + key
        if key not in environ:
            environ[key] = value
            continue
        # Repeated header: cookies are joined with '; ', the rest with ','.
        separator = '; ' if 'COOKIE' in key else ','
        environ[key] += separator + value
    return environ
def set_environ(self, environ=None):
    """Rebuild ``self.environ`` as a copy of ``self.base_env`` plus overrides.

    :param environ: optional mapping of overrides; when None, the value
        currently stored in ``self.environ`` (if any) is used instead
    """
    if environ is not None:
        self.environ = environ
    # Whatever is stored now (explicit argument or an earlier value) is
    # layered on top of a fresh copy of the base environment.
    overrides = getattr(self, 'environ', None)
    merged = self.environ = self.base_env.copy()
    if overrides is not None:
        merged.update(overrides)
    # Fall back to stderr when no error stream is configured.
    if merged.get('wsgi.errors') is None:
        merged['wsgi.errors'] = sys.stderr
def base_run(app, port=5000, **kwargs):  # pragma: no cover
    """Start the app through its own ``run`` method (debugging and testing).

    :param app: wsgi application, ex. Microservice instance
    :param port: int, listen port, default 5000
    :param kwargs: extra keyword arguments forwarded to ``app.run``
    :return: None
    """
    run_options = dict(port=port, **kwargs)
    app.run(**run_options)
def gevent_run(app, port=5000, log=None, error_log=None, address='',
               monkey_patch=True, start=True, **kwargs):  # pragma: no cover
    """Run your app in a gevent WSGIServer

    :param app: wsgi application, ex. Microservice instance
    :param port: int, listen port, default 5000
    :param address: str, listen address, default: ""
    :param log: logger instance, default app.logger
    :param error_log: logger instance, default app.logger
    :param monkey_patch: boolean, use gevent.monkey.patch_all() for patching standard modules, default: True
    :param start: boolean, if True, server will be start (server.serve_forever())
    :param kwargs: other params for WSGIServer(**kwargs)
    :return: server
    """
    if log is None:
        log = app.logger
    if error_log is None:
        error_log = app.logger
    if monkey_patch:
        from gevent import monkey
        monkey.patch_all()

    # gevent.wsgi was a deprecated alias that newer gevent releases no
    # longer ship; prefer gevent.pywsgi and only fall back for old versions.
    try:
        from gevent.pywsgi import WSGIServer
    except ImportError:
        from gevent.wsgi import WSGIServer

    http_server = WSGIServer((address, port), app, log=log,
                             error_log=error_log, **kwargs)
    if start:
        # Blocks until the server is stopped.
        http_server.serve_forever()
    return http_server
def run(self, handler):
    """Serve ``handler`` on (self.host, self.port) with a gevent WSGIServer.

    Options read from ``self.options``:
      * ``monkey`` (default True): call gevent.monkey.patch_all() unless
        threading.local is already gevent's local class.
      * ``fast``: use gevent.wsgi's server when that module is importable,
        otherwise gevent.pywsgi's.

    :param handler: WSGI application callable
    """
    from gevent import pywsgi, monkey, local
    if self.options.get('monkey', True):
        if threading.local is not local.local:
            monkey.patch_all()
    wsgi = pywsgi
    if self.options.get('fast'):
        # gevent.wsgi is absent from newer gevent releases; importing it
        # lazily keeps the default (pywsgi) path working there, and a
        # 'fast' request falls back to pywsgi instead of crashing.
        try:
            from gevent import wsgi
        except ImportError:
            wsgi = pywsgi
    wsgi.WSGIServer((self.host, self.port), handler).serve_forever()
def make_app():
    """Create the tornado WSGI application.

    Routes "/" to MainHandler and "/query" to QueryHandler; templates and
    static files are looked up in the "templates" and "static" directories
    next to this file. Debug mode is enabled.

    :return: tornado.wsgi.WSGIApplication instance
    """
    here = os.path.dirname(__file__)
    routes = [
        (r"/", MainHandler),
        (r"/query", QueryHandler),
    ]
    return tornado.wsgi.WSGIApplication(
        routes,
        template_path=os.path.join(here, "templates"),
        static_path=os.path.join(here, "static"),
        debug=True)
def run_wsgi_app(address, app):
    """Serve ``app`` on ``address``, using gunicorn when it is importable.

    With gunicorn available, the app is run through a WSGIApplication
    subclass configured for 2 gevent workers bound to ``address``.
    Otherwise a gevent WSGIServer serves the app in this process.

    :param address: (host, port) pair; the port is converted with int()
    :param app: WSGI application callable to serve
    """
    # Only the gunicorn import decides which server is used; an ImportError
    # raised later while serving must not silently start a second server.
    try:
        from gunicorn.app.wsgiapp import WSGIApplication
    except ImportError:
        # gevent.wsgi is absent from newer gevent releases; prefer pywsgi.
        try:
            from gevent.pywsgi import WSGIServer
        except ImportError:
            from gevent.wsgi import WSGIServer
        WSGIServer(address, app).serve_forever()
        return

    class GunicornApplication(WSGIApplication):
        def init(self, parser, opts, args):
            return {'bind': '%s:%d' % (address[0], int(address[1])),
                    'workers': 2,
                    'worker_class': 'gevent'}

        def load(self):
            # Return the app passed to run_wsgi_app (was an undefined name).
            return app

    GunicornApplication().run()
def prepare_env(self):
    """Build the WSGI environ dict for the current request.

    Starts from ``self.server.get_environ()``, adds the request line and
    peer information, then folds the request headers in as CGI-style keys.

    :return: the populated environ dict
    """
    request = self.request
    environ = self.server.get_environ()

    # Everything after the first '?' is the query string; the path part
    # is passed through unquote().
    raw_path, _, query_string = request.uri.partition('?')
    environ.update({
        'REQUEST_METHOD': request.typestr,
        'PATH_INFO': unquote(raw_path),
        'QUERY_STRING': query_string,
        'SERVER_PROTOCOL': 'HTTP/%d.%d' % request.version,
        'REMOTE_ADDR': request.remote_host,
        'REMOTE_PORT': str(request.remote_port),
        'REQUEST_URI': request.uri,
        'wsgi.input': request.input_buffer,
    })

    for name, value in request.get_input_headers():
        key = name.replace('-', '_').upper()
        # Content-Length / Content-Type keep their bare names; every other
        # header gets the HTTP_ prefix.
        if key not in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
            key = 'HTTP_' + key
        if key not in environ:
            environ[key] = value
            continue
        # Repeated header: cookies are joined with '; ', the rest with ','.
        separator = '; ' if 'COOKIE' in key else ','
        environ[key] += separator + value
    return environ
def set_environ(self, environ=None):
    """Rebuild ``self.environ`` as a copy of ``self.base_env`` plus overrides.

    :param environ: optional mapping of overrides; when None, the value
        currently stored in ``self.environ`` (if any) is used instead
    """
    if environ is not None:
        self.environ = environ
    # Whatever is stored now (explicit argument or an earlier value) is
    # layered on top of a fresh copy of the base environment.
    overrides = getattr(self, 'environ', None)
    merged = self.environ = self.base_env.copy()
    if overrides is not None:
        merged.update(overrides)
    # Fall back to stderr when no error stream is configured.
    if merged.get('wsgi.errors') is None:
        merged['wsgi.errors'] = sys.stderr
def run_wsgi_app(address, app):
    """Serve ``app`` on ``address``, using gunicorn when it is importable.

    With gunicorn available, the app is run through a WSGIApplication
    subclass configured for 2 gevent workers bound to ``address``.
    Otherwise a gevent WSGIServer serves the app in this process.

    :param address: (host, port) pair; the port is converted with int()
    :param app: WSGI application callable to serve
    """
    # Only the gunicorn import decides which server is used; an ImportError
    # raised later while serving must not silently start a second server.
    try:
        from gunicorn.app.wsgiapp import WSGIApplication
    except ImportError:
        # gevent.wsgi is absent from newer gevent releases; prefer pywsgi.
        try:
            from gevent.pywsgi import WSGIServer
        except ImportError:
            from gevent.wsgi import WSGIServer
        WSGIServer(address, app).serve_forever()
        return

    class GunicornApplication(WSGIApplication):
        def init(self, parser, opts, args):
            return {'bind': '%s:%d' % (address[0], int(address[1])),
                    'workers': 2,
                    'worker_class': 'gevent'}

        def load(self):
            # Return the app passed to run_wsgi_app (was an undefined name).
            return app

    GunicornApplication().run()
def prepare_env(self):
    """Build the WSGI environ dict for the current request.

    Starts from ``self.server.get_environ()``, adds the request line and
    peer information, then folds the request headers in as CGI-style keys.

    :return: the populated environ dict
    """
    request = self.request
    environ = self.server.get_environ()

    # Everything after the first '?' is the query string; the path part
    # is passed through unquote().
    raw_path, _, query_string = request.uri.partition('?')
    environ.update({
        'REQUEST_METHOD': request.typestr,
        'PATH_INFO': unquote(raw_path),
        'QUERY_STRING': query_string,
        'SERVER_PROTOCOL': 'HTTP/%d.%d' % request.version,
        'REMOTE_ADDR': request.remote_host,
        'REMOTE_PORT': str(request.remote_port),
        'REQUEST_URI': request.uri,
        'wsgi.input': request.input_buffer,
    })

    for name, value in request.get_input_headers():
        key = name.replace('-', '_').upper()
        # Content-Length / Content-Type keep their bare names; every other
        # header gets the HTTP_ prefix.
        if key not in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
            key = 'HTTP_' + key
        if key not in environ:
            environ[key] = value
            continue
        # Repeated header: cookies are joined with '; ', the rest with ','.
        separator = '; ' if 'COOKIE' in key else ','
        environ[key] += separator + value
    return environ
def set_environ(self, environ=None):
    """Rebuild ``self.environ`` as a copy of ``self.base_env`` plus overrides.

    :param environ: optional mapping of overrides; when None, the value
        currently stored in ``self.environ`` (if any) is used instead
    """
    if environ is not None:
        self.environ = environ
    # Whatever is stored now (explicit argument or an earlier value) is
    # layered on top of a fresh copy of the base environment.
    overrides = getattr(self, 'environ', None)
    merged = self.environ = self.base_env.copy()
    if overrides is not None:
        merged.update(overrides)
    # Fall back to stderr when no error stream is configured.
    if merged.get('wsgi.errors') is None:
        merged['wsgi.errors'] = sys.stderr
def run_wsgi_app(address, app):
    """Serve ``app`` on ``address``, using gunicorn when it is importable.

    With gunicorn available, the app is run through a WSGIApplication
    subclass configured for 2 gevent workers bound to ``address``.
    Otherwise a gevent WSGIServer serves the app in this process.

    :param address: (host, port) pair; the port is converted with int()
    :param app: WSGI application callable to serve
    """
    # Only the gunicorn import decides which server is used; an ImportError
    # raised later while serving must not silently start a second server.
    try:
        from gunicorn.app.wsgiapp import WSGIApplication
    except ImportError:
        # gevent.wsgi is absent from newer gevent releases; prefer pywsgi.
        try:
            from gevent.pywsgi import WSGIServer
        except ImportError:
            from gevent.wsgi import WSGIServer
        WSGIServer(address, app).serve_forever()
        return

    class GunicornApplication(WSGIApplication):
        def init(self, parser, opts, args):
            return {'bind': '%s:%d' % (address[0], int(address[1])),
                    'workers': 2,
                    'worker_class': 'gevent'}

        def load(self):
            # Return the app passed to run_wsgi_app (was an undefined name).
            return app

    GunicornApplication().run()
def tornado_combiner(configs, use_gevent=False, start=True, monkey_patch=None,
                     Container=None, Server=None, threadpool=None):  # pragma: no cover
    """Combine servers in one tornado event loop process

    Each entry of ``configs`` is handed to ``tornado_run`` with
    ``start=False``; the loop is started once at the end when ``start``
    is true.

    :param configs: [
        {
            'app': Microservice Application or another wsgi application, required
            'port': int, default: 5000
            'address': str, default: ""
        },
        { ... }
    ]
    :param use_gevent: if True, app.wsgi will be run in gevent.spawn
    :param start: if True, tornado_start() is called after all servers are created
    :param Container: your class, bases on tornado.wsgi.WSGIContainer, default: tornado.wsgi.WSGIContainer
    :param Server: your class, bases on tornado.httpserver.HTTPServer, default: tornado.httpserver.HTTPServer
    :param monkey_patch: boolean, use gevent.monkey.patch_all() for patching standard modules, default: use_gevent
    :param threadpool: a multiprocessing.pool.ThreadPool, or a value passed to
        ThreadPool(...) to create one; forwarded to every tornado_run call
    :return: list of tornado servers
    """
    if monkey_patch is None:
        monkey_patch = use_gevent

    # Patching happens once here, so each tornado_run call below is told
    # not to patch again (monkey_patch=False).
    if use_gevent and monkey_patch:
        from gevent import monkey
        monkey.patch_all()

    if threadpool is not None:
        from multiprocessing.pool import ThreadPool
        if not isinstance(threadpool, ThreadPool):
            threadpool = ThreadPool(threadpool)

    servers = [
        tornado_run(config['app'], use_gevent=use_gevent,
                    port=config.get('port', 5000),
                    monkey_patch=False,
                    address=config.get('address', ''),
                    start=False,
                    Container=Container,
                    Server=Server, threadpool=threadpool)
        for config in configs
    ]

    if start:
        tornado_start()
    return servers