python类wrap_ssl()的实例源码

wsgi_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_012_ssl_server(self):
        def wsgi_app(environ, start_response):
            start_response('200 OK', {})
            return [environ['wsgi.input'].read()]

        certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
        private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

        server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                                        certfile=certificate_file,
                                        keyfile=private_key_file,
                                        server_side=True)
        self.spawn_server(sock=server_sock, site=wsgi_app)

        sock = eventlet.connect(self.server_addr)
        sock = eventlet.wrap_ssl(sock)
        sock.write(
            b'POST /foo HTTP/1.1\r\nHost: localhost\r\n'
            b'Connection: close\r\nContent-length:3\r\n\r\nabc')
        result = recvall(sock)
        assert result.endswith(b'abc')
wsgi_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_013_empty_return(self):
        def wsgi_app(environ, start_response):
            start_response("200 OK", [])
            return [b""]

        certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
        private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
        server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                                        certfile=certificate_file,
                                        keyfile=private_key_file,
                                        server_side=True)
        self.spawn_server(sock=server_sock, site=wsgi_app)

        sock = eventlet.connect(('localhost', server_sock.getsockname()[1]))
        sock = eventlet.wrap_ssl(sock)
        sock.write(b'GET /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        result = recvall(sock)
        assert result[-4:] == b'\r\n\r\n'
geventlet.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:tabmaster 作者: NicolasMinghetti 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:infiblog 作者: RajuKoushik 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:logo-gen 作者: jellene4eva 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:compatify 作者: hatooku 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:Data-visualization 作者: insta-code1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
websocket_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_ssl_sending_messages(self):
        s = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                              certfile=tests.certificate_file,
                              keyfile=tests.private_key_file,
                              server_side=True)
        self.spawn_server(sock=s)
        connect = [
            "GET /echo HTTP/1.1",
            "Upgrade: WebSocket",
            "Connection: Upgrade",
            "Host: %s:%s" % self.server_addr,
            "Origin: http://%s:%s" % self.server_addr,
            "Sec-WebSocket-Protocol: ws",
            "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5",
            "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00",
        ]
        sock = eventlet.wrap_ssl(eventlet.connect(self.server_addr))

        sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
        first_resp = b''
        while b'\r\n\r\n' not in first_resp:
            first_resp += sock.recv()
            print('resp now:')
            print(first_resp)
        # make sure it sets the wss: protocol on the location header
        loc_line = [x for x in first_resp.split(b"\r\n")
                    if x.lower().startswith(b'sec-websocket-location')][0]
        expect_wss = ('wss://%s:%s' % self.server_addr).encode()
        assert expect_wss in loc_line, "Expecting wss protocol in location: %s" % loc_line

        sock.sendall(b'\x00hello\xFF')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00hello\xff')
        sock.sendall(b'\x00start')
        eventlet.sleep(0.001)
        sock.sendall(b' end\xff')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00start end\xff')
        greenio.shutdown_safe(sock)
        sock.close()
        eventlet.sleep(0.01)
wsgi_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_017_ssl_zeroreturnerror(self):

        def server(sock, site, log):
            try:
                serv = wsgi.Server(sock, sock.getsockname(), site, log)
                client_socket, addr = sock.accept()
                serv.process_request([addr, client_socket, wsgi.STATE_IDLE])
                return True
            except Exception:
                traceback.print_exc()
                return False

        def wsgi_app(environ, start_response):
            start_response('200 OK', [])
            return [environ['wsgi.input'].read()]

        certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
        private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

        sock = eventlet.wrap_ssl(
            eventlet.listen(('localhost', 0)),
            certfile=certificate_file, keyfile=private_key_file,
            server_side=True)
        server_coro = eventlet.spawn(server, sock, wsgi_app, self.logfile)

        client = eventlet.connect(('localhost', sock.getsockname()[1]))
        client = eventlet.wrap_ssl(client)
        client.write(b'X')  # non-empty payload so that SSL handshake occurs
        greenio.shutdown_safe(client)
        client.close()

        success = server_coro.wait()
        assert success
wsgi_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_028_ssl_handshake_errors(self):
        errored = [False]

        def server(sock):
            try:
                wsgi.server(sock=sock, site=hello_world, log=self.logfile)
                errored[0] = 'SSL handshake error caused wsgi.server to exit.'
            except greenthread.greenlet.GreenletExit:
                pass
            except Exception as e:
                errored[0] = 'SSL handshake error raised exception %s.' % e
                raise
        for data in ('', 'GET /non-ssl-request HTTP/1.0\r\n\r\n'):
            srv_sock = eventlet.wrap_ssl(
                eventlet.listen(('localhost', 0)),
                certfile=certificate_file, keyfile=private_key_file,
                server_side=True)
            addr = srv_sock.getsockname()
            g = eventlet.spawn_n(server, srv_sock)
            client = eventlet.connect(addr)
            if data:  # send non-ssl request
                client.sendall(data.encode())
            else:  # close sock prematurely
                client.close()
            eventlet.sleep(0)  # let context switch back to server
            assert not errored[0], errored[0]
            # make another request to ensure the server's still alive
            try:
                client = ssl.wrap_socket(eventlet.connect(addr))
                client.write(b'GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
                result = recvall(client)
                assert result.startswith(b'HTTP'), result
                assert result.endswith(b'hello world')
            except ImportError:
                pass  # TODO(openssl): should test with OpenSSL
            greenthread.kill(g)
convenience_test.py 文件源码 项目:deb-python-eventlet 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_wrap_ssl(self):
        server = eventlet.wrap_ssl(
            eventlet.listen(('localhost', 0)),
            certfile=certificate_file, keyfile=private_key_file,
            server_side=True)
        port = server.getsockname()[1]

        def handle(sock, addr):
            sock.sendall(sock.recv(1024))
            raise eventlet.StopServe()

        eventlet.spawn(eventlet.serve, server, handle)
        client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
        client.sendall(b"echo")
        self.assertEqual(b"echo", client.recv(1024))
geventlet.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:Lixiang_zhaoxin 作者: hejaxian 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)
geventlet.py 文件源码 项目:Alfred 作者: jkachhadia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = eventlet.wrap_ssl(client, server_side=True,
                                       **self.cfg.ssl_options)

        super(EventletWorker, self).handle(listener, client, addr)


问题


面经


文章

微信
公众号

扫码关注公众号