def test_deferred_service(self):
    """
    A service that answers with a Deferred has its eventual result
    delivered in the response body.

    The C{echo} service fires its Deferred with the request payload from a
    C{reactor.callLater(0, ...)} call; the response is then checked for its
    AMF version, the presence of the C{'/1'} entry, and that entry's status
    and body.
    """
    def echo(data):
        # Hand the payload back asynchronously instead of returning it.
        pending = defer.Deferred()
        reactor.callLater(0, pending.callback, data)
        return pending

    def verify(response):
        self.assertEqual(response.amfVersion, pyamf.AMF3)
        self.assertTrue('/1' in response)

        body = response['/1']
        self.assertEqual(body.status, remoting.STATUS_OK)
        self.assertEqual(body.body, 'hello')

    self.gw.addService(echo)
    request_d = self.doRequest('echo', 'hello')
    return request_d.addCallback(verify)
# Example source code for Python's callLater()
def test_deferred_auth(self):
    """
    A request is processed when the authenticator answers via
    C{reactor.callLater}.

    The returned Deferred C{d} fires with C{None} once the processor's
    result has been asserted truthy, and fails with the original failure if
    processing or the assertion goes wrong.
    """
    d = defer.Deferred()

    def auth(u, p):
        # NOTE(review): this returns whatever reactor.callLater returns
        # (a delayed-call handle), not a Deferred -- confirm that this is
        # the value the processor is meant to see.
        return reactor.callLater(0, lambda: True)

    p = self.getProcessor({'echo': lambda x: x}, authenticator=auth)
    request = remoting.Request('echo', envelope=remoting.Envelope())

    def cb(result):
        self.assertTrue(result)
        d.callback(None)

    # Forward the failure itself. Calling d.errback() with no argument from
    # an errback discards the real error and relies on an exception being
    # active at that point, which is not guaranteed.
    p(request).addCallback(cb).addErrback(d.errback)
    return d
def test_exposed_preprocessor(self):
    """
    A request is processed when the preprocessor has been wrapped with
    C{gateway.expose_request} and answers via C{reactor.callLater}.

    The returned Deferred C{d} fires with C{None} once the processor's
    result has been asserted truthy, and fails with the original failure if
    processing or the assertion goes wrong.
    """
    d = defer.Deferred()

    def preprocessor(http_request, service_request):
        # NOTE(review): this returns whatever reactor.callLater returns
        # (a delayed-call handle), not a Deferred -- confirm intent.
        return reactor.callLater(0, lambda: True)

    preprocessor = gateway.expose_request(preprocessor)
    p = self.getProcessor({'echo': lambda x: x}, preprocessor=preprocessor)
    request = remoting.Request('echo', envelope=remoting.Envelope())

    def cb(result):
        self.assertTrue(result)
        d.callback(None)

    # Forward the failure itself. Calling d.errback() with no argument from
    # an errback discards the real error and relies on an exception being
    # active at that point, which is not guaranteed.
    p(request).addCallback(cb).addErrback(d.errback)
    return d
def sendPart(self):
    """
    Write the next segment of the file to the transport.

    Reads up to C{self.segmentSize} bytes from C{self.file}. When data is
    available it is written with a header from C{self.makeHeader},
    C{self.bytesSent} is advanced, and another C{sendPart} call is scheduled
    with C{reactor.callLater(0, ...)}. When the read returns nothing the
    transfer is flagged as completed. Nothing is sent while disconnected.
    """
    if not self.connected:
        self._pendingSend = None
        # may be buggy (if handle_CCL/BYE is called but self.connected is
        # still 1)
        return

    chunk = self.file.read(self.segmentSize)
    if not chunk:
        # Nothing left to read: stop rescheduling and mark completion.
        self._pendingSend = None
        self.completed = 1
        return

    chunkSize = len(chunk)
    header = self.makeHeader(chunkSize)
    self.bytesSent += chunkSize
    self.transport.write(header + chunk)
    self._pendingSend = reactor.callLater(0, self.sendPart)
