def testCallInNextIteration(self):
    """
    Check that a zero-delay C{callLater} made from inside a delayed call
    is not run until a later C{reactor.iterate()}.

    C{f1} schedules C{f2}, which schedules C{f3}; after each iteration the
    list of recorded names is asserted to have grown by exactly one entry.
    """
    fired = []

    def f1():
        fired.append('f1')
        reactor.callLater(0.0, f2)

    def f2():
        fired.append('f2')
        reactor.callLater(0.0, f3)

    def f3():
        fired.append('f3')

    reactor.callLater(0, f1)
    # Nothing may run before the reactor has been iterated at least once.
    self.assertEquals(fired, [])

    for expected in (['f1'], ['f1', 'f2'], ['f1', 'f2', 'f3']):
        reactor.iterate()
        self.assertEquals(fired, expected)
# python类iterate()的实例源码 (Python source-code examples that call iterate())
def testOpenSSLBuffering(self):
    """
    Connect an SSL client to an SSL server listening on 127.0.0.1 and
    assert what ends up in the client factory's C{buffer}.

    The reactor is iterated until the buffer is non-empty, giving up after
    5000 iterations.
    """
    serverProtocol = self.serverProto = SingleLineServerProtocol()
    clientProtocol = self.clientProto = RecordingClientProtocol()

    serverFactory = protocol.ServerFactory()
    clientFactory = self.client = protocol.ClientFactory()

    # Each factory always hands back the single protocol instance above.
    serverFactory.protocol = lambda: serverProtocol
    clientFactory.protocol = lambda: clientProtocol
    clientFactory.buffer = []

    serverContext = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
    clientContext = ssl.ClientContextFactory()

    listeningPort = self.port = reactor.listenSSL(
        0, serverFactory, serverContext, interface='127.0.0.1')
    reactor.connectSSL(
        '127.0.0.1', listeningPort.getHost().port, clientFactory,
        clientContext)

    # Bounded spin: stop as soon as anything has been buffered.
    for _ in range(5000):
        if clientFactory.buffer:
            break
        reactor.iterate()

    self.assertEquals(clientFactory.buffer, ["+OK <some crap>\r\n"])
def do_cleanPending(cls):
    """
    Flush the reactor, then cancel and report any delayed calls left over.

    The reactor is iterated twice with a zero timeout; whatever
    C{reactor.getDelayedCalls()} still returns afterwards is treated as an
    error.

    @raise PendingTimedCallsError: if any delayed calls remain. The message
        is C{PENDING_TIMED_CALLS_MSG} followed by one line per pending call.
    """
    # don't import reactor when module is loaded
    from twisted.internet import reactor

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    pending = reactor.getDelayedCalls()
    if not pending:
        return

    parts = [PENDING_TIMED_CALLS_MSG]
    for p in pending:
        # Format the call before cancelling it, as the original did.
        parts.append(" %s\n" % (p,))
        if p.active():
            p.cancel()  # delete the rest
        else:
            # Parenthesised so this parses on Python 3; under Python 2's
            # print statement it still prints the bare string.
            print("WEIRNESS! pending timed call not active+!")
    raise PendingTimedCallsError("".join(parts))
def testCallInNextIteration(self):
    """
    Check that a zero-delay C{callLater} made from inside a delayed call
    is not run until a later C{reactor.iterate()}.

    C{f1} schedules C{f2}, which schedules C{f3}; after each iteration the
    list of recorded names is asserted to have grown by exactly one entry.
    """
    fired = []

    def f1():
        fired.append('f1')
        reactor.callLater(0.0, f2)

    def f2():
        fired.append('f2')
        reactor.callLater(0.0, f3)

    def f3():
        fired.append('f3')

    reactor.callLater(0, f1)
    # Nothing may run before the reactor has been iterated at least once.
    self.assertEquals(fired, [])

    for expected in (['f1'], ['f1', 'f2'], ['f1', 'f2', 'f3']):
        reactor.iterate()
        self.assertEquals(fired, expected)
