python类iterate()的实例源码

test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testCallInNextIteration(self):
        calls = []
        def f1():
            calls.append('f1')
            reactor.callLater(0.0, f2)
        def f2():
            calls.append('f2')
            reactor.callLater(0.0, f3)
        def f3():
            calls.append('f3')

        reactor.callLater(0, f1)
        self.assertEquals(calls, [])
        reactor.iterate()
        self.assertEquals(calls, ['f1'])
        reactor.iterate()
        self.assertEquals(calls, ['f1', 'f2'])
        reactor.iterate()
        self.assertEquals(calls, ['f1', 'f2', 'f3'])
test_ssl.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testOpenSSLBuffering(self):
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        i = 0
        while i < 5000 and not client.buffer:
            i += 1
            reactor.iterate()

        self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do_cleanPending(cls):
        # don't import reactor when module is loaded
        from twisted.internet import reactor

        # flush short-range timers
        reactor.iterate(0)
        reactor.iterate(0)

        pending = reactor.getDelayedCalls()
        if pending:
            s = PENDING_TIMED_CALLS_MSG
            for p in pending:
                s += " %s\n" % (p,)
                if p.active():
                    p.cancel() # delete the rest
                else:
                    print "WEIRNESS! pending timed call not active+!"
            raise PendingTimedCallsError(s)
test_internet.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testCallInNextIteration(self):
        calls = []
        def f1():
            calls.append('f1')
            reactor.callLater(0.0, f2)
        def f2():
            calls.append('f2')
            reactor.callLater(0.0, f3)
        def f3():
            calls.append('f3')

        reactor.callLater(0, f1)
        self.assertEquals(calls, [])
        reactor.iterate()
        self.assertEquals(calls, ['f1'])
        reactor.iterate()
        self.assertEquals(calls, ['f1', 'f2'])
        reactor.iterate()
        self.assertEquals(calls, ['f1', 'f2', 'f3'])
test_ssl.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testOpenSSLBuffering(self):
        serverProto = self.serverProto = SingleLineServerProtocol()
        clientProto = self.clientProto = RecordingClientProtocol()

        server = protocol.ServerFactory()
        client = self.client = protocol.ClientFactory()

        server.protocol = lambda: serverProto
        client.protocol = lambda: clientProto
        client.buffer = []

        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
        cCTX = ssl.ClientContextFactory()

        port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)

        i = 0
        while i < 5000 and not client.buffer:
            i += 1
            reactor.iterate()

        self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
util.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 124 收藏 0 点赞 0 评论 0
def do_cleanPending(cls):
        # don't import reactor when module is loaded
        from twisted.internet import reactor

        # flush short-range timers
        reactor.iterate(0)
        reactor.iterate(0)

        pending = reactor.getDelayedCalls()
        if pending:
            s = PENDING_TIMED_CALLS_MSG
            for p in pending:
                s += " %s\n" % (p,)
                if p.active():
                    p.cancel() # delete the rest
                else:
                    print "WEIRNESS! pending timed call not active+!"
            raise PendingTimedCallsError(s)
asyncdns.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def resolve(self, names):
        """Resolves DNS names in parallel"""
        self._finished = False
        self.results = defaultdict(list)

        deferred_list = []
        for name in names:
            for deferred in self.lookup(name):
                deferred.addCallback(self._extract_records, name)
                deferred.addErrback(self._errback, name)
                deferred_list.append(deferred)

        deferred_list = defer.DeferredList(deferred_list)
        deferred_list.addCallback(self._parse_result)
        deferred_list.addCallback(self._finish)

        while not self._finished:
            reactor.iterate()
        # Although the results are in at this point, we may need an extra
        # iteration to ensure the resolver library closes its UDP sockets
        reactor.iterate()

        return dict(self.results)
util.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _cleanPending(self):
        """
        Cancel all pending calls and return their string representations.
        """
        reactor = self._getReactor()

        # flush short-range timers
        reactor.iterate(0)
        reactor.iterate(0)

        delayedCallStrings = []
        for p in reactor.getDelayedCalls():
            if p.active():
                delayedString = str(p)
                p.cancel()
            else:
                print("WEIRDNESS! pending timed call not active!")
            delayedCallStrings.append(delayedString)
        return delayedCallStrings
