python类callWhenRunning()的实例源码

test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_crash(self):
        """
        reactor.crash should NOT fire shutdown triggers
        """
        events = []
        self.addTrigger(
            "before", "shutdown",
            lambda: events.append(("before", "shutdown")))

        # reactor.crash called from an "after-startup" trigger is too early
        # for the gtkreactor: gtk_mainloop is not yet running. Same is true
        # when called with reactor.callLater(0). Must be >0 seconds in the
        # future to let gtk_mainloop start first.
        reactor.callWhenRunning(
            reactor.callLater, 0, reactor.crash)
        reactor.run()
        self.failIf(events, "reactor.crash invoked shutdown triggers, but it "
                            "isn't supposed to.")

    # XXX Test that reactor.stop() invokes shutdown triggers
plebnet_plugin.py 文件源码 项目:PlebNet 作者: rjwvandenberg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def makeService(self, options):
        """
        Construct a Tribler service.
        """
        tribler_service = MultiService()
        tribler_service.setName("Market")

        manhole_namespace = {}
        if options["manhole"] > 0:
            port = options["manhole"]
            manhole = manhole_tap.makeService({
                'namespace': manhole_namespace,
                'telnetPort': 'tcp:%d:interface=127.0.0.1' % port,
                'sshPort': None,
                'passwd': os.path.join(os.path.dirname(__file__), 'passwd'),
            })
            tribler_service.addService(manhole)

        reactor.callWhenRunning(self.start_tribler, options)

        return tribler_service
monast.py 文件源码 项目:monast 作者: dagmoller 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _processClientActions(self):
        log.debug("Processing Client Actions...")
        while self.clientActions:
            session, action = self.clientActions.pop(0)
            servername      = action['server'][0]
            role, handler   = self.actionHandlers.get(action['action'][0], (None, None))
            if handler:
                if self.authRequired:
                    if role in self.authUsers[session.username].servers.get(servername):
                        reactor.callWhenRunning(handler, session, action)
                    else:
                        self.http._addUpdate(servername = servername, sessid = session.uid, action = "RequestError", message = "You do not have permission to execute this action.")
                else:
                    reactor.callWhenRunning(handler, session, action)
            else:
                log.error("ClientActionHandler for action %s does not exixts..." % action['action'][0])
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_run_apt_update_report_timestamp(self):
        """
        The package-report-result message includes a timestamp of the apt
        update run.
        """
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self._make_fake_apt_update(err="")
        deferred = Deferred()

        def do_test():
            self.reactor.advance(10)
            result = self.reporter.run_apt_update()

            def callback(ignore):
                self.assertMessages(
                    message_store.get_pending_messages(),
                    [{"type": "package-reporter-result",
                      "report-timestamp": 10.0, "code": 0, "err": u""}])
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_run_apt_update_report_apt_failure(self):
        """
        If L{PackageReporter.run_apt_update} fails, a message is sent to the
        server reporting the error, to be able to fix the problem centrally.
        """
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self._make_fake_apt_update(code=2)
        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(ignore):
                self.assertMessages(
                    message_store.get_pending_messages(),
                    [{"type": "package-reporter-result",
                      "report-timestamp": 0.0, "code": 2, "err": u"error"}])
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_run_apt_update_report_success(self):
        """
        L{PackageReporter.run_apt_update} also reports success to be able to
        know the proper state of the client.
        """
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self._make_fake_apt_update(err="message")
        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(ignore):
                self.assertMessages(
                    message_store.get_pending_messages(),
                    [{"type": "package-reporter-result",
                      "report-timestamp": 0.0, "code": 0, "err": u"message"}])
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_run_apt_update_touches_stamp_file(self):
        """
        The L{PackageReporter.run_apt_update} method touches a stamp file
        after running the apt-update wrapper.
        """
        self.reporter.sources_list_filename = "/I/Dont/Exist"
        self._make_fake_apt_update()
        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(ignored):
                self.assertTrue(
                    os.path.exists(self.config.update_stamp_filename))
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_internet.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_crash(self):
        """
        reactor.crash should NOT fire shutdown triggers
        """
        events = []
        self.addTrigger(
            "before", "shutdown",
            lambda: events.append(("before", "shutdown")))

        # reactor.crash called from an "after-startup" trigger is too early
        # for the gtkreactor: gtk_mainloop is not yet running. Same is true
        # when called with reactor.callLater(0). Must be >0 seconds in the
        # future to let gtk_mainloop start first.
        reactor.callWhenRunning(
            reactor.callLater, 0, reactor.crash)
        reactor.run()
        self.failIf(events, "reactor.crash invoked shutdown triggers, but it "
                            "isn't supposed to.")

    # XXX Test that reactor.stop() invokes shutdown triggers
