test_reporter.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码
def test_store_messages(self):
        """
        L{FakeGlobalReporter} stores messages which are sent.
        """
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self.reporter.apt_update_filename = self.makeFile(
            "#!/bin/sh\necho -n error >&2\necho -n output\nexit 0")
        os.chmod(self.reporter.apt_update_filename, 0o755)
        deferred = Deferred()

        def do_test():
            self.reporter.get_session_id()
            result = self.reporter.run_apt_update()
            self.reactor.advance(0)

            def callback(ignore):
                message = {"type": "package-reporter-result",
                           "report-timestamp": 0.0, "code": 0, "err": u"error"}
                self.assertMessages(
                    message_store.get_pending_messages(), [message])
                stored = list(self.store._db.execute(
                    "SELECT id, data FROM message").fetchall())
                self.assertEqual(1, len(stored))
                self.assertEqual(1, stored[0][0])
                self.assertEqual(message, bpickle.loads(bytes(stored[0][1])))
            result.addCallback(callback)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号