# mapping of error codes to error messages
def processOne(self, deferred):
    """
    Pull one value from C{self.iterator} and schedule the next step.

    @param deferred: fired with C{self.root} on the call after the iterator
        is exhausted, or errbacked if advancing the iterator raises.

    Every 10th remaining count schedules C{self.updateBar}; every 100th is
    logged. Each successful step reschedules C{processOne} with
    C{reactor.callLater(0, ...)}.
    """
    if self.stopping:
        deferred.callback(self.root)
        return

    try:
        self.remaining = next(self.iterator)
    except StopIteration:
        # Finish on the next pass, after one last progress update below.
        self.stopping = 1
    except Exception:
        deferred.errback(failure.Failure())
        # Stop here: rescheduling after an errback would fire the same
        # Deferred a second time on a later pass.
        return

    if self.remaining % 10 == 0:
        reactor.callLater(0, self.updateBar, deferred)
    if self.remaining % 100 == 0:
        log.msg(self.remaining)
    reactor.callLater(0, self.processOne, deferred)
def fixPdb():
    """
    Patch C{pdb.Pdb} with a C{stop} command and a hard-exiting C{set_quit}.

    Installs three attributes on C{pdb.Pdb}:
      - C{do_stop}: clears all breakpoints, continues execution and
        schedules C{reactor.stop} via C{reactor.callLater(0, ...)}.
      - C{help_stop}: prints the help text for the C{stop} command.
      - C{set_quit}: exits the process immediately with C{os._exit(0)}.
    """
    def do_stop(self, arg):
        self.clear_all_breaks()
        self.set_continue()
        from twisted.internet import reactor
        reactor.callLater(0, reactor.stop)
        return 1

    def help_stop(self):
        # Call form of print so this parses on Python 2 and Python 3 alike.
        print("""stop - Continue execution, then cleanly shutdown the twisted reactor.""")

    def set_quit(self):
        os._exit(0)

    pdb.Pdb.set_quit = set_quit
    pdb.Pdb.do_stop = do_stop
    pdb.Pdb.help_stop = help_stop
def serviceStarted(self):
    """
    Reset authentication state and start the login timeout.

    The supported authentication methods are derived from the credential
    interfaces the portal lists, mapped through C{self.interfaceToMethod}.
    When the outgoing direction of the transport is not encrypted, the
    C{'password'} and C{'keyboard-interactive'} methods are dropped.
    """
    self.authenticatedWith = []
    self.loginAttempts = 0
    self.user = None
    self.nextService = None
    self.portal = self.transport.factory.portal

    self.supportedAuthentications = []
    self.supportedAuthentications.extend(
        self.interfaceToMethod[iface]
        for iface in self.portal.listCredentialsInterfaces()
        if iface in self.interfaceToMethod)

    if not self.transport.isEncrypted('out'):
        # don't let us transport password in plaintext
        for method in ('password', 'keyboard-interactive'):
            if method in self.supportedAuthentications:
                self.supportedAuthentications.remove(method)

    self.cancelLoginTimeout = reactor.callLater(
        self.loginTimeout, self.timeoutAuthentication)
def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
    """
    Arrange for a timeout function to run if I have not been called.

    Deprecated: a C{DeprecationWarning} is emitted on every call.

    @param seconds: delay, counted from now, before C{timeoutFunc} is
        considered.
    @param timeoutFunc: called with this Deferred followed by C{*args} and
        C{**kw}, unless I have already been called by then.
    @return: the delayed call returned by C{reactor.callLater}, or C{None}
        if I have already been called.
    """
    warnings.warn(
        "Deferred.setTimeout is deprecated. Look for timeout "
        "support specific to the API you are using instead.",
        DeprecationWarning, stacklevel=2)

    if self.called:
        return

    assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."

    from twisted.internet import reactor

    def fireUnlessCalled():
        # Skip the timeout function when a result arrived in the meantime.
        return self.called or timeoutFunc(self, *args, **kw)

    self.timeoutCall = reactor.callLater(seconds, fireUnlessCalled)
    return self.timeoutCall
