python类callLater()的实例源码

test_twisted.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_deferred_service(self):
        """
        Call an 'echo' service that answers through a Deferred and check
        that the AMF3 response carries an OK status and the echoed value.
        """
        def echo(data):
            # Fire the Deferred from the reactor instead of synchronously.
            pending = defer.Deferred()
            reactor.callLater(0, pending.callback, data)
            return pending

        def check(response):
            self.assertEqual(response.amfVersion, pyamf.AMF3)
            self.assertTrue('/1' in response)

            body = response['/1']
            self.assertEqual(body.status, remoting.STATUS_OK)
            self.assertEqual(body.body, 'hello')

        self.gw.addService(echo)
        request_d = self.doRequest('echo', 'hello')

        return request_d.addCallback(check)
test_twisted.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_deferred_auth(self):
        """
        Run an 'echo' request through a processor whose authenticator
        returns the result of C{reactor.callLater} and check that the
        processor produces a truthy result.
        """
        d = defer.Deferred()

        def auth(u, p):
            # NOTE(review): reactor.callLater returns a DelayedCall, not a
            # Deferred -- confirm this is the value the test really means
            # to hand back to the gateway.
            return reactor.callLater(0, lambda: True)

        p = self.getProcessor({'echo': lambda x: x}, authenticator=auth)

        request = remoting.Request('echo', envelope=remoting.Envelope())

        def cb(result):
            self.assertTrue(result)
            d.callback(None)

        # Hand the original failure to the test's Deferred.  Calling
        # d.errback() with no argument asks Twisted to build a Failure from
        # the exception currently being handled; inside an errback there is
        # none, and the real error would be lost either way.
        p(request).addCallback(cb).addErrback(d.errback)

        return d
test_twisted.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_exposed_preprocessor(self):
        """
        Run an 'echo' request through a processor configured with a
        preprocessor wrapped by C{gateway.expose_request} and check that the
        processor produces a truthy result.
        """
        d = defer.Deferred()

        def preprocessor(http_request, service_request):
            # NOTE(review): reactor.callLater returns a DelayedCall, not a
            # Deferred -- confirm this is the value the test really means
            # to hand back to the gateway.
            return reactor.callLater(0, lambda: True)

        preprocessor = gateway.expose_request(preprocessor)
        p = self.getProcessor({'echo': lambda x: x}, preprocessor=preprocessor)

        request = remoting.Request('echo', envelope=remoting.Envelope())

        def cb(result):
            self.assertTrue(result)
            d.callback(None)

        # Hand the original failure to the test's Deferred.  Calling
        # d.errback() with no argument asks Twisted to build a Failure from
        # the exception currently being handled; inside an errback there is
        # none, and the real error would be lost either way.
        p(request).addCallback(cb).addErrback(d.errback)

        return d
msn.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def sendPart(self):
        """
        Write the next segment of the file to the transport and schedule
        the following one; mark the transfer completed once the file is
        exhausted.
        """
        if not self.connected:
            self._pendingSend = None
            return # may be buggy (if handle_CCL/BYE is called but self.connected is still 1)

        chunk = self.file.read(self.segmentSize)
        if not chunk:
            # Nothing left to read: the transfer is finished.
            self._pendingSend = None
            self.completed = 1
            return

        size = len(chunk)
        header = self.makeHeader(size)
        self.bytesSent += size
        self.transport.write(header + chunk)
        # Send the next segment on a later reactor iteration.
        self._pendingSend = reactor.callLater(0, self.sendPart)

# mapping of error codes to error messages
tkunzip.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def processOne(self, deferred):
        """
        Advance the iterator by one step and schedule the next step.

        @param deferred: called back with C{self.root} on the pass after the
        iterator is exhausted, or errbacked with a L{failure.Failure} if
        advancing the iterator raises anything else.
        """
        if self.stopping:
            deferred.callback(self.root)
            return

        try:
            self.remaining=self.iterator.next()
        except StopIteration:
            # Exhausted: the next scheduled pass fires the callback.
            self.stopping=1
        except:
            # Report the error and stop here.  Falling through would keep
            # rescheduling processOne on a Deferred that has already fired.
            deferred.errback(failure.Failure())
            return

        if self.remaining%10==0:
            reactor.callLater(0, self.updateBar, deferred)
        if self.remaining%100==0:
            log.msg(self.remaining)
        reactor.callLater(0, self.processOne, deferred)
