def test_012_ssl_server(self):
def wsgi_app(environ, start_response):
start_response('200 OK', {})
return [environ['wsgi.input'].read()]
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
certfile=certificate_file,
keyfile=private_key_file,
server_side=True)
self.spawn_server(sock=server_sock, site=wsgi_app)
sock = eventlet.connect(self.server_addr)
sock = eventlet.wrap_ssl(sock)
sock.write(
b'POST /foo HTTP/1.1\r\nHost: localhost\r\n'
b'Connection: close\r\nContent-length:3\r\n\r\nabc')
result = recvall(sock)
assert result.endswith(b'abc')
python类wrap_ssl()的实例源码
def test_013_empty_return(self):
def wsgi_app(environ, start_response):
start_response("200 OK", [])
return [b""]
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
certfile=certificate_file,
keyfile=private_key_file,
server_side=True)
self.spawn_server(sock=server_sock, site=wsgi_app)
sock = eventlet.connect(('localhost', server_sock.getsockname()[1]))
sock = eventlet.wrap_ssl(sock)
sock.write(b'GET /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
result = recvall(sock)
assert result[-4:] == b'\r\n\r\n'
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def test_ssl_sending_messages(self):
s = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
certfile=tests.certificate_file,
keyfile=tests.private_key_file,
server_side=True)
self.spawn_server(sock=s)
connect = [
"GET /echo HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: %s:%s" % self.server_addr,
"Origin: http://%s:%s" % self.server_addr,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.wrap_ssl(eventlet.connect(self.server_addr))
sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
first_resp = b''
while b'\r\n\r\n' not in first_resp:
first_resp += sock.recv()
print('resp now:')
print(first_resp)
# make sure it sets the wss: protocol on the location header
loc_line = [x for x in first_resp.split(b"\r\n")
if x.lower().startswith(b'sec-websocket-location')][0]
expect_wss = ('wss://%s:%s' % self.server_addr).encode()
assert expect_wss in loc_line, "Expecting wss protocol in location: %s" % loc_line
sock.sendall(b'\x00hello\xFF')
result = sock.recv(1024)
self.assertEqual(result, b'\x00hello\xff')
sock.sendall(b'\x00start')
eventlet.sleep(0.001)
sock.sendall(b' end\xff')
result = sock.recv(1024)
self.assertEqual(result, b'\x00start end\xff')
greenio.shutdown_safe(sock)
sock.close()
eventlet.sleep(0.01)
def test_017_ssl_zeroreturnerror(self):
def server(sock, site, log):
try:
serv = wsgi.Server(sock, sock.getsockname(), site, log)
client_socket, addr = sock.accept()
serv.process_request([addr, client_socket, wsgi.STATE_IDLE])
return True
except Exception:
traceback.print_exc()
return False
def wsgi_app(environ, start_response):
start_response('200 OK', [])
return [environ['wsgi.input'].read()]
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
sock = eventlet.wrap_ssl(
eventlet.listen(('localhost', 0)),
certfile=certificate_file, keyfile=private_key_file,
server_side=True)
server_coro = eventlet.spawn(server, sock, wsgi_app, self.logfile)
client = eventlet.connect(('localhost', sock.getsockname()[1]))
client = eventlet.wrap_ssl(client)
client.write(b'X') # non-empty payload so that SSL handshake occurs
greenio.shutdown_safe(client)
client.close()
success = server_coro.wait()
assert success
def test_028_ssl_handshake_errors(self):
errored = [False]
def server(sock):
try:
wsgi.server(sock=sock, site=hello_world, log=self.logfile)
errored[0] = 'SSL handshake error caused wsgi.server to exit.'
except greenthread.greenlet.GreenletExit:
pass
except Exception as e:
errored[0] = 'SSL handshake error raised exception %s.' % e
raise
for data in ('', 'GET /non-ssl-request HTTP/1.0\r\n\r\n'):
srv_sock = eventlet.wrap_ssl(
eventlet.listen(('localhost', 0)),
certfile=certificate_file, keyfile=private_key_file,
server_side=True)
addr = srv_sock.getsockname()
g = eventlet.spawn_n(server, srv_sock)
client = eventlet.connect(addr)
if data: # send non-ssl request
client.sendall(data.encode())
else: # close sock prematurely
client.close()
eventlet.sleep(0) # let context switch back to server
assert not errored[0], errored[0]
# make another request to ensure the server's still alive
try:
client = ssl.wrap_socket(eventlet.connect(addr))
client.write(b'GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
result = recvall(client)
assert result.startswith(b'HTTP'), result
assert result.endswith(b'hello world')
except ImportError:
pass # TODO(openssl): should test with OpenSSL
greenthread.kill(g)
def test_wrap_ssl(self):
server = eventlet.wrap_ssl(
eventlet.listen(('localhost', 0)),
certfile=certificate_file, keyfile=private_key_file,
server_side=True)
port = server.getsockname()[1]
def handle(sock, addr):
sock.sendall(sock.recv(1024))
raise eventlet.StopServe()
eventlet.spawn(eventlet.serve, server, handle)
client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
client.sendall(b"echo")
self.assertEqual(b"echo", client.recv(1024))
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)
def handle(self, listener, client, addr):
if self.cfg.is_ssl:
client = eventlet.wrap_ssl(client, server_side=True,
**self.cfg.ssl_options)
super(EventletWorker, self).handle(listener, client, addr)