def gevent(app, address, **options):
options = options['options']
workers = options.workers
from gevent import pywsgi
from gevent.pool import Pool
pywsgi.WSGIServer(address, app, spawn=workers and Pool(
int(options.workers)) or 'default', log=None).serve_forever()
python类WSGIServer()的实例源码
def pulsar(app, address, **options):
from pulsar.apps import wsgi
sys.argv = ['anyserver.py']
s = wsgi.WSGIServer(callable=app, bind="%s:%d" % address)
s.start()
def flup(app, address, **options):
import flup.server.fcgi
flup.server.fcgi.WSGIServer(app, bindAddress=address).run()
def gevent(app, address, **options):
options = options['options']
workers = options.workers
from gevent import pywsgi
from gevent.pool import Pool
pywsgi.WSGIServer(address, app, spawn=workers and Pool(
int(options.workers)) or 'default', log=None).serve_forever()
def pulsar(app, address, **options):
from pulsar.apps import wsgi
sys.argv = ['anyserver.py']
s = wsgi.WSGIServer(callable=app, bind="%s:%d" % address)
s.start()
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
def run(self, app): # pragma: no cover
from wsgiref.simple_server import make_server
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
import socket
class FixedHandler(WSGIRequestHandler):
def address_string(self): # Prevent reverse DNS lookups please.
return self.client_address[0]
def log_request(*args, **kw):
if not self.quiet:
return WSGIRequestHandler.log_request(*args, **kw)
handler_cls = self.options.get('handler_class', FixedHandler)
server_cls = self.options.get('server_class', WSGIServer)
if ':' in self.host: # Fix wsgiref for IPv6 addresses.
if getattr(server_cls, 'address_family') == socket.AF_INET:
class server_cls(server_cls):
address_family = socket.AF_INET6
self.srv = make_server(self.host, self.port, app, server_cls,
handler_cls)
self.port = self.srv.server_port # update port actual port (0 means random)
try:
self.srv.serve_forever()
except KeyboardInterrupt:
self.srv.server_close() # Prevent ResourceWarning: unclosed socket
raise
def run(self, handler):
from gevent import pywsgi, local
if not isinstance(threading.local(), local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if self.quiet:
self.options['log'] = None
address = (self.host, self.port)
server = pywsgi.WSGIServer(address, handler, **self.options)
if 'BOTTLE_CHILD' in os.environ:
import signal
signal.signal(signal.SIGINT, lambda s, f: server.stop())
server.serve_forever()
def run(self, handler):
server = pywsgi.WSGIServer((self.host, self.port), handler, handler_class=WebSocketHandler)
if not self.quiet:
server.logger = create_logger('geventwebsocket.logging')
server.logger.setLevel(logging.INFO)
server.logger.addHandler(logging.StreamHandler())
server.serve_forever()
def flup(app, address, **options):
import flup.server.fcgi
flup.server.fcgi.WSGIServer(app, bindAddress=address).run()
def gevent(app, address, **options):
options = options['options']
workers = options.workers
from gevent import pywsgi
from gevent.pool import Pool
pywsgi.WSGIServer(address, app, spawn=workers and Pool(
int(options.workers)) or 'default', log=None).serve_forever()
def pulsar(app, address, **options):
from pulsar.apps import wsgi
sys.argv = ['anyserver.py']
s = wsgi.WSGIServer(callable=app, bind="%s:%d" % address)
s.start()
def flup(app, address, **options):
import flup.server.fcgi
flup.server.fcgi.WSGIServer(app, bindAddress=address).run()
def gevent(app, address, **options):
options = options['options']
workers = options.workers
from gevent import pywsgi
from gevent.pool import Pool
pywsgi.WSGIServer(address, app, spawn=workers and Pool(
int(options.workers)) or 'default', log=None).serve_forever()
def pulsar(app, address, **options):
from pulsar.apps import wsgi
sys.argv = ['anyserver.py']
s = wsgi.WSGIServer(callable=app, bind="%s:%d" % address)
s.start()
def run(self, handler): # pragma: no cover
import flup.server.fcgi
self.options.setdefault('bindAddress', (self.host, self.port))
flup.server.fcgi.WSGIServer(handler, **self.options).run()
def run(self, app): # pragma: no cover
from wsgiref.simple_server import make_server
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
import socket
class FixedHandler(WSGIRequestHandler):
def address_string(self): # Prevent reverse DNS lookups please.
return self.client_address[0]
def log_request(*args, **kw):
if not self.quiet:
return WSGIRequestHandler.log_request(*args, **kw)
handler_cls = self.options.get('handler_class', FixedHandler)
server_cls = self.options.get('server_class', WSGIServer)
if ':' in self.host: # Fix wsgiref for IPv6 addresses.
if getattr(server_cls, 'address_family') == socket.AF_INET:
class server_cls(server_cls):
address_family = socket.AF_INET6
self.srv = make_server(self.host, self.port, app, server_cls,
handler_cls)
self.port = self.srv.server_port # update port actual port (0 means random)
try:
self.srv.serve_forever()
except KeyboardInterrupt:
self.srv.server_close() # Prevent ResourceWarning: unclosed socket
raise
def run(self, handler):
from gevent import pywsgi, local
if not isinstance(threading.local(), local.local):
msg = "Bottle requires gevent.monkey.patch_all() (before import)"
raise RuntimeError(msg)
if self.quiet:
self.options['log'] = None
address = (self.host, self.port)
server = pywsgi.WSGIServer(address, handler, **self.options)
if 'BOTTLE_CHILD' in os.environ:
import signal
signal.signal(signal.SIGINT, lambda s, f: server.stop())
server.serve_forever()
def flup(app, address, **options):
import flup.server.fcgi
flup.server.fcgi.WSGIServer(app, bindAddress=address).run()
def gevent(app, address, **options):
options = options['options']
workers = options.workers
from gevent import pywsgi
from gevent.pool import Pool
pywsgi.WSGIServer(address, app, spawn=workers and Pool(
int(options.workers)) or 'default', log=None).serve_forever()