def testStopOutstanding(self):
    """
    An iterator that is paused on a third-party Deferred stops properly
    when C{.stop()} is called on the Cooperator.
    """
    controlD = defer.Deferred()
    blockedOnD = defer.Deferred()

    def myiter():
        # Kick the control Deferred from the reactor, then pause on a
        # Deferred that is only fired after the cooperator was stopped.
        reactor.callLater(0, controlD.callback, None)
        yield blockedOnD
        self.fail()

    cooperator = task.Cooperator()
    iterationD = cooperator.coiterate(myiter())

    def stopAndGo(ign):
        cooperator.stop()
        blockedOnD.callback('arglebargle')

    controlD.addCallback(stopAndGo)
    iterationD.addCallback(self.cbIter)
    iterationD.addErrback(self.ebIter)

    def checkResult(result):
        return self.assertEquals(result, self.RESULT)

    return iterationD.addCallback(checkResult)
def testWaitDeferred(self):
    """
    The callable is not scheduled again before the Deferred it returned
    has fired.

    The callable's Deferred fires one second of clock time after the call
    and stops the loop; after pumping the clock, no calls may remain.
    """
    clock = task.Clock()
    steps = [0.2, 0.8]

    def foo():
        fired = defer.Deferred()
        fired.addCallback(lambda _: loop.stop())
        clock.callLater(1, fired.callback, None)
        return fired

    loop = TestableLoopingCall(clock, foo)
    started = loop.start(0.2)
    clock.pump(steps)
    self.failIf(clock.calls)
def testFailurePropagation(self):
    """
    A failure of the Deferred returned by the callable is propagated to
    the Deferred returned by C{start}.

    To make sure this test does not hang trial when LoopingCall does not
    wait for the callable's Deferred, it also checks that no calls are left
    in the clock's callLater queue.
    """
    clock = task.Clock()
    steps = [0.3]

    def foo():
        failing = defer.Deferred()
        clock.callLater(0.3, failing.errback, TestException())
        return failing

    loop = TestableLoopingCall(clock, foo)
    started = loop.start(1)
    self.assertFailure(started, TestException)

    clock.pump(steps)
    self.failIf(clock.calls)
    return started
def test_cancelDelayedCall(self):
    """
    A DelayedCall that has been cancelled does not run.
    """
    ran = []

    def record():
        ran.append(None)

    delayed = reactor.callLater(0, record)
    delayed.cancel()

    finished = Deferred()

    def verify():
        # Route the assertion outcome into the Deferred trial waits on.
        try:
            self.assertEqual(ran, [])
        except:
            finished.errback()
        else:
            finished.callback(None)

    # Schedule the check two "iterations" out so the cancelled call would
    # have had its chance to run first.
    reactor.callLater(0, reactor.callLater, 0, verify)
    return finished
def test_cancelCalledDelayedCallSynchronous(self):
    """
    Cancelling a DelayedCall from inside the function it is currently
    invoking raises C{error.AlreadyCalled}.
    """
    finished = Deferred()

    def later():
        # ``delayed`` is bound below, before the reactor runs this function.
        try:
            self.assertRaises(error.AlreadyCalled, delayed.cancel)
        except:
            finished.errback()
        else:
            finished.callback(None)

    delayed = reactor.callLater(0, later)
    return finished
def test_cancelCalledDelayedCallAsynchronous(self):
    """
    Cancelling a DelayedCall after its function has already run raises
    C{error.AlreadyCalled}.
    """
    finished = Deferred()

    def verify():
        try:
            self.assertRaises(error.AlreadyCalled, delayed.cancel)
        except:
            finished.errback()
        else:
            finished.callback(None)

    def later():
        # Defer the check to a further call so it runs after this one ended.
        reactor.callLater(0, verify)

    delayed = reactor.callLater(0, later)
    return finished