def testOpenSSLBuffering(self):
    """
    Connect an SSL client to an SSL server listening on 127.0.0.1 and
    assert what ends up in the client factory's C{buffer}.

    The reactor is iterated until the buffer is non-empty, giving up after
    5000 iterations.
    """
    serverProtocol = self.serverProto = SingleLineServerProtocol()
    clientProtocol = self.clientProto = RecordingClientProtocol()

    serverFactory = protocol.ServerFactory()
    clientFactory = self.client = protocol.ClientFactory()

    # Each factory always hands back the single protocol instance above.
    serverFactory.protocol = lambda: serverProtocol
    clientFactory.protocol = lambda: clientProtocol
    clientFactory.buffer = []

    serverContext = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
    clientContext = ssl.ClientContextFactory()

    listeningPort = self.port = reactor.listenSSL(
        0, serverFactory, serverContext, interface='127.0.0.1')
    reactor.connectSSL(
        '127.0.0.1', listeningPort.getHost().port, clientFactory,
        clientContext)

    # Bounded spin: stop as soon as anything has been buffered.
    for _ in range(5000):
        if clientFactory.buffer:
            break
        reactor.iterate()

    self.assertEquals(clientFactory.buffer, ["+OK <some crap>\r\n"])
def do_cleanPending(cls):
    """
    Flush the reactor, then cancel and report any delayed calls left over.

    The reactor is iterated twice with a zero timeout; whatever
    C{reactor.getDelayedCalls()} still returns afterwards is treated as an
    error.

    @raise PendingTimedCallsError: if any delayed calls remain. The message
        is C{PENDING_TIMED_CALLS_MSG} followed by one line per pending call.
    """
    # don't import reactor when module is loaded
    from twisted.internet import reactor

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    pending = reactor.getDelayedCalls()
    if not pending:
        return

    parts = [PENDING_TIMED_CALLS_MSG]
    for p in pending:
        # Format the call before cancelling it, as the original did.
        parts.append(" %s\n" % (p,))
        if p.active():
            p.cancel()  # delete the rest
        else:
            # Parenthesised so this parses on Python 3; under Python 2's
            # print statement it still prints the bare string.
            print("WEIRNESS! pending timed call not active+!")
    raise PendingTimedCallsError("".join(parts))
def resolve(self, names):
    """
    Resolve DNS names in parallel and block until all lookups are done.

    Every deferred yielded by C{self.lookup(name)} gets
    C{self._extract_records} as callback and C{self._errback} as errback,
    both with the name as extra argument. The deferreds are gathered in a
    C{DeferredList} which is chained to C{self._parse_result} and then
    C{self._finish}.

    @param names: iterable of names to pass to C{self.lookup}.
    @return: a plain C{dict} copy of C{self.results}.
    """
    self._finished = False
    self.results = defaultdict(list)

    lookups = []
    for name in names:
        for lookup in self.lookup(name):
            lookup.addCallback(self._extract_records, name)
            lookup.addErrback(self._errback, name)
            lookups.append(lookup)

    gathered = defer.DeferredList(lookups)
    gathered.addCallback(self._parse_result)
    gathered.addCallback(self._finish)

    # Spin the reactor until the flag flips (presumably set by
    # self._finish -- confirm against its definition).
    while not self._finished:
        reactor.iterate()

    # Although the results are in at this point, we may need an extra
    # iteration to ensure the resolver library closes its UDP sockets
    reactor.iterate()

    return dict(self.results)
def _cleanPending(self):
    """
    Cancel all pending calls and return their string representations.

    The reactor obtained from C{self._getReactor()} is iterated twice with
    a zero timeout before the remaining delayed calls are inspected.

    @return: a list with one string per delayed call returned by
        C{reactor.getDelayedCalls()}, in the order returned.
    """
    reactor = self._getReactor()

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    delayedCallStrings = []
    for p in reactor.getDelayedCalls():
        # Stringify before branching (and before cancelling) so that every
        # call contributes its own description. Assigning this only in the
        # active branch made an inactive call reuse the previous call's
        # string, or raise UnboundLocalError when it came first.
        delayedString = str(p)
        if p.active():
            p.cancel()
        else:
            print("WEIRDNESS! pending timed call not active!")
        delayedCallStrings.append(delayedString)
    return delayedCallStrings