app.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 92 收藏 0 点赞 0 评论 0
def fixPdb():
    """
    Patch L{pdb.Pdb} with a C{stop} command (plus its help text) and replace
    C{set_quit} so that quitting the debugger exits the process.
    """
    def do_stop(self, arg):
        # Drop all breakpoints, resume, and ask the reactor to stop on its
        # next iteration.
        self.clear_all_breaks()
        self.set_continue()
        from twisted.internet import reactor
        reactor.callLater(0, reactor.stop)
        return 1

    def help_stop(self):
        # Parenthesized so the same source is valid on Python 2 and 3.
        print("""stop - Continue execution, then cleanly shutdown the twisted reactor.""")

    def set_quit(self):
        os._exit(0)

    pdb.Pdb.set_quit = set_quit
    pdb.Pdb.do_stop = do_stop
    pdb.Pdb.help_stop = help_stop
userauth.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def serviceStarted(self):
        """
        Reset the authentication state, work out which authentication
        methods to offer and start the login timeout.
        """
        self.authenticatedWith = []
        self.loginAttempts = 0
        self.user = None
        self.nextService = None
        self.portal = self.transport.factory.portal

        # Offer only the methods whose credentials interface the portal
        # lists.
        self.supportedAuthentications = []
        self.supportedAuthentications.extend(
            self.interfaceToMethod[iface]
            for iface in self.portal.listCredentialsInterfaces()
            if iface in self.interfaceToMethod)

        if not self.transport.isEncrypted('out'):
            # don't let us transport password in plaintext
            for method in ('password', 'keyboard-interactive'):
                if method in self.supportedAuthentications:
                    self.supportedAuthentications.remove(method)

        self.cancelLoginTimeout = reactor.callLater(
            self.loginTimeout, self.timeoutAuthentication)
defer.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
        """Schedule a function to run if this Deferred has not fired in time.

        Deprecated: emits a L{DeprecationWarning} on every call.

        @param seconds: delay, counted from now, before timeoutFunc runs.

        @param timeoutFunc: called with this Deferred followed by *args and
        **kw, unless the Deferred has been called by then.

        @return: the delayed call that was scheduled, or C{None} if this
        Deferred has already been called.
        """
        warnings.warn(
            "Deferred.setTimeout is deprecated.  Look for timeout "
            "support specific to the API you are using instead.",
            DeprecationWarning, stacklevel=2)

        if self.called:
            return
        assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."

        from twisted.internet import reactor

        def fireTimeout():
            # Skip the timeout function if the Deferred fired meanwhile.
            return self.called or timeoutFunc(self, *args, **kw)

        delayed = reactor.callLater(seconds, fireTimeout)
        self.timeoutCall = delayed
        return delayed
test_cooperator.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testStopOutstanding(self):
        """
        An iterator suspended on an outside Deferred must stop properly once
        the Cooperator's stop() has been called.
        """
        controlD = defer.Deferred()
        pendingD = defer.Deferred()

        def suspended():
            reactor.callLater(0, controlD.callback, None)
            yield pendingD
            # Must never be resumed after the Cooperator was stopped.
            self.fail()

        coop = task.Cooperator()
        iterationD = coop.coiterate(suspended())

        def stopThenFire(ignored):
            coop.stop()
            pendingD.callback('arglebargle')

        def checkResult(result):
            return self.assertEquals(result, self.RESULT)

        controlD.addCallback(stopThenFire)
        iterationD.addCallback(self.cbIter)
        iterationD.addErrback(self.ebIter)

        return iterationD.addCallback(checkResult)
