test_shutdownmanager.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码
def test_restart_stops_exchanger(self):
        """
        After a successful shutdown, the broker stops processing new messages.
        """
        message = {"type": "shutdown", "reboot": False, "operation-id": 100}
        self.plugin.perform_shutdown(message)

        [arguments] = self.process_factory.spawns
        protocol = arguments[0]
        protocol.processEnded(Failure(ProcessDone(status=0)))
        self.broker_service.reactor.advance(100)
        self.manager.reactor.advance(100)

        # New messages will not be exchanged after a reboot process is in
        # process.
        self.manager.broker.exchanger.schedule_exchange()
        payloads = self.manager.broker.exchanger._transport.payloads
        self.assertEqual(0, len(payloads))
        return protocol.result
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号