def setUp(self):
    """
    Start the test server, spawn a C{cftp} client process aimed at its
    port, and wait up to ten seconds for the process to produce output.

    If the deadline has passed once waiting stops, C{self.skip} is set;
    otherwise the process protocol's buffer is cleared.
    """
    CFTPClientTestBase.setUp(self)
    self.startServer()

    serverPort = self.server.getHost().port
    argTemplate = ('-p %i -l testuser '
                   '--known-hosts kh_test '
                   '--user-authentications publickey '
                   '--host-key-algorithms ssh-rsa '
                   '-K direct '
                   '-i dsa_test '
                   '-a --nocache '
                   '-v '
                   '127.0.0.1')
    cmds = test_conch._makeArgs(
        (argTemplate % serverPort).split(), mod='cftp')
    log.msg('running %s %s' % (sys.executable, cmds))

    self.processProtocol = SFTPTestProcess()

    # The child gets this process's environment plus our import path.
    childEnv = os.environ.copy()
    childEnv['PYTHONPATH'] = os.pathsep.join(sys.path)
    reactor.spawnProcess(
        self.processProtocol, sys.executable, cmds, env=childEnv)

    deadline = time.time() + 10
    while not (self.processProtocol.buffer or time.time() >= deadline):
        reactor.iterate(0.1)

    if time.time() <= deadline:
        self.processProtocol.clearBuffer()
    else:
        self.skip = "couldn't start process"
def _getCmdResult(self, cmd):
    """
    Send C{cmd} to the client process and return what it printed.

    The buffer is cleared, the command is written followed by a newline,
    and the reactor is iterated until the C{'cftp> '} prompt shows up in
    the buffer or ten seconds pass (in which case the test is failed).

    @param cmd: command text, without the trailing newline.
    @return: the buffer with a leading prompt removed, the last six
        characters dropped, and surrounding whitespace stripped.
    """
    prompt = 'cftp> '

    self.processProtocol.clearBuffer()
    self.processProtocol.transport.write(cmd + '\n')

    deadline = time.time() + 10
    while (prompt not in self.processProtocol.buffer
           and time.time() < deadline):
        reactor.iterate(0.1)
    self.failIf(time.time() > deadline, "timeout")

    if self.processProtocol.buffer.startswith(prompt):
        self.processProtocol.buffer = (
            self.processProtocol.buffer[len(prompt):])
    # The final six characters are dropped: the length of the prompt.
    return self.processProtocol.buffer[:-len(prompt)].strip()
def wait(self):
    """
    Iterate the reactor, with a one second timeout per iteration, until at
    least one second has elapsed since this method was entered.
    """
    began = time.time()
    while True:
        if time.time() - began >= 1:
            break
        reactor.iterate(1.0)
def setUp(self):
    """
    Reset the per-test state and cancel any delayed calls that are already
    scheduled on the reactor.
    """
    self.finished = 0
    self.counter = 0
    self.timers = {}
    self.deferred = defer.Deferred()

    # ick. Sometimes there are magic timers already running:
    # popsicle.Freezer.tick . Kill off all such timers now so they won't
    # interfere with the test. Of course, this kind of requires that
    # getDelayedCalls already works, so certain failure modes won't be
    # noticed.
    if hasattr(reactor, "getDelayedCalls"):
        for leftover in reactor.getDelayedCalls():
            leftover.cancel()
        reactor.iterate()  # flush timers
def testActive(self):
    """
    Check that a delayed call's C{active()} is true right after scheduling
    and false after one reactor iteration.
    """
    def noop():
        return None

    pendingCall = reactor.callLater(0, noop)
    self.assertEquals(pendingCall.active(), 1)

    reactor.iterate()
    self.assertEquals(pendingCall.active(), 0)
def testScheduling(self):
    """
    Schedule C{Counter.add} one hundred times, iterate the reactor one
    hundred times, and assert the counter's index reached one hundred.
    """
    rounds = 100
    tally = Counter()

    for _ in range(rounds):
        self.schedule(tally.add)

    for _ in range(rounds):
        reactor.iterate()

    self.assertEquals(tally.index, rounds)
def testCorrectOrder(self):
    """
    Schedule the three steps of an C{Order} in sequence, iterate the
    reactor three times, and assert the order reached stage 3.
    """
    sequence = Order()

    for step in (sequence.a, sequence.b, sequence.c):
        self.schedule(step)

    for _ in range(3):
        reactor.iterate()

    self.assertEquals(sequence.stage, 3)