test_task.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testWaitDeferred(self):
        # The callable must not be scheduled again until the deferred it
        # returned has fired.
        clock = task.Clock()

        def foo():
            # Fires one second from now and stops the loop when it does.
            fired = defer.Deferred()
            fired.addCallback(lambda _: lc.stop())
            clock.callLater(1, fired.callback, None)
            return fired

        lc = TestableLoopingCall(clock, foo)
        d = lc.start(0.2)
        clock.pump([0.2, 0.8])
        # Nothing may be left waiting in the clock.
        self.failIf(clock.calls)
test_task.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testFailurePropagation(self):
        # A failure delivered to the deferred returned by the callable must
        # reach the errback of the deferred returned by lc.start().
        #
        # The clock's callLater queue is also checked for leftovers, so the
        # test cannot hang trial if LoopingCall does not wait for the
        # callable's deferred.
        clock = task.Clock()

        def foo():
            failing = defer.Deferred()
            clock.callLater(0.3, failing.errback, TestException())
            return failing

        lc = TestableLoopingCall(clock, foo)
        d = lc.start(1)
        self.assertFailure(d, TestException)

        clock.pump([0.3])
        self.failIf(clock.calls)
        return d
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_cancelDelayedCall(self):
        """
        A DelayedCall that has been cancelled must not run its function.
        """
        ran = []
        cancelled = reactor.callLater(0, lambda: ran.append(None))
        cancelled.cancel()

        # Verify two "iterations" from now, which leaves the cancelled call
        # every chance to have run by mistake.
        finished = Deferred()
        def verify():
            try:
                self.assertEqual(ran, [])
            except:
                # Inside the handler, errback() picks up the active exception.
                finished.errback()
            else:
                finished.callback(None)
        reactor.callLater(0, reactor.callLater, 0, verify)
        return finished
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_cancelCalledDelayedCallSynchronous(self):
        """
        Cancelling a DelayedCall from inside the very function it is
        invoking must raise L{error.AlreadyCalled}.
        """
        outcome = Deferred()
        def later():
            # 'delayed' is bound below, before the reactor can run this.
            try:
                self.assertRaises(error.AlreadyCalled, delayed.cancel)
            except:
                outcome.errback()
            else:
                outcome.callback(None)
        delayed = reactor.callLater(0, later)
        return outcome
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_cancelCalledDelayedCallAsynchronous(self):
        """
        Cancelling a DelayedCall once its function has finished running must
        raise L{error.AlreadyCalled}.
        """
        outcome = Deferred()
        def verify():
            try:
                self.assertRaises(error.AlreadyCalled, delayed.cancel)
            except:
                outcome.errback()
            else:
                outcome.callback(None)
        def later():
            # Run the check in a separate, later call, after this one ended.
            reactor.callLater(0, verify)
        delayed = reactor.callLater(0, later)
        return outcome
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testCallInNextIteration(self):
        """
        A zero-delay call scheduled from inside a running call runs on the
        following reactor iteration, not the current one.
        """
        calls = []
        def f3():
            calls.append('f3')
        def f2():
            calls.append('f2')
            reactor.callLater(0.0, f3)
        def f1():
            calls.append('f1')
            reactor.callLater(0.0, f2)

        reactor.callLater(0, f1)
        self.assertEquals(calls, [])
        # Each iteration must run exactly one more link of the chain.
        for expected in (['f1'], ['f1', 'f2'], ['f1', 'f2', 'f3']):
            reactor.iterate()
            self.assertEquals(calls, expected)
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testCallLaterOrder(self):
        """
        Calls scheduled with the same delay run in the order in which they
        were scheduled.
        """
        immediate = []
        delayed = []
        def done():
            self.assertEquals(immediate, range(20))
        def done2():
            self.assertEquals(delayed, range(10))

        for n in range(10):
            reactor.callLater(0, immediate.append, n)
        # Interleave the two delays so ordering is checked per delay value.
        for n in range(10):
            reactor.callLater(0, immediate.append, n+10)
            reactor.callLater(0.1, delayed.append, n)

        reactor.callLater(0, done)
        reactor.callLater(0.1, done2)
        # Finish after every call scheduled above has had time to run.
        finished = Deferred()
        reactor.callLater(0.2, finished.callback, None)
        return finished
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def testDelayedCallStringification(self):
        # Smoke test: str() of a DelayedCall should not raise, whatever
        # state the call is in (pending, reset, cancelled, called).
        delayed = reactor.callLater(0, lambda x, y: None, 'x', y=10)
        str(delayed)
        delayed.reset(5)
        str(delayed)
        delayed.cancel()
        str(delayed)

        # Same again with an awkward mix of positional and keyword arguments.
        delayed = reactor.callLater(0, lambda: None, x=[({'hello': u'world'}, 10j), reactor], *range(10))
        str(delayed)
        delayed.cancel()
        str(delayed)

        def calledBack(ignored):
            # By now 'delayed' is the call that fired this callback.
            str(delayed)
        finished = Deferred().addCallback(calledBack)
        delayed = reactor.callLater(0, finished.callback, None)
        str(delayed)
        return finished
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_crash(self):
        """
        reactor.crash should NOT fire shutdown triggers
        """
        fired = []
        def recordShutdown():
            fired.append(("before", "shutdown"))
        self.addTrigger("before", "shutdown", recordShutdown)

        # Calling reactor.crash from an "after-startup" trigger is too early
        # for the gtkreactor, because gtk_mainloop is not running yet; the
        # same holds for reactor.callLater(0). The call must land >0 seconds
        # in the future so that gtk_mainloop gets to start first.
        reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
        reactor.run()
        self.failIf(fired, "reactor.crash invoked shutdown triggers, but it "
                           "isn't supposed to.")

    # XXX Test that reactor.stop() invokes shutdown triggers
