def test_close(self):
    """Check that close() on a connected client fires only after teardown.

    The Deferred returned by ``close()`` must still be pending right after
    the call, must remain pending immediately after
    ``clientConnectionLost`` is delivered, and must fire with ``None``
    once the reactor clock has been advanced.
    """
    reactor = MemoryReactorClock()
    c = KafkaBrokerClient('test_close', reactor=reactor)
    c._connect()  # Force a connection attempt
    # MemoryReactor doesn't make this connection, so wire the connector
    # up by hand and mark it as connected.
    c.connector.factory = c
    c.connector.state = 'connected'

    dd = c.close()
    self.assertIsInstance(dd, Deferred)
    self.assertNoResult(dd)

    # Report the connection as lost; the close Deferred must not have
    # fired yet at this point.
    f = Failure(ConnectionDone('test_close'))
    c.clientConnectionLost(c.connector, f)
    self.assertNoResult(dd)

    # Advance the clock so the notify() call fires
    reactor.advance(0.1)
    self.assertIsNone(self.successResultOf(dd))
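# Comment list (stray web-page navigation text, not part of the code)
# Table of contents (stray web-page navigation text, not part of the code)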