def testNotRunAtOnce(self):
    """
    Check that a scheduled task has not run when C{schedule} returns and
    has run after one reactor iteration.
    """
    tally = Counter()
    self.schedule(tally.add)

    # scheduled tasks should not be run at once:
    indexBefore = tally.index
    self.assertEquals(indexBefore, 0)

    reactor.iterate()
    indexAfter = tally.index
    self.assertEquals(indexAfter, 1)
def doIterations(self, count=5):
    """
    Call C{reactor.iterate()} C{count} times.

    @param count: number of iterations to run; defaults to 5.
    """
    for _ in range(count):
        reactor.iterate()
def ____wait(d):
    """
    Block, by iterating the reactor, until C{d} has fired; then return its
    result, or raise the exception if the result is a C{Failure}.

    @param d: object providing C{addBoth} (a Deferred).
    @return: the first value delivered to the C{addBoth} callback.
    """
    from twisted.internet import reactor
    from twisted.python import failure

    outcomes = []
    d.addBoth(outcomes.append)

    # Spin until the callback above has recorded something.
    while not outcomes:
        reactor.iterate()

    if isinstance(outcomes[0], failure.Failure):
        outcomes[0].raiseException()
    return outcomes[0]
def testHiddenException(self):
    """
    Exercise the case where a DelayedCall raises an error and the test
    itself also fails.

    L{test_reporter.TestErrorReporting.testHiddenException} checks that
    both errors get reported.

    Note that this behaviour is deprecated. A B{real} test would return a
    Deferred that got triggered by the callLater. This would guarantee the
    delayed call error gets reported.
    """
    # Schedule the call, then give the reactor a short slice to run it.
    hiddenCall = self.go
    reactor.callLater(0, hiddenCall)
    reactor.iterate(0.01)

    # The test's own, deliberate failure.
    self.fail("Deliberate failure to mask the hidden exception")
def setUp(self):
    """
    Start the test server, spawn a C{cftp} client process aimed at its
    port, and wait up to ten seconds for the process to produce output.

    If the deadline has passed once waiting stops, C{self.skip} is set;
    otherwise the process protocol's buffer is cleared.
    """
    CFTPClientTestBase.setUp(self)
    self.startServer()

    serverPort = self.server.getHost().port
    argTemplate = ('-p %i -l testuser '
                   '--known-hosts kh_test '
                   '--user-authentications publickey '
                   '--host-key-algorithms ssh-rsa '
                   '-K direct '
                   '-i dsa_test '
                   '-a --nocache '
                   '-v '
                   '127.0.0.1')
    cmds = test_conch._makeArgs(
        (argTemplate % serverPort).split(), mod='cftp')
    log.msg('running %s %s' % (sys.executable, cmds))

    self.processProtocol = SFTPTestProcess()

    # The child gets this process's environment plus our import path.
    childEnv = os.environ.copy()
    childEnv['PYTHONPATH'] = os.pathsep.join(sys.path)
    reactor.spawnProcess(
        self.processProtocol, sys.executable, cmds, env=childEnv)

    deadline = time.time() + 10
    while not (self.processProtocol.buffer or time.time() >= deadline):
        reactor.iterate(0.1)

    if time.time() <= deadline:
        self.processProtocol.clearBuffer()
    else:
        self.skip = "couldn't start process"
def _getCmdResult(self, cmd):
    """
    Send C{cmd} to the client process and return what it printed.

    The buffer is cleared, the command is written followed by a newline,
    and the reactor is iterated until the C{'cftp> '} prompt shows up in
    the buffer or ten seconds pass (in which case the test is failed).

    @param cmd: command text, without the trailing newline.
    @return: the buffer with a leading prompt removed, the last six
        characters dropped, and surrounding whitespace stripped.
    """
    prompt = 'cftp> '

    self.processProtocol.clearBuffer()
    self.processProtocol.transport.write(cmd + '\n')

    deadline = time.time() + 10
    while (prompt not in self.processProtocol.buffer
           and time.time() < deadline):
        reactor.iterate(0.1)
    self.failIf(time.time() > deadline, "timeout")

    if self.processProtocol.buffer.startswith(prompt):
        self.processProtocol.buffer = (
            self.processProtocol.buffer[len(prompt):])
    # The final six characters are dropped: the length of the prompt.
    return self.processProtocol.buffer[:-len(prompt)].strip()