test_defer.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testMaybeDeferred(self):
        """
        Check L{defer.maybeDeferred} with a function that returns a value and
        with one that raises, then chain on to C{self._testMaybeError}.
        """
        S, E = [], []
        d = defer.maybeDeferred((lambda x: x + 5), 10)
        d.addCallbacks(S.append, E.append)
        self.assertEquals(E, [])
        self.assertEquals(S, [15])

        S, E = [], []
        # Capture the interpreter's own TypeError message so the comparison
        # below does not depend on its exact wording.
        try:
            '10' + 5
        except TypeError as e:
            expected = str(e)
        d = defer.maybeDeferred((lambda x: x + 5), '10')
        d.addCallbacks(S.append, E.append)
        self.assertEquals(S, [])
        self.assertEquals(len(E), 1)
        self.assertEquals(str(E[0].value), expected)

        d = defer.Deferred()
        reactor.callLater(0.2, d.callback, 'Success')
        d.addCallback(self.assertEquals, 'Success')
        d.addCallback(self._testMaybeError)
        return d
ftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sendNextCommand(self):
        """(Private) Take the next command off the queue and send it."""
        command = self.popCommandQueue()
        if command is None:
            # Queue is empty: nothing is awaiting a response.
            self.nextDeferred = None
            return

        if not command.ready:
            # Put the command back at the head of the queue and try again
            # in one second.
            self.actionQueue.insert(0, command)
            reactor.callLater(1.0, self.sendNextCommand)
            self.nextDeferred = None
            return

        text = command.text
        # FIXME: this if block doesn't belong in FTPClientBasic, it belongs in
        #        FTPClient.
        if text == 'PORT':
            self.generatePortCommand(command)

        if self.debug:
            log.msg('<-- %s' % command.text)
        self.nextDeferred = command.deferred
        # Re-read the text: generatePortCommand may have replaced it.
        self.sendLine(command.text)
policies.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setTimeout(self, period):
        """Set a new timeout period.

        @type period: C{int} or C{NoneType}
        @param period: The new timeout, in seconds, or C{None} to turn the
        timeout off.

        @return: The timeout period that was in effect before this call.
        """
        previous = self.timeOut
        self.timeOut = period

        pending = self.__timeoutCall
        if pending is None:
            # No call outstanding: schedule one unless being disabled.
            if period is not None:
                self.__timeoutCall = self.callLater(period, self.__timedOut)
        elif period is None:
            pending.cancel()
            self.__timeoutCall = None
        else:
            pending.reset(period)

        return previous
