test_brokerclient.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:afkak 作者: ciena 项目源码 文件源码
def test_close(self):
        reactor = MemoryReactorClock()
        c = KafkaBrokerClient('test_close', reactor=reactor)
        c._connect()  # Force a connection attempt
        c.connector.factory = c  # MemoryReactor doesn't make this connection.
        c.connector.state = 'connected'  # set the connector to connected state
        dd = c.close()
        self.assertIsInstance(dd, Deferred)
        self.assertNoResult(dd)
        f = Failure(ConnectionDone('test_close'))
        c.clientConnectionLost(c.connector, f)
        self.assertNoResult(dd)
        # Advance the clock so the notify() call fires
        reactor.advance(0.1)
        r = self.successResultOf(dd)
        self.assertIs(r, None)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号