test_cftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
        CFTPClientTestBase.setUp(self)

        self.startServer()
        cmds = ('-p %i -l testuser '
               '--known-hosts kh_test '
               '--user-authentications publickey '
               '--host-key-algorithms ssh-rsa '
               '-K direct '
               '-i dsa_test '
               '-a --nocache '
               '-v '
               '127.0.0.1')
        port = self.server.getHost().port
        cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
        log.msg('running %s %s' % (sys.executable, cmds))
        self.processProtocol = SFTPTestProcess()

        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
                             env=env)

        timeout = time.time() + 10
        while (not self.processProtocol.buffer) and (time.time() < timeout):
            reactor.iterate(0.1)
        if time.time() > timeout:
            self.skip = "couldn't start process"
        else:
            self.processProtocol.clearBuffer()
test_cftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _getCmdResult(self, cmd):
        self.processProtocol.clearBuffer()
        self.processProtocol.transport.write(cmd+'\n')
        timeout = time.time() + 10
        while (self.processProtocol.buffer.find('cftp> ') == -1) and (time.time() < timeout):
            reactor.iterate(0.1)
        self.failIf(time.time() > timeout, "timeout")
        if self.processProtocol.buffer.startswith('cftp> '):
            self.processProtocol.buffer = self.processProtocol.buffer[6:]
        return self.processProtocol.buffer[:-6].strip()
threading_latency.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def wait(self):
        start = time.time()
        while time.time() - start < 1:
            reactor.iterate(1.0)
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        self.finished = 0
        self.counter = 0
        self.timers = {}
        self.deferred = defer.Deferred()
        # ick. Sometimes there are magic timers already running:
        # popsicle.Freezer.tick . Kill off all such timers now so they won't
        # interfere with the test. Of course, this kind of requires that
        # getDelayedCalls already works, so certain failure modes won't be
        # noticed.
        if not hasattr(reactor, "getDelayedCalls"):
            return
        for t in reactor.getDelayedCalls():
            t.cancel()
        reactor.iterate() # flush timers
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testActive(self):
        dcall = reactor.callLater(0, lambda: None)
        self.assertEquals(dcall.active(), 1)
        reactor.iterate()
        self.assertEquals(dcall.active(), 0)
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testScheduling(self):
        c = Counter()
        for i in range(100):
            self.schedule(c.add)
        for i in range(100):
            reactor.iterate()
        self.assertEquals(c.index, 100)
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testCorrectOrder(self):
        o = Order()
        self.schedule(o.a)
        self.schedule(o.b)
        self.schedule(o.c)
        reactor.iterate()
        reactor.iterate()
        reactor.iterate()
        self.assertEquals(o.stage, 3)
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testNotRunAtOnce(self):
        c = Counter()
        self.schedule(c.add)
        # scheduled tasks should not be run at once:
        self.assertEquals(c.index, 0)
        reactor.iterate()
        self.assertEquals(c.index, 1)
test_policies.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def doIterations(self, count=5):
        for i in range(count):
            reactor.iterate()
nevowlore.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ____wait(d):
    "."
    from twisted.internet import reactor
    from twisted.python import failure
    l = []
    d.addBoth(l.append)
    while not l:
        reactor.iterate()
    if isinstance(l[0], failure.Failure):
        l[0].raiseException()
    return l[0]
erroneous.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testHiddenException(self):
        """
        What happens if an error is raised in a DelayedCall and an error is
        also raised in the test?

        L{test_reporter.TestErrorReporting.testHiddenException} checks that
        both errors get reported.

        Note that this behaviour is deprecated. A B{real} test would return a
        Deferred that got triggered by the callLater. This would guarantee the
        delayed call error gets reported.
        """
        reactor.callLater(0, self.go)
        reactor.iterate(0.01)
        self.fail("Deliberate failure to mask the hidden exception")