client.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def maybeParseConfig(self):
        """
        Re-read the resolver configuration file if its modification time has
        changed, then schedule the next check.

        Does nothing at all when C{self.resolv} is C{None}.

        @raise IOError: if the file cannot be opened for any reason other
        than not existing.
        """
        if self.resolv is None:
            # Don't try to parse it, don't set up a call loop
            return

        try:
            resolvConf = open(self.resolv)
        except IOError as e:
            if e.errno == errno.ENOENT:
                # Missing resolv.conf is treated the same as an empty resolv.conf
                self.parseConfig(())
            else:
                raise
        else:
            # Always release the file handle, even if parsing fails; it was
            # previously left for the garbage collector to close.
            try:
                mtime = os.fstat(resolvConf.fileno()).st_mtime
                if mtime != self._lastResolvTime:
                    log.msg('%s changed, reparsing' % (self.resolv,))
                    self._lastResolvTime = mtime
                    self.parseConfig(resolvConf)
            finally:
                resolvConf.close()

        # Check again in a little while
        from twisted.internet import reactor
        self._parseCall = reactor.callLater(self._resolvReadInterval, self.maybeParseConfig)
cache.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def cacheResult(self, query, payload):
        """
        Store C{payload} as the cached answer for C{query} and schedule the
        entry to be cleared after the smallest TTL among its records.

        @param query: key under which the payload is cached.
        @param payload: sequence whose first three items are iterables of
        records, each exposing a C{ttl} attribute.
        """
        if self.verbose > 1:
            log.msg('Adding %r to cache' % query)

        self.cache[query] = (time.time(), payload)

        # Replace any expiry already scheduled for this query.
        if query in self.cancel:
            self.cancel[query].cancel()

        records = list(payload[0]) + list(payload[1]) + list(payload[2])
        if records:
            ttl = min([r.ttl for r in records])
        else:
            # A payload without records has no TTL to honour; expire the
            # entry straight away instead of failing on records[0].
            ttl = 0

        from twisted.internet import reactor
        self.cancel[query] = reactor.callLater(ttl, self.clearEntry, query)
imap4.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __cbManualSearch(self, result, tag, mbox, query, uid, searchResults = None):
        """
        Filter up to five (id, message) pairs taken from C{result}; when a
        full batch of five was taken, reschedule for the rest, otherwise
        send the SEARCH response.

        @param searchResults: matches accumulated by earlier batches, or
        C{None} on the first call.
        """
        if searchResults is None:
            searchResults = []
        i = 0
        for (i, (msgId, msg)) in zip(range(5), result):
            if not self.searchFilter(query, msgId, msg):
                continue
            if uid:
                matched = msg.getUID()
            else:
                matched = msgId
            searchResults.append(str(matched))

        if i == 4:
            # A full batch was taken, so more may remain: continue later.
            from twisted.internet import reactor
            reactor.callLater(0, self.__cbManualSearch, result, tag, mbox, query, uid, searchResults)
            return

        if searchResults:
            self.sendUntaggedResponse('SEARCH ' + ' '.join(searchResults))
        self.sendPositiveResponse(tag, 'SEARCH completed')
autohelp.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def apply_script(protocol, connection, config):
    """
    Build a connection class that schedules a help message one second after
    a chat line matches one of the "how to" patterns.

    Returns the protocol unchanged together with the new connection class.
    """
    def send_help_nick(player):
        server = player.protocol
        server.send_chat(
            "TO CHANGE YOUR NAME: Start Menu-> "
            "All Programs-> Ace of Spades-> Configuration")
        player.protocol.irc_say("* Sent nick help to %s" % player.name)

    def send_help_airstrike(player):
        server = player.protocol
        server.send_chat(
            "TO USE AN AIRSTRIKE: Once you have 15 points, "
            "get a 6 killstreak ->                  "
            "Then type /airstrike G4 if you want the strike to hit G4")
        player.protocol.irc_say(
            "* Sent airstrike help to %s" % player.name)

    class AutoHelpConnection(connection):

        def on_chat(self, value, global_message):
            # Both checks always run; a line may trigger both messages.
            if deuce_howto_match(self, value):
                reactor.callLater(1.0, send_help_nick, self)
            if airstrike_howto_match(self, value):
                reactor.callLater(1.0, send_help_airstrike, self)
            # Hand the message on to the wrapped connection class.
            return connection.on_chat(self, value, global_message)

    return protocol, AutoHelpConnection
