websocket_test.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_ssl_sending_messages(self):
        s = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                              certfile=tests.certificate_file,
                              keyfile=tests.private_key_file,
                              server_side=True)
        self.spawn_server(sock=s)
        connect = [
            "GET /echo HTTP/1.1",
            "Upgrade: WebSocket",
            "Connection: Upgrade",
            "Host: %s:%s" % self.server_addr,
            "Origin: http://%s:%s" % self.server_addr,
            "Sec-WebSocket-Protocol: ws",
            "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5",
            "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00",
        ]
        sock = eventlet.wrap_ssl(eventlet.connect(self.server_addr))

        sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
        first_resp = b''
        while b'\r\n\r\n' not in first_resp:
            first_resp += sock.recv()
            print('resp now:')
            print(first_resp)
        # make sure it sets the wss: protocol on the location header
        loc_line = [x for x in first_resp.split(b"\r\n")
                    if x.lower().startswith(b'sec-websocket-location')][0]
        expect_wss = ('wss://%s:%s' % self.server_addr).encode()
        assert expect_wss in loc_line, "Expecting wss protocol in location: %s" % loc_line

        sock.sendall(b'\x00hello\xFF')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00hello\xff')
        sock.sendall(b'\x00start')
        eventlet.sleep(0.001)
        sock.sendall(b' end\xff')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00start end\xff')
        greenio.shutdown_safe(sock)
        sock.close()
        eventlet.sleep(0.01)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号