test_cftp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        CFTPClientTestBase.setUp(self)

        self.startServer()
        cmds = ('-p %i -l testuser '
               '--known-hosts kh_test '
               '--user-authentications publickey '
               '--host-key-algorithms ssh-rsa '
               '-K direct '
               '-i dsa_test '
               '-a --nocache '
               '-v '
               '127.0.0.1')
        port = self.server.getHost().port
        cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
        log.msg('running %s %s' % (sys.executable, cmds))
        self.processProtocol = SFTPTestProcess()

        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
                             env=env)

        timeout = time.time() + 10
        while (not self.processProtocol.buffer) and (time.time() < timeout):
            reactor.iterate(0.1)
        if time.time() > timeout:
            self.skip = "couldn't start process"
        else:
            self.processProtocol.clearBuffer()
test_cftp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _getCmdResult(self, cmd):
        self.processProtocol.clearBuffer()
        self.processProtocol.transport.write(cmd+'\n')
        timeout = time.time() + 10
        while (self.processProtocol.buffer.find('cftp> ') == -1) and (time.time() < timeout):
            reactor.iterate(0.1)
        self.failIf(time.time() > timeout, "timeout")
        if self.processProtocol.buffer.startswith('cftp> '):
            self.processProtocol.buffer = self.processProtocol.buffer[6:]
        return self.processProtocol.buffer[:-6].strip()
threading_latency.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def wait(self):
        start = time.time()
        while time.time() - start < 1:
            reactor.iterate(1.0)
test_internet.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUp(self):
        self.finished = 0
        self.counter = 0
        self.timers = {}
        self.deferred = defer.Deferred()
        # ick. Sometimes there are magic timers already running:
        # popsicle.Freezer.tick . Kill off all such timers now so they won't
        # interfere with the test. Of course, this kind of requires that
        # getDelayedCalls already works, so certain failure modes won't be
        # noticed.
        if not hasattr(reactor, "getDelayedCalls"):
            return
        for t in reactor.getDelayedCalls():
            t.cancel()
        reactor.iterate() # flush timers
test_internet.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testActive(self):
        dcall = reactor.callLater(0, lambda: None)
        self.assertEquals(dcall.active(), 1)
        reactor.iterate()
        self.assertEquals(dcall.active(), 0)
test_internet.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testScheduling(self):
        c = Counter()
        for i in range(100):
            self.schedule(c.add)
        for i in range(100):
            reactor.iterate()
        self.assertEquals(c.index, 100)
test_internet.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testCorrectOrder(self):
        o = Order()
        self.schedule(o.a)
        self.schedule(o.b)
        self.schedule(o.c)
        reactor.iterate()
        reactor.iterate()
        reactor.iterate()
        self.assertEquals(o.stage, 3)
test_internet.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testNotRunAtOnce(self):
        c = Counter()
        self.schedule(c.add)
        # scheduled tasks should not be run at once:
        self.assertEquals(c.index, 0)
        reactor.iterate()
        self.assertEquals(c.index, 1)
test_policies.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def doIterations(self, count=5):
        for i in range(count):
            reactor.iterate()
nevowlore.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ____wait(d):
    "."
    from twisted.internet import reactor
    from twisted.python import failure
    l = []
    d.addBoth(l.append)
    while not l:
        reactor.iterate()
    if isinstance(l[0], failure.Failure):
        l[0].raiseException()
    return l[0]
erroneous.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testHiddenException(self):
        """
        What happens if an error is raised in a DelayedCall and an error is
        also raised in the test?

        L{test_reporter.TestErrorReporting.testHiddenException} checks that
        both errors get reported.

        Note that this behaviour is deprecated. A B{real} test would return a
        Deferred that got triggered by the callLater. This would guarantee the
        delayed call error gets reported.
        """
        reactor.callLater(0, self.go)
        reactor.iterate(0.01)
        self.fail("Deliberate failure to mask the hidden exception")


问题


面经


文章

微信
公众号

扫码关注公众号