def ws_connect(self, url: str, timeout: float = None
) -> WebSocketClientConnection:
"""Make WebSocket connection to the url.
Retries up to _max (default: 20) times. Client connections made by this
method are closed after each test method.
"""
st = time.perf_counter()
timeout = timeout or self.timeout
while (time.perf_counter() - st) < timeout:
try:
ws = await to_asyncio_future(websocket_connect(url))
except ConnectionRefusedError:
await self.wait()
continue
else:
self._ws_connections.append(ws)
return ws
raise ConnectionRefusedError(
'WebSocket connection refused: {}'.format(url))
python类websocket_connect()的实例源码
def run_tests():
url = options.url + '/getCaseCount'
control_ws = yield websocket_connect(url, None)
num_tests = int((yield control_ws.read_message()))
logging.info('running %d cases', num_tests)
msg = yield control_ws.read_message()
assert msg is None
for i in range(1, num_tests + 1):
logging.info('running test case %d', i)
url = options.url + '/runCase?case=%d&agent=%s' % (i, options.name)
test_ws = yield websocket_connect(url, None, compression_options={})
while True:
message = yield test_ws.read_message()
if message is None:
break
test_ws.write_message(message, binary=isinstance(message, bytes))
url = options.url + '/updateReports?agent=%s' % options.name
update_ws = yield websocket_connect(url, None)
msg = yield update_ws.read_message()
assert msg is None
IOLoop.instance().stop()
def run_tests():
url = options.url + '/getCaseCount'
control_ws = yield websocket_connect(url, None)
num_tests = int((yield control_ws.read_message()))
logging.info('running %d cases', num_tests)
msg = yield control_ws.read_message()
assert msg is None
for i in range(1, num_tests + 1):
logging.info('running test case %d', i)
url = options.url + '/runCase?case=%d&agent=%s' % (i, options.name)
test_ws = yield websocket_connect(url, None, compression_options={})
while True:
message = yield test_ws.read_message()
if message is None:
break
test_ws.write_message(message, binary=isinstance(message, bytes))
url = options.url + '/updateReports?agent=%s' % options.name
update_ws = yield websocket_connect(url, None)
msg = yield update_ws.read_message()
assert msg is None
IOLoop.instance().stop()
def run_tests():
url = options.url + '/getCaseCount'
control_ws = yield websocket_connect(url, None)
num_tests = int((yield control_ws.read_message()))
logging.info('running %d cases', num_tests)
msg = yield control_ws.read_message()
assert msg is None
for i in range(1, num_tests + 1):
logging.info('running test case %d', i)
url = options.url + '/runCase?case=%d&agent=%s' % (i, options.name)
test_ws = yield websocket_connect(url, None, compression_options={})
while True:
message = yield test_ws.read_message()
if message is None:
break
test_ws.write_message(message, binary=isinstance(message, bytes))
url = options.url + '/updateReports?agent=%s' % options.name
update_ws = yield websocket_connect(url, None)
msg = yield update_ws.read_message()
assert msg is None
IOLoop.instance().stop()
def run_tests():
url = options.url + '/getCaseCount'
control_ws = yield websocket_connect(url, None)
num_tests = int((yield control_ws.read_message()))
logging.info('running %d cases', num_tests)
msg = yield control_ws.read_message()
assert msg is None
for i in range(1, num_tests + 1):
logging.info('running test case %d', i)
url = options.url + '/runCase?case=%d&agent=%s' % (i, options.name)
test_ws = yield websocket_connect(url, None, compression_options={})
while True:
message = yield test_ws.read_message()
if message is None:
break
test_ws.write_message(message, binary=isinstance(message, bytes))
url = options.url + '/updateReports?agent=%s' % options.name
update_ws = yield websocket_connect(url, None)
msg = yield update_ws.read_message()
assert msg is None
IOLoop.instance().stop()
def ws_connect(self, path, compression_options=None):
ws = yield websocket_connect(
'ws://127.0.0.1:%d%s' % (self.get_http_port(), path),
compression_options=compression_options)
raise gen.Return(ws)
def test_websocket_callbacks(self):
websocket_connect(
'ws://127.0.0.1:%d/echo' % self.get_http_port(),
io_loop=self.io_loop, callback=self.stop)
ws = self.wait().result()
ws.write_message('hello')
ws.read_message(self.stop)
response = self.wait().result()
self.assertEqual(response, 'hello')
self.close_future.add_done_callback(lambda f: self.stop())
ws.close()
self.wait()
def test_websocket_network_fail(self):
sock, port = bind_unused_port()
sock.close()
with self.assertRaises(IOError):
with ExpectLog(gen_log, ".*"):
yield websocket_connect(
'ws://127.0.0.1:%d/' % port,
io_loop=self.io_loop,
connect_timeout=3600)
def test_websocket_close_buffered_data(self):
ws = yield websocket_connect(
'ws://127.0.0.1:%d/echo' % self.get_http_port())
ws.write_message('hello')
ws.write_message('world')
# Close the underlying stream.
ws.stream.close()
yield self.close_future
def test_websocket_headers(self):
# Ensure that arbitrary headers can be passed through websocket_connect.
ws = yield websocket_connect(
HTTPRequest('ws://127.0.0.1:%d/header' % self.get_http_port(),
headers={'X-Test': 'hello'}))
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_valid_with_path(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
headers = {'Origin': 'http://127.0.0.1:%d/something' % port}
ws = yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
ws.write_message('hello')
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_invalid_partial_url(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
headers = {'Origin': '127.0.0.1:%d' % port}
with self.assertRaises(HTTPError) as cm:
yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
self.assertEqual(cm.exception.code, 403)
def test_check_origin_invalid(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
# Host is 127.0.0.1, which should not be accessible from some other
# domain
headers = {'Origin': 'http://somewhereelse.com'}
with self.assertRaises(HTTPError) as cm:
yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
self.assertEqual(cm.exception.code, 403)
def test_check_origin_invalid_subdomains(self):
port = self.get_http_port()
url = 'ws://localhost:%d/echo' % port
# Subdomains should be disallowed by default. If we could pass a
# resolver to websocket_connect we could test sibling domains as well.
headers = {'Origin': 'http://subtenant.localhost'}
with self.assertRaises(HTTPError) as cm:
yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
self.assertEqual(cm.exception.code, 403)
def ws_connect(self, path, compression_options=None):
ws = yield websocket_connect(
'ws://127.0.0.1:%d%s' % (self.get_http_port(), path),
compression_options=compression_options)
raise gen.Return(ws)
def test_websocket_network_fail(self):
sock, port = bind_unused_port()
sock.close()
with self.assertRaises(IOError):
with ExpectLog(gen_log, ".*"):
yield websocket_connect(
'ws://127.0.0.1:%d/' % port,
io_loop=self.io_loop,
connect_timeout=3600)
def test_websocket_close_buffered_data(self):
ws = yield websocket_connect(
'ws://127.0.0.1:%d/echo' % self.get_http_port())
ws.write_message('hello')
ws.write_message('world')
# Close the underlying stream.
ws.stream.close()
yield self.close_future
def test_websocket_headers(self):
# Ensure that arbitrary headers can be passed through websocket_connect.
ws = yield websocket_connect(
HTTPRequest('ws://127.0.0.1:%d/header' % self.get_http_port(),
headers={'X-Test': 'hello'}))
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_valid_no_path(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
headers = {'Origin': 'http://127.0.0.1:%d' % port}
ws = yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
ws.write_message('hello')
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_valid_with_path(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
headers = {'Origin': 'http://127.0.0.1:%d/something' % port}
ws = yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
ws.write_message('hello')
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_invalid(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
# Host is 127.0.0.1, which should not be accessible from some other
# domain
headers = {'Origin': 'http://somewhereelse.com'}
with self.assertRaises(HTTPError) as cm:
yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
self.assertEqual(cm.exception.code, 403)
def test_check_origin_invalid_subdomains(self):
port = self.get_http_port()
url = 'ws://localhost:%d/echo' % port
# Subdomains should be disallowed by default. If we could pass a
# resolver to websocket_connect we could test sibling domains as well.
headers = {'Origin': 'http://subtenant.localhost'}
with self.assertRaises(HTTPError) as cm:
yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
self.assertEqual(cm.exception.code, 403)
def ws_connect(self, path, compression_options=None):
ws = yield websocket_connect(
'ws://127.0.0.1:%d%s' % (self.get_http_port(), path),
compression_options=compression_options)
raise gen.Return(ws)
def test_websocket_callbacks(self):
websocket_connect(
'ws://127.0.0.1:%d/echo' % self.get_http_port(),
io_loop=self.io_loop, callback=self.stop)
ws = self.wait().result()
ws.write_message('hello')
ws.read_message(self.stop)
response = self.wait().result()
self.assertEqual(response, 'hello')
self.close_future.add_done_callback(lambda f: self.stop())
ws.close()
self.wait()
def test_websocket_network_fail(self):
sock, port = bind_unused_port()
sock.close()
with self.assertRaises(IOError):
with ExpectLog(gen_log, ".*"):
yield websocket_connect(
'ws://127.0.0.1:%d/' % port,
io_loop=self.io_loop,
connect_timeout=3600)
def test_websocket_headers(self):
# Ensure that arbitrary headers can be passed through websocket_connect.
ws = yield websocket_connect(
HTTPRequest('ws://127.0.0.1:%d/header' % self.get_http_port(),
headers={'X-Test': 'hello'}))
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_valid_no_path(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
headers = {'Origin': 'http://127.0.0.1:%d' % port}
ws = yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
ws.write_message('hello')
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_valid_with_path(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
headers = {'Origin': 'http://127.0.0.1:%d/something' % port}
ws = yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
ws.write_message('hello')
response = yield ws.read_message()
self.assertEqual(response, 'hello')
yield self.close(ws)
def test_check_origin_invalid_partial_url(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
headers = {'Origin': '127.0.0.1:%d' % port}
with self.assertRaises(HTTPError) as cm:
yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
self.assertEqual(cm.exception.code, 403)
def test_check_origin_invalid(self):
port = self.get_http_port()
url = 'ws://127.0.0.1:%d/echo' % port
# Host is 127.0.0.1, which should not be accessible from some other
# domain
headers = {'Origin': 'http://somewhereelse.com'}
with self.assertRaises(HTTPError) as cm:
yield websocket_connect(HTTPRequest(url, headers=headers),
io_loop=self.io_loop)
self.assertEqual(cm.exception.code, 403)