console.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_input(self):
            """
            Consume every pending key press, echoing it and building up the
            current input line, then schedule the next poll.
            """
            while msvcrt.kbhit():
                key = msvcrt.getwch()
                if key == u'\r':  # new line
                    stdout.write(u'\n')
                    self.input += u'\n'
                    # Hand the finished line to the protocol and start over.
                    self.protocol.dataReceived(self.input)
                    self.input = ''
                    continue
                if key in (u'\xE0', u'\x00'):
                    # ignore special characters: read and drop the next one
                    msvcrt.getwch()
                    continue
                if key == u'\x08':  # delete
                    self.input = self.input[:-1]
                    stdout.write('\x08 \x08')
                    continue
                self.input += key
                stdout.write(key)
            reactor.callLater(self.interval, self.get_input)
event.py 文件源码 项目:Codado 作者: corydodt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        """
        Create the docker client, schedule the handlers for a synthetic
        'dockerish.init' event, and schedule the first event poll.
        """
        self.client = docker.from_env()

        now = time.time()
        initActor = EventActor(image=None, name=None, signal=None, id=None)
        startEvent = Event(
            status='init',
            id=None,
            time=int(now),
            timeNano=int(now * 1000000000),
            actor=initActor,
            action='init',
            eventFrom=None,
            eventType='dockerish',
            engine=self,
        )
        self.callLater(0, self._callHandlers, 'dockerish.init', startEvent)
        # The poll gets a fresh timestamp, not the one used for the event.
        self.callLater(PEEK_INTERVAL_SECONDS, self._genEvents, time.time())
test_gateway.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getReply(self, line, proto, transport):
        """
        Feed C{line} to the protocol and return a Deferred firing with what
        it wrote to the transport. Lines not starting with HELO, MAIL, RCPT
        or DATA get an empty reply at once.
        """
        proto.lineReceived(line)

        if line[:4] not in ['HELO', 'MAIL', 'RCPT', 'DATA']:
            return succeed("")

        def check_transport(_):
            reply = transport.value()
            if not reply:
                # Nothing written yet: look again on a later reactor
                # iteration.
                retry = Deferred()
                retry.addCallback(check_transport)
                reactor.callLater(0, retry.callback, None)
                return retry
            transport.clear()
            return succeed(reply)

        return check_transport(None)
linux.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _wait_and_kill(self, killfun, proc, deferred, tries=0):
        """
        Poll until the process has gone away; once the allowed number of
        tries is used up, call killfun instead.

        :param tries: counter of tries, used in recursion
        :type tries: int
        """
        if tries >= TERMINATE_MAXTRIES:
            # after running out of patience, we try a killProcess
            d = killfun()
            d.addCallback(lambda _: deferred.callback(True))
            return d

        if proc.transport.pid is None:
            # No pid left on the transport: report success.
            deferred.callback(True)
            return

        self.log.debug('Process did not die, waiting...')
        reactor.callLater(
            TERMINATE_WAIT,
            self._wait_and_kill, killfun, proc, deferred, tries + 1)
process.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _connect_to_management(self, retries=30):
        """
        Try to connect a ManagementProtocol to the management endpoint,
        retrying after 0.1 seconds on failure until C{retries} reaches zero,
        at which point C{self.failed} is set.
        """
        if retries == 0:
            self.log.error('Timeout while connecting to management')
            self.failed = True
            return

        def retry(failure):
            # The failure itself is ignored; only the countdown matters.
            remaining = retries - 1
            self.log.warn(
                'Error connecting to management, retrying. '
                'Retries left:  %s' % remaining)
            reactor.callLater(
                0.1, self._connect_to_management, remaining)

        protocol = ManagementProtocol(verbose=True)
        self._d = connectProtocol(self._management_endpoint, protocol)
        self._d.addCallbacks(self._got_management_protocol, retry)


问题


面经


文章

微信
公众号

扫码关注公众号