def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
if ssl_context is not None:
try:
from OpenSSL import tsafe
except ImportError:
raise TypeError('SSL is not available if the OpenSSL '
'library is not installed.')
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
self.socket = tsafe.Connection(ssl_context, self.socket)
self.ssl_context = ssl_context
else:
self.ssl_context = None
python类__init__()的实例源码
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
self.socket = ssl_context.wrap_socket(self.socket,
server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
self.socket = ssl_context.wrap_socket(self.socket,
server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
if ssl_context is not None:
try:
from OpenSSL import tsafe
except ImportError:
raise TypeError('SSL is not available if the OpenSSL '
'library is not installed.')
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
self.socket = tsafe.Connection(ssl_context, self.socket)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
self.socket = ssl_context.wrap_socket(self.socket,
server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
self.socket = ssl_context.wrap_socket(self.socket,
server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, addr, handler, poll_interval, sockmap):
self._localaddr = addr
self._remoteaddr = None
self.data_size_limit = None
self.sockmap = sockmap
asyncore.dispatcher.__init__(self, map=sockmap)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
self.set_socket(sock, map=sockmap)
# try to re-use a server port if possible
self.set_reuse_addr()
self.bind(addr)
self.port = sock.getsockname()[1]
self.listen(5)
except:
self.close()
raise
self._handler = handler
self._thread = None
self.poll_interval = poll_interval
def __init__(self, addr, handler, poll_interval=0.5,
log=False, sslctx=None):
class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
def __getattr__(self, name, default=None):
if name.startswith('do_'):
return self.process_request
raise AttributeError(name)
def process_request(self):
self.server._handler(self)
def log_message(self, format, *args):
if log:
super(DelegatingHTTPRequestHandler,
self).log_message(format, *args)
HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
ControlMixin.__init__(self, handler, poll_interval)
self.sslctx = sslctx
def setUp(self):
class CheckingFilter(logging.Filter):
def __init__(self, cls):
self.cls = cls
def filter(self, record):
t = type(record)
if t is not self.cls:
msg = 'Unexpected LogRecord type %s, expected %s' % (t,
self.cls)
raise TypeError(msg)
return True
BaseTest.setUp(self)
self.filter = CheckingFilter(DerivedLogRecord)
self.root_logger.addFilter(self.filter)
self.orig_factory = logging.getLogRecordFactory()
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
if ssl_context is not None:
try:
from OpenSSL import tsafe
except ImportError:
raise TypeError('SSL is not available if the OpenSSL '
'library is not installed.')
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
self.socket = tsafe.Connection(ssl_context, self.socket)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, addr, handler, poll_interval=0.5,
log=False, sslctx=None):
class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
def __getattr__(self, name, default=None):
if name.startswith('do_'):
return self.process_request
raise AttributeError(name)
def process_request(self):
self.server._handler(self)
def log_message(self, format, *args):
if log:
super(DelegatingHTTPRequestHandler,
self).log_message(format, *args)
HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
ControlMixin.__init__(self, handler, poll_interval)
self.sslctx = sslctx
def __init__(self, addr, handler, poll_interval=0.5,
bind_and_activate=True):
class DelegatingUDPRequestHandler(DatagramRequestHandler):
def handle(self):
self.server._handler(self)
def finish(self):
data = self.wfile.getvalue()
if data:
try:
super(DelegatingUDPRequestHandler, self).finish()
except OSError:
if not self.server._closed:
raise
ThreadingUDPServer.__init__(self, addr,
DelegatingUDPRequestHandler,
bind_and_activate)
ControlMixin.__init__(self, handler, poll_interval)
self._closed = False
def __init__(self, addr, handler, poll_interval=0.5,
log=False, sslctx=None):
class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
def __getattr__(self, name, default=None):
if name.startswith('do_'):
return self.process_request
raise AttributeError(name)
def process_request(self):
self.server._handler(self)
def log_message(self, format, *args):
if log:
super(DelegatingHTTPRequestHandler,
self).log_message(format, *args)
HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
ControlMixin.__init__(self, handler, poll_interval)
self.sslctx = sslctx
def __init__(self, protocol):
self._protocol = protocol
self._certfile = None
self._keyfile = None
self._password = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, processes=40, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
BaseWSGIServer.__init__(self, host, port, app, handler,
passthrough_errors, ssl_context, fd)
self.max_children = processes
def __init__(self, protocol):
self._protocol = protocol
self._certfile = None
self._keyfile = None
self._password = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, processes=40, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
BaseWSGIServer.__init__(self, host, port, app, handler,
passthrough_errors, ssl_context, fd)
self.max_children = processes
def __init__(self, protocol):
self._protocol = protocol
self._certfile = None
self._keyfile = None
self._password = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, processes=40, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
BaseWSGIServer.__init__(self, host, port, app, handler,
passthrough_errors, ssl_context, fd)
self.max_children = processes
def __init__(self, protocol):
self._protocol = protocol
self._certfile = None
self._keyfile = None
self._password = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = self.socket.getsockname()[1]
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, processes=40, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if not can_fork:
raise ValueError('Your platform does not support forking.')
BaseWSGIServer.__init__(self, host, port, app, handler,
passthrough_errors, ssl_context, fd)
self.max_children = processes
def __init__(self, protocol):
self._protocol = protocol
self._certfile = None
self._keyfile = None
self._password = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = self.socket.getsockname()[1]
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, protocol):
self._protocol = protocol
self._certfile = None
self._keyfile = None
self._password = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, processes=40, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if not can_fork:
raise ValueError('Your platform does not support forking.')
BaseWSGIServer.__init__(self, host, port, app, handler,
passthrough_errors, ssl_context, fd)
self.max_children = processes