def run(self, handler):
from gevent import wsgi, pywsgi, local
if not isinstance(threading.local(), local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if not self.options.pop('fast', None): wsgi = pywsgi
self.options['log'] = None if self.quiet else 'default'
address = (self.host, self.port)
server = wsgi.WSGIServer(address, handler, **self.options)
if 'BOTTLE_CHILD' in os.environ:
import signal
signal.signal(signal.SIGINT, lambda s, f: server.stop())
server.serve_forever()
python类WSGIServer()的实例源码
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
def run(self, handler):
from gevent import wsgi, pywsgi, local
if not isinstance(_lctx, local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if not self.options.get('fast'): wsgi = pywsgi
log = None if self.quiet else 'default'
wsgi.WSGIServer((self.host, self.port), handler, log=log).serve_forever()
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
def run(self, app): # pragma: no cover
from wsgiref.simple_server import make_server
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
import socket
class FixedHandler(WSGIRequestHandler):
def address_string(self): # Prevent reverse DNS lookups please.
return self.client_address[0]
def log_request(*args, **kw):
if not self.quiet:
return WSGIRequestHandler.log_request(*args, **kw)
handler_cls = self.options.get('handler_class', FixedHandler)
server_cls = self.options.get('server_class', WSGIServer)
if ':' in self.host: # Fix wsgiref for IPv6 addresses.
if getattr(server_cls, 'address_family') == socket.AF_INET:
class server_cls(server_cls):
address_family = socket.AF_INET6
self.srv = make_server(self.host, self.port, app, server_cls,
handler_cls)
self.port = self.srv.server_port # update port actual port (0 means random)
try:
self.srv.serve_forever()
except KeyboardInterrupt:
self.srv.server_close() # Prevent ResourceWarning: unclosed socket
raise
def run(self, handler):
from gevent import wsgi, pywsgi, local
if not isinstance(threading.local(), local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if not self.options.pop('fast', None): wsgi = pywsgi
self.options['log'] = None if self.quiet else 'default'
address = (self.host, self.port)
server = wsgi.WSGIServer(address, handler, **self.options)
if 'BOTTLE_CHILD' in os.environ:
import signal
signal.signal(signal.SIGINT, lambda s, f: server.stop())
server.serve_forever()
def gevent_run(app, port=5000, log=None, error_log=None, address='',
monkey_patch=True, start=True, **kwargs): # pragma: no cover
"""Run your app in gevent.wsgi.WSGIServer
:param app: wsgi application, ex. Microservice instance
:param port: int, listen port, default 5000
:param address: str, listen address, default: ""
:param log: logger instance, default app.logger
:param error_log: logger instance, default app.logger
:param monkey_patch: boolean, use gevent.monkey.patch_all() for patching standard modules, default: True
:param start: boolean, if True, server will be start (server.serve_forever())
:param kwargs: other params for WSGIServer(**kwargs)
:return: server
"""
if log is None:
log = app.logger
if error_log is None:
error_log = app.logger
if monkey_patch:
from gevent import monkey
monkey.patch_all()
from gevent.wsgi import WSGIServer
http_server = WSGIServer((address, port), app, log=log, error_log=error_log,
**kwargs)
if start:
http_server.serve_forever()
return http_server
def run_server():
http_server = WSGIServer(('', 5000), DebuggedApplication(application))
http_server.serve_forever()
def run_api_server(app, port=5000):
"""
Runs a gevent server on a given port. Default port is 5000.
:param app: WSGI-compliant app to be run
:param port: port for the gevent server
:type port: int
:return: None
"""
log = logging.getLogger('api')
http_server = WSGIServer(('', port), app, log=log)
http_server.serve_forever()
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='server port',
type=int, default=9000)
args = parser.parse_args()
http_server = WSGIServer(('', args.port), app)
http_server.serve_forever()
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
def run(self, handler):
from gevent import wsgi, pywsgi, local
if not isinstance(threading.local(), local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if not self.options.pop('fast', None): wsgi = pywsgi
self.options['log'] = None if self.quiet else 'default'
address = (self.host, self.port)
server = wsgi.WSGIServer(address, handler, **self.options)
if 'BOTTLE_CHILD' in os.environ:
import signal
signal.signal(signal.SIGINT, lambda s, f: server.stop())
server.serve_forever()
def _start_flask(self):
#self.app.run(self.ip, self.port, debug=False, use_reloader=False)
#this should be a more production-fit http-server
#self.app.logger.setLevel(logging.ERROR)
self.http_server = WSGIServer((self.ip, self.port),
self.app,
log=open("/dev/null", "w") # This disables HTTP request logs to not mess up the CLI when e.g. the auto-updated dashboard is used
)
self.http_server.serve_forever()
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
def run(self, handler):
from gevent import wsgi, pywsgi, local
if not isinstance(_lctx, local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if not self.options.get('fast'): wsgi = pywsgi
log = None if self.quiet else 'default'
wsgi.WSGIServer((self.host, self.port), handler, log=log).serve_forever()
def main():
print 'Starting TRex HTTP proxy on port: ', server_config['port']
http_server = WSGIServer(('', server_config['port']), app)
http_server.serve_forever()
# Start web server
def run(self, handler): # pragma: no cover
import flup.server.fcgi
flup.server.fcgi.WSGIServer(handler, bindAddress=(self.host, self.port)).run()
def run(self, handler):
from gevent import wsgi
#from gevent.hub import getcurrent
#self.set_context_ident(getcurrent, weakref=True) # see contextlocal
wsgi.WSGIServer((self.host, self.port), handler).serve_forever()
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
def run(self, handler):
from gevent import wsgi, pywsgi, local
if not isinstance(threading.local(), local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if not self.options.pop('fast', None): wsgi = pywsgi
self.options['log'] = None if self.quiet else 'default'
address = (self.host, self.port)
server = wsgi.WSGIServer(address, handler, **self.options)
if 'BOTTLE_CHILD' in os.environ:
import signal
signal.signal(signal.SIGINT, lambda s, f: server.stop())
server.serve_forever()