def fireSystemEvent(self, eventType):
"""See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
"""
sysEvtTriggers = self._eventTriggers.get(eventType)
if not sysEvtTriggers:
return
defrList = []
for callable, args, kw in sysEvtTriggers[0]:
try:
d = callable(*args, **kw)
except:
log.deferr()
else:
if isinstance(d, Deferred):
defrList.append(d)
if defrList:
DeferredList(defrList).addBoth(self._cbContinueSystemEvent, eventType)
else:
self.callLater(0, self._continueSystemEvent, eventType)
python类Deferred()的实例源码
def runWithWarningsSuppressed(suppressedWarnings, f, *a, **kw):
"""Run the function C{f}, but with some warnings suppressed.
@param suppressedWarnings: A list of arguments to pass to filterwarnings.
Must be a sequence of 2-tuples (args, kwargs).
@param f: A callable, followed by its arguments and keyword arguments
"""
for args, kwargs in suppressedWarnings:
warnings.filterwarnings(*args, **kwargs)
addedFilters = warnings.filters[:len(suppressedWarnings)]
try:
result = f(*a, **kw)
except:
exc_info = sys.exc_info()
_resetWarningFilters(None, addedFilters)
raise exc_info[0], exc_info[1], exc_info[2]
else:
if isinstance(result, defer.Deferred):
result.addBoth(_resetWarningFilters, addedFilters)
else:
_resetWarningFilters(None, addedFilters)
return result
def test_stdin(self):
"""
Making sure getPassword accepts a password from standard input by
running a child process which uses getPassword to read in a string
which it then writes it out again. Write a string to the child
process and then read one and make sure it is the right string.
"""
p = PasswordTestingProcessProtocol()
p.finished = Deferred()
reactor.spawnProcess(
p,
sys.executable,
[sys.executable,
'-c',
('import sys\n'
'from twisted.python.util import getPassword\n'
'sys.stdout.write(getPassword())\n'
'sys.stdout.flush()\n')],
env={'PYTHONPATH': os.pathsep.join(sys.path)})
def processFinished((reason, output)):
reason.trap(ProcessDone)
self.assertEquals(output, [(1, 'secret')])
def testDeadReferenceError(self):
"""
Test that when a connection is lost, calling a method on a
RemoteReference obtained from it raises DeadReferenceError.
"""
factory, rootObjDeferred = self.getFactoryAndRootObject()
def gotRootObject(rootObj):
disconnectedDeferred = defer.Deferred()
rootObj.notifyOnDisconnect(disconnectedDeferred.callback)
def lostConnection(ign):
self.assertRaises(
pb.DeadReferenceError,
rootObj.callRemote, 'method')
disconnectedDeferred.addCallback(lostConnection)
factory.disconnect()
return disconnectedDeferred
return rootObjDeferred.addCallback(gotRootObject)
def testWaitDeferred(self):
# Tests if the callable isn't scheduled again before the returned
# deferred has fired.
timings = [0.2, 0.8]
clock = task.Clock()
def foo():
d = defer.Deferred()
d.addCallback(lambda _: lc.stop())
clock.callLater(1, d.callback, None)
return d
lc = TestableLoopingCall(clock, foo)
d = lc.start(0.2)
clock.pump(timings)
self.failIf(clock.calls)
def testFailurePropagation(self):
# Tests if the failure of the errback of the deferred returned by the
# callable is propagated to the lc errback.
#
# To make sure this test does not hang trial when LoopingCall does not
# wait for the callable's deferred, it also checks there are no
# calls in the clock's callLater queue.
timings = [0.3]
clock = task.Clock()
def foo():
d = defer.Deferred()
clock.callLater(0.3, d.errback, TestException())
return d
lc = TestableLoopingCall(clock, foo)
d = lc.start(1)
self.assertFailure(d, TestException)
clock.pump(timings)
self.failIf(clock.calls)
return d
def _request(self, method, **params):
"""
Send a request over whatever transport is provided
"""
this_id = self.next_id
self.next_id += 1
request = r'{{"jsonrpc": "2.0", "id": {}, "method": "{}", "params": {}}}' \
.format(this_id, method, json.dumps(params))
print('### SENDING REQUEST {}'.format(request))
self.sdata.add_to_push_queue('request', text=request)
self.sdata.transport_tx_cb(request)
d = defer.Deferred()
self.pending_reply_map[this_id] = d
return d
def shutdown(seconds, result=None):
if not isinstance(seconds, numbers.Number):
log.err(seconds)
seconds = 1
d = Deferred()
d.addCallback(stop_reaktor)
reactor.callLater(seconds, d.callback, result)
return d
def wait(seconds, result=None):
"""Returns a deferred that will be fired later"""
d = Deferred()
reactor.callLater(seconds, d.callback, result)
return d
def when_finished(self):
"""
Get a deferred that will be fired when the connection is closed.
"""
d = Deferred()
self._waiting.append(d)
return d
def notifyFinish(self):
"""
Return a L{Deferred} which is called back with C{None} when the request
is finished. This will probably only work if you haven't called
C{finish} yet.
"""
finished = Deferred()
self._finishedDeferreds.append(finished)
return finished
def finish(self):
"""
Record that the request is finished and callback and L{Deferred}s
waiting for notification of this.
"""
self.finished = self.finished + 1
if self._finishedDeferreds is not None:
observers = self._finishedDeferreds
self._finishedDeferreds = None
for obs in observers:
obs.callback(None)
def __init__(self, url, method, *args):
self.url, self.host = url.path, url.host
self.user, self.password = url.user, url.password
self.payload = xmlrpc.payloadTemplate % (
method, xmlrpclib.dumps(args))
self.deferred = defer.Deferred()
def __init__(self, collector, string, firstPage):
self._atFirstPage = True
self._firstPage = firstPage
self._deferred = Deferred()
StringPager.__init__(self, collector, string, callback=self.done)
def __init__(self, collector, path):
self._deferred = Deferred()
print "%s, %d bytes" % (path, os.path.getsize(path))
fd = file(path, 'rb')
FilePager.__init__(self, collector, fd, callback=self.done)
def twisted_fetch(self, url, runner):
# http://twistedmatrix.com/documents/current/web/howto/client.html
chunks = []
client = Agent(self.reactor)
d = client.request(b'GET', utf8(url))
class Accumulator(Protocol):
def __init__(self, finished):
self.finished = finished
def dataReceived(self, data):
chunks.append(data)
def connectionLost(self, reason):
self.finished.callback(None)
def callback(response):
finished = Deferred()
response.deliverBody(Accumulator(finished))
return finished
d.addCallback(callback)
def shutdown(failure):
if hasattr(self, 'stop_loop'):
self.stop_loop()
elif failure is not None:
# loop hasn't been initialized yet; try our best to
# get an error message out. (the runner() interaction
# should probably be refactored).
try:
failure.raiseException()
except:
logging.error('exception before starting loop', exc_info=True)
d.addBoth(shutdown)
runner()
self.assertTrue(chunks)
return ''.join(chunks)
def twisted_fetch(self, url, runner):
# http://twistedmatrix.com/documents/current/web/howto/client.html
chunks = []
client = Agent(self.reactor)
d = client.request(b'GET', utf8(url))
class Accumulator(Protocol):
def __init__(self, finished):
self.finished = finished
def dataReceived(self, data):
chunks.append(data)
def connectionLost(self, reason):
self.finished.callback(None)
def callback(response):
finished = Deferred()
response.deliverBody(Accumulator(finished))
return finished
d.addCallback(callback)
def shutdown(failure):
if hasattr(self, 'stop_loop'):
self.stop_loop()
elif failure is not None:
# loop hasn't been initialized yet; try our best to
# get an error message out. (the runner() interaction
# should probably be refactored).
try:
failure.raiseException()
except:
logging.error('exception before starting loop', exc_info=True)
d.addBoth(shutdown)
runner()
self.assertTrue(chunks)
return ''.join(chunks)
def twisted_fetch(self, url, runner):
# http://twistedmatrix.com/documents/current/web/howto/client.html
chunks = []
client = Agent(self.reactor)
d = client.request(b'GET', utf8(url))
class Accumulator(Protocol):
def __init__(self, finished):
self.finished = finished
def dataReceived(self, data):
chunks.append(data)
def connectionLost(self, reason):
self.finished.callback(None)
def callback(response):
finished = Deferred()
response.deliverBody(Accumulator(finished))
return finished
d.addCallback(callback)
def shutdown(failure):
if hasattr(self, 'stop_loop'):
self.stop_loop()
elif failure is not None:
# loop hasn't been initialized yet; try our best to
# get an error message out. (the runner() interaction
# should probably be refactored).
try:
failure.raiseException()
except:
logging.error('exception before starting loop', exc_info=True)
d.addBoth(shutdown)
runner()
self.assertTrue(chunks)
return ''.join(chunks)
def forward_procedure(self, func_path, uri, options=None):
@inlineCallbacks
def wrapped(*args, **kwargs):
reply_channel_name = self.channel_layer.new_channel('{}?'.format(uri))
payload = {
'func_path': func_path,
'uri': uri,
'args': args,
'kwargs': kwargs,
'reply_channel': reply_channel_name,
}
channel = Channel('wamp.events')
channel.send(payload)
d = Deferred()
def cleanup(result):
self.channels.remove(reply_channel_name)
del self.reply_channels[reply_channel_name]
self.log.info('result: {}'.format(result['total']))
d.addCallback(cleanup)
self.channels.add(reply_channel_name)
self.reply_channels[reply_channel_name] = d
yield d
self.log.info("registered procedure for '{}'".format(uri))
if options is None:
options = types.RegisterOptions()
return self.register(wrapped, uri, options=options)
def test_preprocessor(self):
d = defer.Deferred()
def pp(sr):
self.assertIdentical(sr, self.service_request)
d.callback(None)
gw = twisted.TwistedGateway({'echo': lambda x: x}, preprocessor=pp)
self.service_request = gateway.ServiceRequest(
None, gw.services['echo'], None
)
gw.preprocessRequest(self.service_request)
return d