def ws_connect(self, url: str, timeout: float = None
) -> WebSocketClientConnection:
"""Make WebSocket connection to the url.
Retries up to _max (default: 20) times. Client connections made by this
method are closed after each test method.
"""
st = time.perf_counter()
timeout = timeout or self.timeout
while (time.perf_counter() - st) < timeout:
try:
ws = await to_asyncio_future(websocket_connect(url))
except ConnectionRefusedError:
await self.wait()
continue
else:
self._ws_connections.append(ws)
return ws
raise ConnectionRefusedError(
'WebSocket connection refused: {}'.format(url))
评论列表
文章目录