mockserver.py 文件源码 项目:scrapy-cdr 作者: TeamHG-Memex 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    args = parser.parse_args()
    module_name, name = args.resource.rsplit('.', 1)
    sys.path.append('.')
    resource = getattr(import_module(module_name), name)()
    http_port = reactor.listenTCP(PORT, Site(resource))

    def print_listening():
        host = http_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, host.host, host.port))

    reactor.callWhenRunning(print_listening)
    reactor.run()
test_coinswap.py 文件源码 项目:CoinSwapCS 作者: AdamISZ 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def runcase(alice_class, carol_class, fail_alice_state=None, fail_carol_state=None):
    options_server = Options()
    wallets = make_wallets(num_alices + 1,
                               wallet_structures=wallet_structures,
                               mean_amt=funding_amount)
    args_server = ["dummy"]
    test_data_server = (wallets[num_alices]['seed'], args_server, options_server,
                        False, None, carol_class, None, fail_carol_state)
    carol_bbmb = main_cs(test_data_server)
    options_alice = Options()
    options_alice.serve = False
    alices = []
    for i in range(num_alices):
        args_alice = ["dummy", amounts[i]]
        if dest_addr:
            args_alice.append(dest_addr)
        test_data_alice = (wallets[i]['seed'], args_alice, options_alice, False,
                           alice_class, None, fail_alice_state, None)
        alices.append(main_cs(test_data_alice))
    l = task.LoopingCall(miner)
    reactor.callWhenRunning(start_mining, l)
    reactor.run()
    return (alices, carol_bbmb, wallets[num_alices]['wallet'])
daemon.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        """Loads plugins, and initiates polling schedules."""
        reactor.callWhenRunning(self.install_sighandlers)

        if self.options.netbox:
            self.setup_single_job()
        elif self.options.multiprocess:
            self.setup_multiprocess(self.options.multiprocess,
                                    self.options.max_jobs)
        elif self.options.worker:
            self.setup_worker()
        else:
            self.setup_scheduling()

        reactor.suggestThreadPoolSize(self.options.threadpoolsize)
        reactor.addSystemEventTrigger("after", "shutdown", self.shutdown)
        reactor.run()
daemon.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup_scheduling(self):
        "Sets up regular job scheduling according to config"
        # NOTE: This is locally imported because it will in turn import
        # twistedsnmp. Twistedsnmp is stupid enough to call
        # logging.basicConfig().  If imported before our own loginit, this
        # causes us to have two StreamHandlers on the root logger, duplicating
        # every log statement.
        self._logger.info("Starting scheduling in single process")
        from .schedule import JobScheduler
        plugins.import_plugins()
        self.work_pool = pool.InlinePool()
        reactor.callWhenRunning(JobScheduler.initialize_from_config_and_run,
                                self.work_pool, self.options.onlyjob)

        def log_scheduler_jobs():
            JobScheduler.log_active_jobs(logging.INFO)

        self.job_loggers.append(log_scheduler_jobs)

        def reload_netboxes():
            JobScheduler.reload()

        self.reloaders.append(reload_netboxes)