def testCallInNextIteration(self):
    """
    A zero-delay call scheduled from inside a running call is executed by
    a later C{reactor.iterate()}, not by the current one.
    """
    order = []

    def f3():
        order.append('f3')

    def f2():
        order.append('f2')
        reactor.callLater(0.0, f3)

    def f1():
        order.append('f1')
        reactor.callLater(0.0, f2)

    reactor.callLater(0, f1)
    self.assertEquals(order, [])

    # Each iteration is expected to run exactly one more link of the chain.
    for expected in (['f1'], ['f1', 'f2'], ['f1', 'f2', 'f3']):
        reactor.iterate()
        self.assertEquals(order, expected)
def testCallLaterOrder(self):
    """
    Calls scheduled with the same delay run in the order they were
    scheduled.

    Twenty zero-delay calls and ten 0.1-second calls each record their
    argument; a check scheduled after each group compares the recorded
    order with the scheduling order. The returned Deferred fires after 0.2
    seconds.
    """
    l = []
    l2 = []

    def f(x):
        l.append(x)

    def f2(x):
        l2.append(x)

    def done():
        # list(...) keeps the comparison valid where range() is not a list.
        self.assertEquals(l, list(range(20)))

    def done2():
        self.assertEquals(l2, list(range(10)))

    for n in range(10):
        reactor.callLater(0, f, n)
    for n in range(10):
        reactor.callLater(0, f, n + 10)
        reactor.callLater(0.1, f2, n)
    reactor.callLater(0, done)
    reactor.callLater(0.1, done2)

    d = Deferred()
    reactor.callLater(0.2, d.callback, None)
    return d
def testDelayedCallStringification(self):
    """
    C{str()} of a DelayedCall does not raise, whether the call is pending,
    reset, cancelled or already called.
    """
    # Positional and keyword arguments; pending, reset, then cancelled.
    first = reactor.callLater(0, lambda x, y: None, 'x', y=10)
    str(first)
    first.reset(5)
    str(first)
    first.cancel()
    str(first)

    # Many positional arguments plus a nested, mixed-type keyword argument.
    second = reactor.callLater(
        0, lambda: None, x=[({'hello': u'world'}, 10j), reactor], *range(10))
    str(second)
    second.cancel()
    str(second)

    finished = Deferred()

    def calledBack(ignored):
        # ``last`` has been run by the reactor when this callback fires.
        str(last)

    finished.addCallback(calledBack)
    last = reactor.callLater(0, finished.callback, None)
    str(last)
    return finished
def test_crash(self):
    """
    reactor.crash should NOT fire shutdown triggers
    """
    events = []

    def recordShutdown():
        events.append(("before", "shutdown"))

    self.addTrigger("before", "shutdown", recordShutdown)

    # reactor.crash called from an "after-startup" trigger is too early
    # for the gtkreactor: gtk_mainloop is not yet running. Same is true
    # when called with reactor.callLater(0). Must be >0 seconds in the
    # future to let gtk_mainloop start first.
    # NOTE(review): the delay passed below is 0, issued from
    # callWhenRunning -- confirm that this satisfies the note above.
    reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
    reactor.run()

    self.failIf(events, "reactor.crash invoked shutdown triggers, but it "
                        "isn't supposed to.")
