def test_handles_connection_closed(self):
yield from self._connection.connect()
def raise_closed():
raise websockets.ConnectionClosed(4000, "")
yield from asyncio.sleep(0)
self._mock_socket.recv = raise_closed
self._queue.put_nowait(sample_method)
has_packet = yield from self._connection.has_packet()
self.assertTrue(has_packet) # reads what we pushed to get unblocked
has_packet = yield from self._connection.has_packet()
self.assertFalse(has_packet) # gets a connection closed
评论列表
文章目录