daemon.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setup_multiprocess(self, process_count, max_jobs):
        self._logger.info("Starting multi-process setup")
        from .schedule import JobScheduler
        plugins.import_plugins()
        self.work_pool = pool.WorkerPool(process_count,
                                         max_jobs,
                                         self.options.threadpoolsize)
        reactor.callWhenRunning(JobScheduler.initialize_from_config_and_run,
                                self.work_pool, self.options.onlyjob)

        def log_scheduler_jobs():
            JobScheduler.log_active_jobs(logging.INFO)

        self.job_loggers.append(log_scheduler_jobs)
        self.job_loggers.append(self.work_pool.log_summary)

        def reload_netboxes():
            JobScheduler.reload()

        self.reloaders.append(reload_netboxes)
test_task.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_synchronousStop(self):
        """
        L{task.react} handles when the reactor is stopped just before the
        returned L{Deferred} fires.
        """
        def main(reactor):
            d = defer.Deferred()
            def stop():
                reactor.stop()
                d.callback(None)
            reactor.callWhenRunning(stop)
            return d
        r = _FakeReactor()
        exitError = self.assertRaises(
            SystemExit, task.react, main, [], _reactor=r)
        self.assertEqual(0, exitError.code)
mockserver.py 文件源码 项目:domain-discovery-crawler 作者: TeamHG-Memex 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    args = parser.parse_args()
    module_name, name = args.resource.rsplit('.', 1)
    sys.path.append('.')
    resource = getattr(import_module(module_name), name)()
    http_port = reactor.listenTCP(PORT, Site(resource))

    def print_listening():
        host = http_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, host.host, host.port))

    reactor.callWhenRunning(print_listening)
    reactor.run()
python_reader.py 文件源码 项目:pyrepl 作者: dajose 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def twistedinteract(self):
        from twisted.internet import reactor
        from twisted.internet.abstract import FileDescriptor
        import signal
        outerself = self
        class Me(FileDescriptor):
            def fileno(self):
                """ We want to select on FD 0 """
                return 0

            def doRead(self):
                """called when input is ready"""
                try:
                    outerself.handle1()
                except EOFError:
                    reactor.stop()

        reactor.addReader(Me())
        reactor.callWhenRunning(signal.signal,
                                signal.SIGINT,
                                signal.default_int_handler)
        self.prepare()
        try:
            reactor.run()
        finally:
            self.restore()
api_server.py 文件源码 项目:packet-queue 作者: google 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def configure():
  params, pipes, args = command.configure(rest_server=True)
  port = args.rest_api_port

  reactor.listenTCP(port, create_site(params, pipes))
  @reactor.callWhenRunning
  def startup_message():
    print 'Packet Queue is running. Configure at http://localhost:%i' % port
    sys.stdout.flush()
custom_client.py 文件源码 项目:krafters 作者: GianlucaBortoli 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
        def append_from_queue():
            while self.spin:
                if len(self.queue) > 0:
                    reactor.listenUDP(0, ClientProtocol(self.ip, self.port, self.queue.pop()))
            reactor.stop()

        reactor.callWhenRunning(append_from_queue)
        reactor.run(installSignalHandlers=False)
_zmq.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def startService(self):
        zf = ZmqFactory()
        e = ZmqEndpoint(ZmqEndpointType.bind, ENDPOINT)

        self._conn = _DispatcherREPConnection(zf, e, self._core)
        reactor.callWhenRunning(self._conn.do_greet)
        service.Service.startService(self)
bitmask_cli.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():

    def signal_handler(signal, frame):
        if reactor.running:
            reactor.stop()
        sys.exit(0)

    reactor.callWhenRunning(reactor.callLater, 0, execute)
    signal.signal(signal.SIGINT, signal_handler)
    reactor.run()
monast.py 文件源码 项目:monast 作者: dagmoller 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def doAction(self, request):
        session = request.getSession()
        self.monast.clientActions.append((session, request.args))
        reactor.callWhenRunning(self.monast._processClientActions)
        request.write("OK")
        request.finish()

