python类callWhenRunning()的实例源码

test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_config_apt_update_interval(self):
        """
        L{PackageReporter} uses the C{apt_update_interval} configuration
        parameter to check the age of the update stamp file.
        """
        self.config.apt_update_interval = 1234
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        intervals = []

        def apt_update_timeout_expired(interval):
            intervals.append(interval)
            return False

        deferred = Deferred()

        self.reporter._apt_sources_have_changed = lambda: False
        self.reporter._apt_update_timeout_expired = apt_update_timeout_expired

        def do_test():
            result = self.reporter.run_apt_update()

            def callback(ignore):
                self.assertMessages(message_store.get_pending_messages(), [])
                self.assertEqual([1234], intervals)
            result.addCallback(callback)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
test_reporter.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_store_messages(self):
        """
        L{FakeGlobalReporter} stores messages which are sent.
        """
        message_store = self.broker_service.message_store
        message_store.set_accepted_types(["package-reporter-result"])
        self.reporter.apt_update_filename = self.makeFile(
            "#!/bin/sh\necho -n error >&2\necho -n output\nexit 0")
        os.chmod(self.reporter.apt_update_filename, 0o755)
        deferred = Deferred()

        def do_test():
            self.reporter.get_session_id()
            result = self.reporter.run_apt_update()
            self.reactor.advance(0)

            def callback(ignore):
                message = {"type": "package-reporter-result",
                           "report-timestamp": 0.0, "code": 0, "err": u"error"}
                self.assertMessages(
                    message_store.get_pending_messages(), [message])
                stored = list(self.store._db.execute(
                    "SELECT id, data FROM message").fetchall())
                self.assertEqual(1, len(stored))
                self.assertEqual(1, stored[0][0])
                self.assertEqual(message, bpickle.loads(bytes(stored[0][1])))
            result.addCallback(callback)
            result.chainDeferred(deferred)

        reactor.callWhenRunning(do_test)
        return deferred
application.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setupFacade(config):
    """Get the L{Facade} instance to use in the API service."""
    from fluiddb.api.facade import Facade
    from fluiddb.util.transact import Transact

    maxThreads = int(config.get('service', 'max-threads'))
    threadpool = ThreadPool(minthreads=0, maxthreads=maxThreads)
    reactor.callWhenRunning(threadpool.start)
    reactor.addSystemEventTrigger('during', 'shutdown', threadpool.stop)
    transact = Transact(threadpool)
    factory = FluidinfoSessionFactory('API-%s' % config.get('service', 'port'))
    return Facade(transact, factory)
resources.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def make(self, dependency_resources):
        """Create and start a new thread pool."""
        from twisted.internet import reactor

        global _threadPool
        if _threadPool is None:
            _threadPool = ThreadPool(minthreads=1, maxthreads=1)
            reactor.callWhenRunning(_threadPool.start)
            reactor.addSystemEventTrigger('during', 'shutdown',
                                          _threadPool.stop)
        return _threadPool
mockserver.py 文件源码 项目:undercrawler 作者: TeamHG-Memex 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    args = parser.parse_args()
    module_name, name = args.resource.rsplit('.', 1)
    sys.path.append('.')
    resource = getattr(import_module(module_name), name)()
    http_port = reactor.listenTCP(PORT, Site(resource))
    def print_listening():
        host = http_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, host.host, host.port))
    reactor.callWhenRunning(print_listening)
    reactor.run()
ipv8_plugin.py 文件源码 项目:py-ipv8 作者: qstokkink 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def __init__(self, configuration):
        self.endpoint = UDPEndpoint(configuration['port'])
        self.endpoint.open()

        self.network = Network()

        # Load/generate keys
        self.keys = {}
        for key_block in configuration['keys']:
            if key_block['file'] and isfile(key_block['file']):
                with open(key_block['file'], 'r') as f:
                    self.keys[key_block['alias']] = Peer(ECCrypto().key_from_private_bin(f.read()))
            else:
                self.keys[key_block['alias']] = Peer(ECCrypto().generate_key(key_block['generation']))
                if key_block['file']:
                    with open(key_block['file'], 'w') as f:
                        f.write(self.keys[key_block['alias']].key.key_to_bin())

        # Setup logging
        logging.basicConfig(**configuration['logger'])

        self.strategies = []
        self.overlays = []

        for overlay in configuration['overlays']:
            overlay_class = _COMMUNITIES[overlay['class']]
            my_peer = self.keys[overlay['key']]
            overlay_instance = overlay_class(my_peer, self.endpoint, self.network, **overlay['initialize'])
            self.overlays.append(overlay_instance)
            for walker in overlay['walkers']:
                strategy_class = _WALKERS[walker['strategy']]
                args = walker['init']
                target_peers = walker['peers']
                self.strategies.append((strategy_class(overlay_instance, **args), target_peers))
            for config in overlay['on_start']:
                reactor.callWhenRunning(getattr(overlay_instance, config[0]), *config[1:])

        self.state_machine_lc = LoopingCall(self.on_tick).start(configuration['walker_interval'], False)
