python类Deferred()的实例源码

base.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fireSystemEvent(self, eventType):
        """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
        """
        sysEvtTriggers = self._eventTriggers.get(eventType)
        if not sysEvtTriggers:
            return
        defrList = []
        for callable, args, kw in sysEvtTriggers[0]:
            try:
                d = callable(*args, **kw)
            except:
                log.deferr()
            else:
                if isinstance(d, Deferred):
                    defrList.append(d)
        if defrList:
            DeferredList(defrList).addBoth(self._cbContinueSystemEvent, eventType)
        else:
            self.callLater(0, self._continueSystemEvent, eventType)
utils.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def runWithWarningsSuppressed(suppressedWarnings, f, *a, **kw):
    """Run the function C{f}, but with some warnings suppressed.

    @param suppressedWarnings: A list of arguments to pass to filterwarnings.
                               Must be a sequence of 2-tuples (args, kwargs).
    @param f: A callable, followed by its arguments and keyword arguments
    """
    for args, kwargs in suppressedWarnings:
        warnings.filterwarnings(*args, **kwargs)
    addedFilters = warnings.filters[:len(suppressedWarnings)]
    try:
        result = f(*a, **kw)
    except:
        exc_info = sys.exc_info()
        _resetWarningFilters(None, addedFilters)
        raise exc_info[0], exc_info[1], exc_info[2]
    else:
        if isinstance(result, defer.Deferred):
            result.addBoth(_resetWarningFilters, addedFilters)
        else:
            _resetWarningFilters(None, addedFilters)
        return result
test_util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_stdin(self):
        """
        Making sure getPassword accepts a password from standard input by
        running a child process which uses getPassword to read in a string
        which it then writes it out again.  Write a string to the child
        process and then read one and make sure it is the right string.
        """
        p = PasswordTestingProcessProtocol()
        p.finished = Deferred()
        reactor.spawnProcess(
            p,
            sys.executable,
            [sys.executable,
             '-c',
             ('import sys\n'
             'from twisted.python.util import getPassword\n'
              'sys.stdout.write(getPassword())\n'
              'sys.stdout.flush()\n')],
            env={'PYTHONPATH': os.pathsep.join(sys.path)})

        def processFinished((reason, output)):
            reason.trap(ProcessDone)
            self.assertEquals(output, [(1, 'secret')])
test_pb.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testDeadReferenceError(self):
        """
        Test that when a connection is lost, calling a method on a
        RemoteReference obtained from it raises DeadReferenceError.
        """
        factory, rootObjDeferred = self.getFactoryAndRootObject()

        def gotRootObject(rootObj):
            disconnectedDeferred = defer.Deferred()
            rootObj.notifyOnDisconnect(disconnectedDeferred.callback)

            def lostConnection(ign):
                self.assertRaises(
                    pb.DeadReferenceError,
                    rootObj.callRemote, 'method')

            disconnectedDeferred.addCallback(lostConnection)
            factory.disconnect()
            return disconnectedDeferred

        return rootObjDeferred.addCallback(gotRootObject)
test_task.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testWaitDeferred(self):
        # Tests if the callable isn't scheduled again before the returned
        # deferred has fired.
        timings = [0.2, 0.8]
        clock = task.Clock()

        def foo():
            d = defer.Deferred()
            d.addCallback(lambda _: lc.stop())
            clock.callLater(1, d.callback, None)
            return d

        lc = TestableLoopingCall(clock, foo)
        d = lc.start(0.2)
        clock.pump(timings)
        self.failIf(clock.calls)
test_task.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testFailurePropagation(self):
        # Tests if the failure of the errback of the deferred returned by the
        # callable is propagated to the lc errback.
        #
        # To make sure this test does not hang trial when LoopingCall does not
        # wait for the callable's deferred, it also checks there are no
        # calls in the clock's callLater queue.
        timings = [0.3]
        clock = task.Clock()

        def foo():
            d = defer.Deferred()
            clock.callLater(0.3, d.errback, TestException())
            return d

        lc = TestableLoopingCall(clock, foo)
        d = lc.start(1)
        self.assertFailure(d, TestException)

        clock.pump(timings)
        self.failIf(clock.calls)
        return d