# XXX Test that reactor.stop() invokes shutdown triggers
def testMaybeDeferred(self):
    """
    C{defer.maybeDeferred} wraps both a plain return value and a raised
    exception from a synchronous callable.

    The first call must produce the result 15 with no errors. The second
    must produce exactly one failure whose message matches the
    C{TypeError} raised by C{'10' + 5}. The returned Deferred fires after
    0.2 seconds and continues into C{self._testMaybeError}.
    """
    S, E = [], []
    d = defer.maybeDeferred((lambda x: x + 5), 10)
    d.addCallbacks(S.append, E.append)
    self.assertEquals(E, [])
    self.assertEquals(S, [15])

    S, E = [], []
    # Capture the interpreter's own message so the comparison below does
    # not depend on its exact wording.
    try:
        '10' + 5
    except TypeError as e:
        expected = str(e)
    d = defer.maybeDeferred((lambda x: x + 5), '10')
    d.addCallbacks(S.append, E.append)
    self.assertEquals(S, [])
    self.assertEquals(len(E), 1)
    self.assertEquals(str(E[0].value), expected)

    d = defer.Deferred()
    reactor.callLater(0.2, d.callback, 'Success')
    d.addCallback(self.assertEquals, 'Success')
    d.addCallback(self._testMaybeError)
    return d
def sendNextCommand(self):
    """
    (Private) Take the next command off the queue and send it.

    With an empty queue nothing is sent. A command that is not yet ready is
    put back at the front of C{self.actionQueue} and this method is retried
    after one second. Otherwise the command text is sent and its Deferred
    becomes C{self.nextDeferred}.
    """
    command = self.popCommandQueue()

    if command is None:
        self.nextDeferred = None
        return

    if not command.ready:
        # Not ready yet: requeue it first in line and poll again later.
        self.actionQueue.insert(0, command)
        reactor.callLater(1.0, self.sendNextCommand)
        self.nextDeferred = None
        return

    # FIXME: this if block doesn't belong in FTPClientBasic, it belongs in
    # FTPClient.
    if command.text == 'PORT':
        self.generatePortCommand(command)

    if self.debug:
        log.msg('<-- %s' % command.text)

    self.nextDeferred = command.deferred
    self.sendLine(command.text)
def setTimeout(self, period):
    """
    Change the timeout period.

    @type period: C{int} or C{NoneType}
    @param period: The new period, in seconds, or C{None} to disable the
        timeout.
    @return: The period that was in effect before this call.
    """
    previous = self.timeOut
    self.timeOut = period

    pending = self.__timeoutCall
    if pending is None:
        # No timeout running: start one unless timeouts are being disabled.
        if period is not None:
            self.__timeoutCall = self.callLater(period, self.__timedOut)
    elif period is None:
        pending.cancel()
        self.__timeoutCall = None
    else:
        pending.reset(period)

    return previous
def maybeParseConfig(self):
    """
    Re-read the resolver configuration file if it changed, then schedule
    the next check.

    Does nothing when C{self.resolv} is C{None}. A missing file is parsed
    as an empty configuration. Otherwise the file is handed to
    C{self.parseConfig} only when its modification time differs from
    C{self._lastResolvTime}. A new check is scheduled after
    C{self._resolvReadInterval} seconds.

    @raise IOError: if the file cannot be opened for a reason other than
        not existing.
    """
    if self.resolv is None:
        # Don't try to parse it, don't set up a call loop
        return

    try:
        resolvConf = open(self.resolv)
    except IOError as e:
        if e.errno == errno.ENOENT:
            # Missing resolv.conf is treated the same as an empty resolv.conf
            self.parseConfig(())
        else:
            raise
    else:
        # Close the handle when done; this method runs repeatedly, so an
        # unclosed handle per run would accumulate open descriptors.
        with resolvConf:
            mtime = os.fstat(resolvConf.fileno()).st_mtime
            if mtime != self._lastResolvTime:
                log.msg('%s changed, reparsing' % (self.resolv,))
                self._lastResolvTime = mtime
                self.parseConfig(resolvConf)

    # Check again in a little while
    from twisted.internet import reactor
    self._parseCall = reactor.callLater(
        self._resolvReadInterval, self.maybeParseConfig)