##
## Monast AMI
##
monast.py 文件源码 项目:monast 作者: dagmoller 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __start(self):
        log.info("Starting Monast Services...")
        for servername in self.servers:
            reactor.callWhenRunning(self.connect, servername)
test_gpg.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_gpg_verify_with_non_zero_gpg_exit_code(self):
        """
        L{gpg_verify} runs the given gpg binary and returns C{False} if the
        provided signature is not valid.
        """
        gpg = self.makeFile("#!/bin/sh\necho out; echo err >&2; exit 1\n")
        os.chmod(gpg, 0o755)
        gpg_home = self.makeDir()
        deferred = Deferred()

        @mock.patch("tempfile.mkdtemp")
        def do_test(mkdtemp_mock):
            mkdtemp_mock.return_value = gpg_home
            result = gpg_verify("/some/file", "/some/signature", gpg=gpg)

            def check_failure(failure):
                self.assertEqual(str(failure.value),
                                 "%s failed (out='out\n', err='err\n', "
                                 "code='1')" % gpg)
                self.assertFalse(os.path.exists(gpg_home))

            result.addCallback(self.fail)
            result.addErrback(check_failure)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_releaseupgrader.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_upgrade_with_failure(self):
        """
        The L{ReleaseUpgrader.upgrade} sends a message with failed status
        field if the upgrade-tool exits with non-zero code.
        """
        self.upgrader.logs_directory = self.makeDir()
        upgrade_tool_directory = self.config.upgrade_tool_directory
        upgrade_tool_filename = os.path.join(upgrade_tool_directory, "karmic")
        fd = open(upgrade_tool_filename, "w")
        fd.write("#!/bin/sh\n"
                 "echo out\n"
                 "echo err >&2\n"
                 "exit 3")
        fd.close()
        os.chmod(upgrade_tool_filename, 0o755)

        deferred = Deferred()

        def do_test():

            result = self.upgrader.upgrade("karmic", 100)

            def check_result(ignored):
                result_text = (u"=== Standard output ===\n\nout\n\n\n"
                               "=== Standard error ===\n\nerr\n\n\n")
                self.assertMessages(self.get_pending_messages(),
                                    [{"type": "operation-result",
                                      "operation-id": 100,
                                      "status": FAILED,
                                      "result-text": result_text,
                                      "result-code": 3}])

            result.addCallback(check_result)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_run_apt_update(self, warning_mock):
        """
        The L{PackageReporter.run_apt_update} method should run apt-update.
        """
        self.reporter.sources_list_filename = "/I/Dont/Exist"
        self.reporter.sources_list_directory = "/I/Dont/Exist"
        self._make_fake_apt_update()
        debug_patcher = mock.patch.object(reporter.logging, "debug")
        debug_mock = debug_patcher.start()
        self.addCleanup(debug_patcher.stop)

        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(args):
                out, err, code = args
                self.assertEqual("output", out)
                self.assertEqual("error", err)
                self.assertEqual(0, code)
                self.assertFalse(warning_mock.called)
                debug_mock.assert_has_calls([
                    mock.call(
                        "Checking if ubuntu-release-upgrader is running."),
                    mock.call(
                        "'%s' exited with status 0 (out='output', err='error')"
                        % self.reporter.apt_update_filename)
                ])
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_run_apt_update_report_no_sources(self):
        """
        L{PackageReporter.run_apt_update} reports a failure if apt succeeds but
        there are no APT sources defined. APT doesn't fail if there are no
        sources, but we fake a failure in order to re-use the
        PackageReporterAlert on the server.
        """
        self.facade.reset_channels()
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self._make_fake_apt_update()
        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(ignore):
                error = "There are no APT sources configured in %s or %s." % (
                    self.reporter.sources_list_filename,
                    self.reporter.sources_list_directory)
                self.assertMessages(
                    message_store.get_pending_messages(),
                    [{"type": "package-reporter-result",
                      "report-timestamp": 0.0, "code": 1, "err": error}])
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_run_apt_update_no_run_in_interval(self):
        """
        The L{PackageReporter.run_apt_update} logs a debug message if
        apt-update doesn't run because interval has not passed.
        """
        self.reporter._apt_sources_have_changed = lambda: False
        self.makeFile("", path=self.config.update_stamp_filename)

        debug_patcher = mock.patch.object(reporter.logging, "debug")
        debug_mock = debug_patcher.start()
        self.addCleanup(debug_patcher.stop)

        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(args):
                out, err, code = args
                self.assertEqual("", out)
                self.assertEqual("", err)
                self.assertEqual(0, code)
                debug_mock.assert_called_once_with(
                    ("'%s' didn't run, conditions not met"
                     ) % self.reporter.apt_update_filename)
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_run_apt_update_no_run_update_notifier_stamp_in_interval(self):
        """
        The L{PackageReporter.run_apt_update} doesn't runs apt-update if the
        interval is passed but the stamp file from update-notifier-common
        reports that 'apt-get update' has been run in the interval.
        """
        self.reporter._apt_sources_have_changed = lambda: False

        # The interval for the apt-update stamp file is expired.
        self.makeFile("", path=self.config.update_stamp_filename)
        expired_time = time.time() - self.config.apt_update_interval - 1
        os.utime(
            self.config.update_stamp_filename, (expired_time, expired_time))
        # The interval for the update-notifier-common stamp file is not
        # expired.
        self.reporter.update_notifier_stamp = self.makeFile("")

        debug_patcher = mock.patch.object(reporter.logging, "debug")
        debug_mock = debug_patcher.start()
        self.addCleanup(debug_patcher.stop)

        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(args):
                out, err, code = args
                self.assertEqual("", out)
                self.assertEqual("", err)
                self.assertEqual(0, code)
                debug_mock.assert_called_once_with(
                    ("'%s' didn't run, conditions not met"
                     ) % self.reporter.apt_update_filename)
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_run_apt_update_runs_interval_expired(self):
        """
        L{PackageReporter.run_apt_update} runs if both apt-update and
        update-notifier-common stamp files are present and the time
        interval has passed.
        """
        self.reporter._apt_sources_have_changed = lambda: False

        expired_time = time.time() - self.config.apt_update_interval - 1
        # The interval for both stamp files is expired.
        self.makeFile("", path=self.config.update_stamp_filename)
        os.utime(
            self.config.update_stamp_filename, (expired_time, expired_time))
        self.reporter.update_notifier_stamp = self.makeFile("")
        os.utime(
            self.reporter.update_notifier_stamp, (expired_time, expired_time))

        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self._make_fake_apt_update(err="message")
        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(ignore):
                self.assertMessages(
                    message_store.get_pending_messages(),
                    [{"type": "package-reporter-result",
                      "report-timestamp": 0.0, "code": 0, "err": u"message"}])
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_run_apt_update_error_no_cache_files(self):
        """
        L{PackageReporter.run_apt_update} succeeds if the command fails because
        cache files are not found.
        """
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self._make_fake_apt_update(
            code=2, out="not important",
            err=("E: Problem renaming the file "
                 "/var/cache/apt/srcpkgcache.bin.Pw1Zxy to "
                 "/var/cache/apt/srcpkgcache.bin - rename (2: No such file "
                 "or directory)\n"
                 "E: Problem renaming the file "
                 "/var/cache/apt/pkgcache.bin.wz8ooS to "
                 "/var/cache/apt/pkgcache.bin - rename (2: No such file "
                 "or directory)\n"
                 "E: The package lists or status file could not be parsed "
                 "or opened."))

        deferred = Deferred()

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(ignore):
                self.assertMessages(
                    message_store.get_pending_messages(),
                    [{"type": "package-reporter-result",
                      "report-timestamp": 0.0, "code": 0, "err": u""}])
            result.addCallback(callback)
            self.reactor.advance(0)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred


问题


面经


文章

微信
公众号

扫码关注公众号