json_rpc.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _request(self, method, **params):
        """
        Send a request over whatever transport is provided
        """
        this_id = self.next_id
        self.next_id += 1
        request = r'{{"jsonrpc": "2.0", "id": {}, "method": "{}", "params": {}}}' \
                    .format(this_id, method, json.dumps(params))
        print('### SENDING REQUEST {}'.format(request))
        self.sdata.add_to_push_queue('request', text=request)
        self.sdata.transport_tx_cb(request)
        d = defer.Deferred()
        self.pending_reply_map[this_id] = d
        return d
collector.py 文件源码 项目:PGO-mapscan-opt 作者: seikur0 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def shutdown(seconds, result=None):
    if not isinstance(seconds, numbers.Number):
        log.err(seconds)
        seconds = 1
    d = Deferred()
    d.addCallback(stop_reaktor)
    reactor.callLater(seconds, d.callback, result)
    return d
collector.py 文件源码 项目:PGO-mapscan-opt 作者: seikur0 项目源码 文件源码 阅读 63 收藏 0 点赞 0 评论 0
def wait(seconds, result=None):
    """Returns a deferred that will be fired later"""
    d = Deferred()
    reactor.callLater(seconds, d.callback, result)
    return d
sse_protocol.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def when_finished(self):
        """
        Get a deferred that will be fired when the connection is closed.
        """
        d = Deferred()
        self._waiting.append(d)
        return d
http_helpers.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def notifyFinish(self):
        """
        Return a L{Deferred} which is called back with C{None} when the request
        is finished.  This will probably only work if you haven't called
        C{finish} yet.
        """
        finished = Deferred()
        self._finishedDeferreds.append(finished)
        return finished
http_helpers.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def finish(self):
        """
        Record that the request is finished and callback and L{Deferred}s
        waiting for notification of this.
        """
        self.finished = self.finished + 1
        if self._finishedDeferreds is not None:
            observers = self._finishedDeferreds
            self._finishedDeferreds = None
            for obs in observers:
                obs.callback(None)
recipe-473865.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, url, method, *args):
        self.url, self.host = url.path, url.host
        self.user, self.password = url.user, url.password
        self.payload = xmlrpc.payloadTemplate % (
            method, xmlrpclib.dumps(args))
        self.deferred = defer.Deferred()
recipe-457670.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, collector, string, firstPage):
        self._atFirstPage = True
        self._firstPage = firstPage
        self._deferred = Deferred()
        StringPager.__init__(self, collector, string, callback=self.done)
recipe-576735.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, collector, path):
        self._deferred = Deferred()
        print "%s, %d bytes" % (path, os.path.getsize(path))
        fd = file(path, 'rb')
        FilePager.__init__(self, collector, fd, callback=self.done)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
wamp_client.py 文件源码 项目:django-wamp-client 作者: fcurella 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def forward_procedure(self, func_path, uri, options=None):
        @inlineCallbacks
        def wrapped(*args, **kwargs):
            reply_channel_name = self.channel_layer.new_channel('{}?'.format(uri))
            payload = {
                'func_path': func_path,
                'uri': uri,
                'args': args,
                'kwargs': kwargs,
                'reply_channel': reply_channel_name,
            }
            channel = Channel('wamp.events')
            channel.send(payload)

            d = Deferred()

            def cleanup(result):
                self.channels.remove(reply_channel_name)
                del self.reply_channels[reply_channel_name]
                self.log.info('result: {}'.format(result['total']))
            d.addCallback(cleanup)
            self.channels.add(reply_channel_name)
            self.reply_channels[reply_channel_name] = d

            yield d
        self.log.info("registered procedure for '{}'".format(uri))
        if options is None:
            options = types.RegisterOptions()
        return self.register(wrapped, uri, options=options)
test_twisted.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_preprocessor(self):
        d = defer.Deferred()

        def pp(sr):
            self.assertIdentical(sr, self.service_request)
            d.callback(None)

        gw = twisted.TwistedGateway({'echo': lambda x: x}, preprocessor=pp)
        self.service_request = gateway.ServiceRequest(
            None, gw.services['echo'], None
        )

        gw.preprocessRequest(self.service_request)

        return d


问题


面经


文章

微信
公众号

扫码关注公众号