def _onTaskDone(self, taskdone, taskid):
self._runningTasks -= 1
## Remove Call
calltm = self._calls.get(taskid)
if calltm:
del self._calls[taskid]
calltm.cancel()
## Call next task if exists
if self._runningTasks < self._maxConcurrentTasks and self._queuedTasks:
self._runningTasks += 1
task, args, kwargs, queuedf = self._queuedTasks.pop(0)
taskid = self._getTaskId()
taskdf = task(*args, **kwargs).addBoth(self._onTaskDone, taskid)
taskdf.chainDeferred(queuedf)
calltm = reactor.callLater(5, self._fireTimeout, taskid, taskdf)
self._calls[taskid] = calltm
## Raize Feilure
if isinstance(taskdone, failure.Failure):
taskdone.trap()
return taskdone
python类callLater()的实例源码
def render_tab(self, request):
self.has_debug_panel = False
cstate = request.sdata.connection_state
# Send update for the initiate state, to let the javascript style everything
reactor.callLater(0.1, lambda: request.sdata.set_conn_state(cstate))
if hasattr(request.sdata, 'conn_params'):
params = request.sdata.conn_params
else:
params = DEFAULT_CONN_PARAMS
return connection_html.format(**params)
def shutdown(seconds, result=None):
if not isinstance(seconds, numbers.Number):
log.err(seconds)
seconds = 1
d = Deferred()
d.addCallback(stop_reaktor)
reactor.callLater(seconds, d.callback, result)
return d
def wait(seconds, result=None):
"""Returns a deferred that will be fired later"""
d = Deferred()
reactor.callLater(seconds, d.callback, result)
return d
def _schedule_timeout(self, next_timeout):
if next_timeout:
delay = max(next_timeout - time.time(), 0)
if self._timeout_task and self._timeout_task.active():
if next_timeout < self._timeout:
self._timeout_task.reset(delay)
self._timeout = next_timeout
else:
self._timeout_task = reactor.callLater(delay, self._on_loop_timer)
self._timeout = next_timeout
def test_deferred_preprocessor(self):
d = defer.Deferred()
def preprocessor(u, p):
return reactor.callLater(0, lambda: True)
p = self.getProcessor({'echo': lambda x: x}, preprocessor=preprocessor)
request = remoting.Request('echo', envelope=remoting.Envelope())
def cb(result):
self.assertTrue(result)
d.callback(None)
p(request).addCallback(cb).addErrback(lambda failure: d.errback())
return d
def test_error_deferred_body(self):
d = defer.Deferred()
def echo(x):
d2 = defer.Deferred()
def cb(result):
raise IndexError
reactor.callLater(0, lambda: d2.callback(None))
d2.addCallback(cb)
return d2
p = self.getProcessor({'echo': echo}, expose_request=False)
request = remoting.Request('echo', envelope=remoting.Envelope())
request.body = ['a']
def cb(result):
self.assertTrue(isinstance(result, remoting.Response))
self.assertTrue(result.status, remoting.STATUS_ERROR)
self.assertTrue(isinstance(result.body, remoting.ErrorFault))
self.assertEqual(result.body.code, 'IndexError')
return p(request).addCallback(cb).addErrback(lambda x: d.errback())
def test_exposed_preprocessor(self):
d = defer.Deferred()
def preprocessor(http_request, service_request):
return reactor.callLater(0, lambda: True)
preprocessor = gateway.expose_request(preprocessor)
gw = twisted.TwistedGateway(
{'echo': lambda x: x},
expose_request=False,
preprocessor=preprocessor
)
proc = twisted.AMF3RequestProcessor(gw)
request = remoting.Request(
'null',
body=[
messaging.RemotingMessage(body=['spam.eggs'], operation='echo')
]
)
def cb(result):
try:
self.assertTrue(result)
except:
d.errback()
else:
d.callback(None)
proc(request).addCallback(cb).addErrback(lambda failure: d.errback())
return d
def test_error_deferred_body(self):
d = defer.Deferred()
def echo(x):
d2 = defer.Deferred()
def cb(result):
raise IndexError
reactor.callLater(0, lambda: d2.callback(None))
d2.addCallback(cb)
return d2
gw = twisted.TwistedGateway({'echo': echo}, expose_request=False)
proc = twisted.AMF3RequestProcessor(gw)
request = remoting.Request(
'null',
body=[
messaging.RemotingMessage(body=['spam.eggs'], operation='echo')
])
def cb(result):
try:
self.assertTrue(isinstance(result, remoting.Response))
self.assertTrue(result.status, remoting.STATUS_ERROR)
self.assertTrue(
isinstance(result.body, messaging.ErrorMessage)
)
self.assertEqual(result.body.faultCode, 'IndexError')
except:
d.errback()
else:
d.callback(None)
proc(request).addCallback(cb).addErrback(lambda x: d.errback())
return d
def open_spider(self, spider, start_requests):
self.start_requests = start_requests
self.spider = spider
yield None
reactor.callLater(0, self._next_request)
def _callLater(self, *args, **kwargs):
from twisted.internet import reactor
return reactor.callLater(*args, **kwargs)
def _sendLine(self):
if self._queue:
self._reallySendLine(self._queue.pop(0))
self._queueEmptying = reactor.callLater(self.lineRate,
self._sendLine)
else:
self._queueEmptying = None
### Interface level client->user output methods
###
### You'll want to override these.
### Methods relating to the server itself
def setKeepAlive(self,t):
self.keepAliveDelay=t
self.stopKeepAlive()
self.stopKeepAliveID = reactor.callLater(t, self.sendKeepAlive)
def sendKeepAlive(self):
self.sendFLAP("",0x05)
self.stopKeepAliveID = reactor.callLater(self.keepAliveDelay, self.sendKeepAlive)
def handleError():
from twisted.python import failure
global exitStatus
exitStatus = 2
reactor.callLater(0.01, _stopReactor)
log.err(failure.Failure())
raise
def _ebExit(f):
global exitStatus
if hasattr(f.value, 'value'):
s = f.value.value
else:
s = str(f)
exitStatus = "conch: exiting with error %s" % f
reactor.callLater(0.1, _stopReactor)
def handleInput(self, char):
#log.msg('handling %s' % repr(char))
if char in ('\n', '\r'):
self.escapeMode = 1
self.write(char)
elif self.escapeMode == 1 and char == options['escape']:
self.escapeMode = 2
elif self.escapeMode == 2:
self.escapeMode = 1 # so we can chain escapes together
if char == '.': # disconnect
log.msg('disconnecting from escape')
stopConnection()
return
elif char == '\x1a': # ^Z, suspend
def _():
_leaveRawMode()
sys.stdout.flush()
sys.stdin.flush()
os.kill(os.getpid(), signal.SIGTSTP)
_enterRawMode()
reactor.callLater(0, _)
return
elif char == 'R': # rekey connection
log.msg('rekeying connection')
self.conn.transport.sendKexInit()
return
elif char == '#': # display connections
self.stdio.write('\r\nThe following connections are open:\r\n')
channels = self.conn.channels.keys()
channels.sort()
for channelId in channels:
self.stdio.write(' #%i %s\r\n' % (channelId, str(self.conn.channels[channelId])))
return
self.write('~' + char)
else:
self.escapeMode = 0
self.write(char)
def dataReceived(self, data):
from twisted.internet import reactor
for ch in data:
if ch == '\x1b':
if self.inEscape:
self.keyReceived(ch)
self.inEscape = ''
else:
self.inEscape = ch
self.escapeCall = reactor.callLater(self.escapeTimeout,
self.endEscape)
elif ch in 'ABCD' and self.inEscape:
self.inEscape = ''
self.escapeCall.cancel()
if ch == 'A':
self.keyReceived('<Up>')
elif ch == 'B':
self.keyReceived('<Down>')
elif ch == 'C':
self.keyReceived('<Right>')
elif ch == 'D':
self.keyReceived('<Left>')
elif self.inEscape:
self.inEscape += ch
else:
self.keyReceived(ch)
def repaint(self):
if self._paintCall is None:
from twisted.internet import reactor
self._paintCall = reactor.callLater(0, self._paint)
ContainerWidget.repaint(self)