def cacheResult(self, query, payload):
    """
    Store C{payload} for C{query} and schedule its removal.

    @param query: key under which the payload is cached.
    @param payload: indexable with three iterables of records (positions
        0, 1 and 2), each record having a C{ttl} attribute.

    Any expiry call already scheduled for C{query} is cancelled. The entry
    is cleared after the smallest C{ttl} among all records, or after 0
    seconds when the payload holds no records.
    """
    if self.verbose > 1:
        log.msg('Adding %r to cache' % query)

    self.cache[query] = (time.time(), payload)

    if query in self.cancel:
        self.cancel[query].cancel()

    records = list(payload[0]) + list(payload[1]) + list(payload[2])
    if records:
        ttl = min(r.ttl for r in records)
    else:
        # No records means no TTL to honour; expire the entry right away
        # rather than failing on an empty list.
        ttl = 0

    from twisted.internet import reactor
    self.cancel[query] = reactor.callLater(ttl, self.clearEntry, query)
def __cbManualSearch(self, result, tag, mbox, query, uid, searchResults = None):
    """
    Filter up to five messages from C{result} per call and answer the
    search once fewer than five were available.

    Each C{(id, msg)} pair accepted by C{self.searchFilter} contributes
    either the message UID or its id, as a string, depending on C{uid}.
    After a full batch of five, the method reschedules itself with the same
    C{result} object and the accumulated matches. Otherwise it sends the
    untagged C{SEARCH} response (when there are matches) followed by the
    positive completion response for C{tag}.
    """
    if searchResults is None:
        searchResults = []

    i = 0
    for i, (msgId, msg) in zip(range(5), result):
        if not self.searchFilter(query, msgId, msg):
            continue
        if uid:
            searchResults.append(str(msg.getUID()))
        else:
            searchResults.append(str(msgId))

    if i != 4:
        # Fewer than five pairs were available: report and finish.
        if searchResults:
            self.sendUntaggedResponse('SEARCH ' + ' '.join(searchResults))
        self.sendPositiveResponse(tag, 'SEARCH completed')
    else:
        # A full batch was handled; continue in a later reactor call.
        from twisted.internet import reactor
        reactor.callLater(
            0, self.__cbManualSearch, result, tag, mbox, query, uid,
            searchResults)
def apply_script(protocol, connection, config):
    """
    Extend the connection class so that matching chat messages trigger a
    help broadcast one second later.

    @return: the unchanged C{protocol} and the new connection subclass.
    """
    def send_help_nick(player):
        server = player.protocol
        server.send_chat(
            "TO CHANGE YOUR NAME: Start Menu-> "
            "All Programs-> Ace of Spades-> Configuration")
        server.irc_say("* Sent nick help to %s" % player.name)

    def send_help_airstrike(player):
        server = player.protocol
        server.send_chat(
            "TO USE AN AIRSTRIKE: Once you have 15 points, "
            "get a 6 killstreak -> "
            "Then type /airstrike G4 if you want the strike to hit G4")
        server.irc_say(
            "* Sent airstrike help to %s" % player.name)

    class AutoHelpConnection(connection):
        def on_chat(self, value, global_message):
            # Both matchers are evaluated; each schedules its own reply.
            if deuce_howto_match(self, value):
                reactor.callLater(1.0, send_help_nick, self)
            if airstrike_howto_match(self, value):
                reactor.callLater(1.0, send_help_airstrike, self)
            return connection.on_chat(self, value, global_message)

    return protocol, AutoHelpConnection
def get_input(self):
    """
    Drain pending console key presses, echo them, and reschedule.

    A carriage return is echoed as a newline and the buffered line,
    newline included, is passed to C{self.protocol.dataReceived}. The
    prefixes C{u'\\xE0'} and C{u'\\x00'} cause one further key code to be
    read and discarded. C{u'\\x08'} removes the last buffered character.
    Any other key is buffered and echoed. The method then reschedules
    itself after C{self.interval} seconds.
    """
    while msvcrt.kbhit():
        key = msvcrt.getwch()
        if key in (u'\xE0', u'\x00'):
            # ignore special characters
            msvcrt.getwch()
        elif key == u'\x08':  # delete
            self.input = self.input[:-1]
            stdout.write('\x08 \x08')
        elif key == u'\r':  # new line
            key = u'\n'
            stdout.write(key)
            self.input += key
            self.protocol.dataReceived(self.input)
            self.input = ''
        else:
            self.input += key
            stdout.write(key)

    reactor.callLater(self.interval, self.get_input)