ipv8_plugin.py 文件源码 项目:py-ipv8 作者: qstokkink 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def makeService(self, options):
        """
        Construct a IPv8 service.
        """
        ipv8_service = MultiService()
        ipv8_service.setName("IPv8")

        reactor.callWhenRunning(self.start_ipv8, options)

        return ipv8_service
usage.py 文件源码 项目:ccs-twistedextensions 作者: apple 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def runReactor(self):
        from twisted.internet import reactor
        reactor.callWhenRunning(self.whenRunning)
        self.log.info("Starting reactor...")
        reactor.run()
python_reader.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def twistedinteract(self):
        from twisted.internet import reactor
        from twisted.internet.abstract import FileDescriptor
        import signal
        outerself = self
        class Me(FileDescriptor):
            def fileno(self):
                """ We want to select on FD 0 """
                return 0

            def doRead(self):
                """called when input is ready"""
                try:
                    outerself.handle1()
                except EOFError:
                    reactor.stop()

        reactor.addReader(Me())
        reactor.callWhenRunning(signal.signal,
                                signal.SIGINT,
                                signal.default_int_handler)
        self.prepare()
        try:
            reactor.run()
        finally:
            self.restore()
mockserver.py 文件源码 项目:autologin-middleware 作者: TeamHG-Memex 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('resource')
    args = parser.parse_args()
    module_name, name = args.resource.rsplit('.', 1)
    sys.path.append('.')
    resource = getattr(import_module(module_name), name)()
    http_port = reactor.listenTCP(PORT, Site(resource))
    def print_listening():
        host = http_port.getHost()
        print('Mock server {} running at http://{}:{}'.format(
            resource, host.host, host.port))
    reactor.callWhenRunning(print_listening)
    reactor.run()
daemon.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setup_worker(self):
        "Sets up a worker process"
        # NOTE: This is locally imported because it will in turn import
        # twistedsnmp. Twistedsnmp is stupid enough to call
        # logging.basicConfig().  If imported before our own loginit, this
        # causes us to have two StreamHandlers on the root logger, duplicating
        # every log statement.
        self._logger.info("Starting worker process")
        plugins.import_plugins()

        def init():
            handler = pool.initialize_worker()
            self.job_loggers.append(handler.log_jobs)

        reactor.callWhenRunning(init)
daemon.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setup_single_job(self):
        "Sets up a single job run with exit when done"
        from .jobs import JobHandler
        from . import config

        def _run_job():
            descriptors = dict((d.name, d) for d in config.get_jobs())
            job = descriptors[self.options.onlyjob]
            self._log_context = dict(job=job.name,
                                     sysname=self.options.netbox.sysname)
            job_handler = JobHandler(job.name, self.options.netbox.id,
                                     plugins=job.plugins,
                                     interval=job.interval)
            deferred = maybeDeferred(job_handler.run)
            deferred.addBoth(_log_job, job_handler, interval=job.interval)
            deferred.addBoth(lambda x: reactor.stop())

        def _log_job(result, handler, interval):
            success = not isinstance(result, Failure)
            schedule.log_job_externally(handler, success if result else None,
                                        interval)

        plugins.import_plugins()
        self._logger.info("Running single %r job for %s",
                          self.options.onlyjob, self.options.netbox)
        reactor.callWhenRunning(_run_job)
dashd_impl.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    logging.basicConfig(
        format='%(asctime)s:%(name)s:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )

    args = parse_options()

    dashd = DashDaemon(args.consul, args.kafka, args.grafana_url, args.topic)
    reactor.callWhenRunning(dashd.start)
    reactor.run()
    log.info("completed!")
main.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start_reactor(self):
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))

        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()
kafka-consumer.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    logging.basicConfig(
        format='%(asctime)s:%(name)s:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )

    args = parse_options()

    consumer_example = ConsumerExample(args.consul, args.topic,
                                       int(args.runtime))
    reactor.callWhenRunning(consumer_example.start)
    reactor.run()
    log.info("completed!")
main.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_reactor(self):
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))

        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()
main.py 文件源码 项目:voltha 作者: opencord 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def start_reactor(self):
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))

        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()


问题


面经


文章

微信
公众号

扫码关注公众号