def test_connection_lost(self):
    """Check that aborting the transport fails an outstanding consume.

    Opens channel 0 on the protocol, issues a ``basic_consume`` on
    ``"test-queue"``, then aborts the transport connection.  Afterwards
    the protocol must report itself as closed and the consume deferred
    must have failed with ``Closed``.
    """
    # Arrange: an open channel with a consume request still in flight.
    chan = self.successResultOf(self.protocol.channel(0))
    pending_consume = chan.basic_consume(queue="test-queue")

    # Act: drop the connection abruptly.
    self.transport.abortConnection()

    # Assert: the protocol noticed the loss ...
    self.assertTrue(self.protocol.closed)

    # ... and the pending request failed with Closed.
    error = self.failureResultOf(pending_consume).value
    self.assertIsInstance(error, Closed)
    # The first argument of Closed exposes the underlying reason via
    # ``.value`` (presumably a Failure wrapper -- confirm against Closed).
    self.assertIsInstance(error.args[0].value, ConnectionLost)
评论列表
文章目录