def run(self):
    """
    Connect to the docker engine and begin listening for docker events.

    Creates the client with C{docker.from_env()}, schedules the handlers
    for a synthetic C{'dockerish.init'} event immediately, and schedules
    the first C{self._genEvents} call after C{PEEK_INTERVAL_SECONDS}.
    """
    self.client = docker.from_env()

    now = time.time()
    nowNano = now * 1000000000
    noActor = EventActor(image=None, name=None, signal=None, id=None)
    startEvent = Event(
        status='init',
        id=None,
        time=int(now),
        timeNano=int(nowNano),
        actor=noActor,
        action='init',
        eventFrom=None,
        eventType='dockerish',
        engine=self,
    )

    self.callLater(0, self._callHandlers, 'dockerish.init', startEvent)
    # The event generator is handed a fresh timestamp taken at this point.
    self.callLater(PEEK_INTERVAL_SECONDS, self._genEvents, time.time())
def getReply(self, line, proto, transport):
    """
    Feed C{line} to C{proto} and return a Deferred for what it wrote.

    Lines that do not start with C{HELO}, C{MAIL}, C{RCPT} or C{DATA}
    resolve immediately with an empty string. For the others,
    C{transport.value()} is polled once per zero-delay reactor call until
    it is non-empty; the transport is then cleared and the reply returned.
    """
    proto.lineReceived(line)

    if line[:4] not in ['HELO', 'MAIL', 'RCPT', 'DATA']:
        return succeed("")

    def check_transport(_):
        reply = transport.value()
        if not reply:
            # Nothing written yet: look again on a later reactor call.
            retry = Deferred()
            retry.addCallback(check_transport)
            reactor.callLater(0, lambda: retry.callback(None))
            return retry
        transport.clear()
        return succeed(reply)

    return check_transport(None)
def _wait_and_kill(self, killfun, proc, deferred, tries=0):
    """
    Poll for the process to exit, falling back to C{killfun} once the
    allowed number of tries is used up.

    While tries remain, C{deferred} is fired with C{True} as soon as
    C{proc.transport.pid} is C{None}; otherwise another check is scheduled
    after C{TERMINATE_WAIT} seconds.

    :param tries: counter of tries, used in recursion
    :type tries: int
    """
    if not tries < TERMINATE_MAXTRIES:
        # after running out of patience, we try a killProcess
        kill_d = killfun()
        kill_d.addCallback(lambda _: deferred.callback(True))
        return kill_d

    if proc.transport.pid is None:
        deferred.callback(True)
        return

    self.log.debug('Process did not die, waiting...')
    reactor.callLater(
        TERMINATE_WAIT,
        self._wait_and_kill, killfun, proc, deferred, tries + 1)
def _connect_to_management(self, retries=30):
    """
    Connect to the management endpoint, retrying on failure.

    With no retries left, an error is logged and C{self.failed} is set.
    Otherwise a connection attempt is stored in C{self._d}; on failure a
    warning is logged and another attempt, with one retry fewer, is
    scheduled 0.1 seconds later.
    """
    if retries == 0:
        self.log.error('Timeout while connecting to management')
        self.failed = True
        return

    def schedule_retry(_failure):
        remaining = retries - 1
        self.log.warn(
            'Error connecting to management, retrying. '
            'Retries left: %s' % remaining)
        reactor.callLater(
            0.1, self._connect_to_management, remaining)

    self._d = connectProtocol(
        self._management_endpoint,
        ManagementProtocol(verbose=True))
    self._d.addCallbacks(self._got_management_protocol, schedule_retry)