def wait(self):
    """
    Iterate the reactor, with a one second timeout per iteration, until at
    least one second has elapsed since this method was entered.
    """
    began = time.time()
    while True:
        if time.time() - began >= 1:
            break
        reactor.iterate(1.0)
def setUp(self):
    """
    Reset the per-test state and cancel any delayed calls that are already
    scheduled on the reactor.
    """
    self.finished = 0
    self.counter = 0
    self.timers = {}
    self.deferred = defer.Deferred()

    # ick. Sometimes there are magic timers already running:
    # popsicle.Freezer.tick . Kill off all such timers now so they won't
    # interfere with the test. Of course, this kind of requires that
    # getDelayedCalls already works, so certain failure modes won't be
    # noticed.
    if hasattr(reactor, "getDelayedCalls"):
        for leftover in reactor.getDelayedCalls():
            leftover.cancel()
        reactor.iterate()  # flush timers
def testActive(self):
    """
    Check that a delayed call's C{active()} is true right after scheduling
    and false after one reactor iteration.
    """
    def noop():
        return None

    pendingCall = reactor.callLater(0, noop)
    self.assertEquals(pendingCall.active(), 1)

    reactor.iterate()
    self.assertEquals(pendingCall.active(), 0)
def testScheduling(self):
    """
    Schedule C{Counter.add} one hundred times, iterate the reactor one
    hundred times, and assert the counter's index reached one hundred.
    """
    rounds = 100
    tally = Counter()

    for _ in range(rounds):
        self.schedule(tally.add)

    for _ in range(rounds):
        reactor.iterate()

    self.assertEquals(tally.index, rounds)
def testCorrectOrder(self):
    """
    Schedule the three steps of an C{Order} in sequence, iterate the
    reactor three times, and assert the order reached stage 3.
    """
    sequence = Order()

    for step in (sequence.a, sequence.b, sequence.c):
        self.schedule(step)

    for _ in range(3):
        reactor.iterate()

    self.assertEquals(sequence.stage, 3)
def testNotRunAtOnce(self):
    """
    Check that a scheduled task has not run when C{schedule} returns and
    has run after one reactor iteration.
    """
    tally = Counter()
    self.schedule(tally.add)

    # scheduled tasks should not be run at once:
    indexBefore = tally.index
    self.assertEquals(indexBefore, 0)

    reactor.iterate()
    indexAfter = tally.index
    self.assertEquals(indexAfter, 1)
def doIterations(self, count=5):
    """
    Call C{reactor.iterate()} C{count} times.

    @param count: number of iterations to run; defaults to 5.
    """
    for _ in range(count):
        reactor.iterate()
def ____wait(d):
    """
    Block, by iterating the reactor, until C{d} has fired; then return its
    result, or raise the exception if the result is a C{Failure}.

    @param d: object providing C{addBoth} (a Deferred).
    @return: the first value delivered to the C{addBoth} callback.
    """
    from twisted.internet import reactor
    from twisted.python import failure

    outcomes = []
    d.addBoth(outcomes.append)

    # Spin until the callback above has recorded something.
    while not outcomes:
        reactor.iterate()

    if isinstance(outcomes[0], failure.Failure):
        outcomes[0].raiseException()
    return outcomes[0]
def testHiddenException(self):
    """
    Exercise the case where a DelayedCall raises an error and the test
    itself also fails.

    L{test_reporter.TestErrorReporting.testHiddenException} checks that
    both errors get reported.

    Note that this behaviour is deprecated. A B{real} test would return a
    Deferred that got triggered by the callLater. This would guarantee the
    delayed call error gets reported.
    """
    # Schedule the call, then give the reactor a short slice to run it.
    hiddenCall = self.go
    reactor.callLater(0, hiddenCall)
    reactor.iterate(0.01)

    # The test's own, deliberate failure.
    self.fail("Deliberate failure to mask the hidden exception")