def test_getting_messages_from_websocket_75(self):
connect = [
"GET /range HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: %s:%s" % self.server_addr,
"Origin: http://%s:%s" % self.server_addr,
"WebSocket-Protocol: ws",
]
sock = eventlet.connect(self.server_addr)
sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n'))
resp = sock.recv(1024)
headers, result = resp.split(b'\r\n\r\n')
msgs = [result.strip(b'\x00\xff')]
cnt = 10
while cnt:
msgs.append(sock.recv(20).strip(b'\x00\xff'))
cnt -= 1
# Last item in msgs is an empty string
self.assertEqual(msgs[:-1], [six.b('msg %d' % i) for i in range(10)])
评论列表
文章目录