def inlineCallbacks(f, *args, **kwargs):
    """
    Call C{f} and hand the generator it returns to
    C{defer._inlineCallbacks}.

    @param f: a callable expected to return a generator when invoked with
        C{*args} and C{**kwargs}.
    @return: the result of C{defer._inlineCallbacks} called with the
        generator and a new C{defer.Deferred}.
    @raise TypeError: if calling C{f} raises C{defer._DefGen_Return}
        directly, or if C{f} returns something that is not a generator.
    """
    # ...
    try:
        gen = f(*args, **kwargs)
    except defer._DefGen_Return:
        # The exception escaped from the call itself, so f is not a
        # generator function.  Note the trailing space after "instead":
        # the two literals are concatenated into a single message.
        raise TypeError(
            "inlineCallbacks requires %r to produce a generator; instead "
            "caught returnValue being used in a non-generator" % (f,))
    if not isinstance(gen, types.GeneratorType):
        raise TypeError(
            "inlineCallbacks requires %r to produce a generator; "
            "instead got %r" % (f, gen))
    return defer._inlineCallbacks(None, gen, defer.Deferred())
# ...
# ...
# ...
# python类Deferred()的实例源码  (example source code using the Python class Deferred())
def test_deferred_service(self):
    """
    A service that returns a C{defer.Deferred} must still produce an AMF3
    response whose C{'/1'} entry has an OK status and the echoed body.
    """
    def echo(data):
        # Answer through a Deferred fired via reactor.callLater(0, ...).
        pending = defer.Deferred()
        reactor.callLater(0, pending.callback, data)
        return pending

    def check(response):
        self.assertEqual(response.amfVersion, pyamf.AMF3)
        self.assertTrue('/1' in response)
        reply = response['/1']
        self.assertEqual(reply.status, remoting.STATUS_OK)
        self.assertEqual(reply.body, 'hello')

    self.gw.addService(echo)
    request_deferred = self.doRequest('echo', 'hello')
    return request_deferred.addCallback(check)
def test_exposed_preprocessor(self):
    """
    A preprocessor wrapped with C{gateway.expose_request} is called with
    the C{http_request} value followed by the service request.
    """
    done = defer.Deferred()

    def pp(hr, sr):
        self.assertEqual(hr, 'hello')
        self.assertIdentical(sr, self.service_request)
        done.callback(None)

    exposed = gateway.expose_request(pp)
    services = {'echo': lambda x: x}
    gw = twisted.TwistedGateway(services, preprocessor=exposed)
    self.service_request = gateway.ServiceRequest(
        None, gw.services['echo'], None
    )

    gw.preprocessRequest(self.service_request, http_request='hello')

    # The test completes once the preprocessor has fired this Deferred.
    return done
def test_exposed_preprocessor_no_request(self):
    """
    When C{preprocessRequest} is called without an C{http_request}, an
    exposed preprocessor receives C{None} as its first argument.
    """
    finished = defer.Deferred()

    def pp(hr, sr):
        self.assertEqual(hr, None)
        self.assertIdentical(sr, self.service_request)
        finished.callback(None)

    exposed = gateway.expose_request(pp)
    services = {'echo': lambda x: x}
    gw = twisted.TwistedGateway(services, preprocessor=exposed)
    self.service_request = gateway.ServiceRequest(
        None, gw.services['echo'], None
    )

    gw.preprocessRequest(self.service_request)

    # The test completes once the preprocessor has fired this Deferred.
    return finished
def test_authenticate(self):
    """
    C{authenticateRequest} passes the supplied username and password to
    the gateway's authenticator.
    """
    outcome = defer.Deferred()

    def auth(u, p):
        try:
            self.assertEqual(u, 'u')
            self.assertEqual(p, 'p')
        except:
            # Turn the assertion failure into an errback on the Deferred
            # that this test returns.
            outcome.errback(failure.Failure())
            return
        outcome.callback(None)

    services = {'echo': lambda x: x}
    gw = twisted.TwistedGateway(services, authenticator=auth)
    self.service_request = gateway.ServiceRequest(
        None, gw.services['echo'], None
    )

    gw.authenticateRequest(self.service_request, 'u', 'p')

    return outcome
def test_error_auth(self):
    """
    An authenticator that raises must yield an error response whose body
    is an C{ErrorFault} coded with the exception's name.
    """
    def auth(u, p):
        raise IndexError

    p = self.getProcessor({'echo': lambda x: x}, authenticator=auth)
    request = remoting.Request('echo', envelope=remoting.Envelope())

    d = p(request)

    self.assertTrue(isinstance(d, defer.Deferred))
    # NOTE(review): reading d.result assumes the Deferred has already
    # fired by the time the processor returns -- confirm.
    response = d.result
    self.assertTrue(isinstance(response, remoting.Response))
    # assertEqual, not assertTrue: assertTrue would treat the second
    # argument as the failure message and never compare the two values.
    self.assertEqual(response.status, remoting.STATUS_ERROR)
    self.assertTrue(isinstance(response.body, remoting.ErrorFault))
    self.assertEqual(response.body.code, 'IndexError')
def test_auth_fail(self):
    """
    An authenticator that returns C{False} must yield an error response
    whose body is an C{ErrorFault} coded C{'AuthenticationError'}.
    """
    def auth(u, p):
        return False

    p = self.getProcessor({'echo': lambda x: x}, authenticator=auth)
    request = remoting.Request('echo', envelope=remoting.Envelope())

    d = p(request)

    self.assertTrue(isinstance(d, defer.Deferred))

    def check_response(response):
        self.assertTrue(isinstance(response, remoting.Response))
        # assertEqual, not assertTrue: assertTrue would treat the second
        # argument as the failure message and never compare the values.
        self.assertEqual(response.status, remoting.STATUS_ERROR)
        self.assertTrue(isinstance(response.body, remoting.ErrorFault))
        self.assertEqual(response.body.code, 'AuthenticationError')

    d.addCallback(check_response)

    return d
def test_deferred_auth(self):
    """
    Run a request through a processor whose authenticator returns the
    result of C{reactor.callLater} and check the processor's Deferred
    fires with a true value.
    """
    d = defer.Deferred()

    def auth(u, p):
        return reactor.callLater(0, lambda: True)

    p = self.getProcessor({'echo': lambda x: x}, authenticator=auth)
    request = remoting.Request('echo', envelope=remoting.Envelope())

    def cb(result):
        self.assertTrue(result)
        d.callback(None)

    # Forward the actual failure to the test's Deferred.  Calling
    # d.errback() with no argument would discard it and relies on there
    # being a current exception to wrap.
    p(request).addCallback(cb).addErrback(d.errback)

    return d
def test_exposed_preprocessor(self):
    """
    Run a request through a processor whose exposed preprocessor returns
    the result of C{reactor.callLater} and check the processor's Deferred
    fires with a true value.
    """
    d = defer.Deferred()

    def preprocessor(http_request, service_request):
        return reactor.callLater(0, lambda: True)

    preprocessor = gateway.expose_request(preprocessor)
    p = self.getProcessor({'echo': lambda x: x}, preprocessor=preprocessor)
    request = remoting.Request('echo', envelope=remoting.Envelope())

    def cb(result):
        self.assertTrue(result)
        d.callback(None)

    # Forward the actual failure to the test's Deferred.  Calling
    # d.errback() with no argument would discard it and relies on there
    # being a current exception to wrap.
    p(request).addCallback(cb).addErrback(d.errback)

    return d
def test_unknown_service_request(self):
    """
    A remoting message naming an operation the gateway does not provide
    must yield an error response carrying an C{ErrorMessage} body.
    """
    gw = twisted.TwistedGateway(
        {'echo': lambda x: x}, expose_request=False
    )
    proc = twisted.AMF3RequestProcessor(gw)

    request = remoting.Request(
        'null', body=[
            messaging.RemotingMessage(body=['spam.eggs'], operation='ss')
        ]
    )

    d = proc(request)

    self.assertTrue(isinstance(d, defer.Deferred))
    # NOTE(review): reading d.result assumes the Deferred has already
    # fired by the time the processor returns -- confirm.
    response = d.result
    self.assertTrue(isinstance(response, remoting.Response))
    # assertEqual, not assertTrue: assertTrue would treat the second
    # argument as the failure message and never compare the two values.
    self.assertEqual(response.status, remoting.STATUS_ERROR)
    self.assertTrue(isinstance(response.body, messaging.ErrorMessage))
def test_error_body(self):
    """
    A service that raises must yield an error response whose
    C{ErrorMessage} body carries the exception's name as C{faultCode}.
    """
    def echo(x):
        raise KeyError

    gw = twisted.TwistedGateway({'echo': echo}, expose_request=False)
    proc = twisted.AMF3RequestProcessor(gw)

    request = remoting.Request(
        'null',
        body=[
            messaging.RemotingMessage(body=['spam.eggs'], operation='echo')
        ]
    )

    d = proc(request)

    self.assertTrue(isinstance(d, defer.Deferred))
    # NOTE(review): reading d.result assumes the Deferred has already
    # fired by the time the processor returns -- confirm.
    response = d.result
    self.assertTrue(isinstance(response, remoting.Response))
    # assertEqual, not assertTrue: assertTrue would treat the second
    # argument as the failure message and never compare the two values.
    self.assertEqual(response.status, remoting.STATUS_ERROR)
    self.assertTrue(isinstance(response.body, messaging.ErrorMessage))
    self.assertEqual(response.body.faultCode, 'KeyError')
def logOn(self, chatui):
    """
    Begin logging on and register each successfully connected client
    with C{chatui}.

    @returns: this breaks with L{interfaces.IAccount}
    @returntype: DeferredList of L{interfaces.IClient}s
    @raise error.ConnectionError: if the account is already connecting
        or online.
    """
    # basesupport's implementation is overridden because _startLogOn
    # tends to return a DeferredList rather than a plain Deferred, and
    # registerAccountClient has to be called for every result.
    if self._isConnecting or self._isOnline:
        raise error.ConnectionError("Connection in progress")

    def registerMany(results):
        for success, result in results:
            if not success:
                log.err(result)
                continue
            chatui.registerAccountClient(result)
            self._cb_logOn(result)

    self._isConnecting = 1
    d = self._startLogOn(chatui)
    d.addErrback(self._loginFailed)
    d.addCallback(registerMany)
    return d
def testOnFailure(self):
    """
    Check that the condition named in a SASL failure element ends up on
    the resulting C{sasl.SASLAuthError}.
    """
    self.authenticator = xmlstream.Authenticator()
    self.xmlstream = xmlstream.XmlStream(self.authenticator)
    init = sasl.SASLInitiatingInitializer(self.xmlstream)

    # Build <failure><not-authorized/></failure> in the SASL namespace.
    failureElement = domish.Element(
        ('urn:ietf:params:xml:ns:xmpp-sasl', 'failure'))
    failureElement.addElement('not-authorized')

    init._deferred = defer.Deferred()
    init.onFailure(failureElement)

    def checkCondition(e):
        return self.assertEquals('not-authorized', e.condition)

    self.assertFailure(init._deferred, sasl.SASLAuthError)
    init._deferred.addCallback(checkCondition)
    return init._deferred
def _login(userHandle, passwd, nexusServer, cached=0, authData=''):
    """
    Internal helper; it should never be called directly.

    Opens an SSL connection on port 443 running L{PassportLogin}.  When
    C{cached} is true this happens immediately using C{nexusServer} and
    C{authData}; otherwise a L{PassportNexus} connection to
    C{nexusServer} is made first and the login connection is started
    from its Deferred's callback.

    @return: the Deferred handed to L{PassportLogin}; it is also
        errbacked if the nexus Deferred fails.
    """
    loginDeferred = Deferred()

    def _startLogin(server, auth):
        factory = ClientFactory()
        factory.protocol = lambda : PassportLogin(loginDeferred, userHandle, passwd, server, auth)
        host = _parsePrimitiveHost(server)[0]
        reactor.connectSSL(host, 443, factory, ClientContextFactory())

    if cached:
        _startLogin(nexusServer, authData)
        return loginDeferred

    nexusFactory = ClientFactory()
    nexusDeferred = Deferred()
    # The nexus result is passed as `server`, followed by authData.
    nexusDeferred.addCallback(_startLogin, authData)
    nexusDeferred.addErrback(lambda f: loginDeferred.errback(f))
    nexusFactory.protocol = lambda : PassportNexus(nexusDeferred, nexusServer)
    nexusHost = _parsePrimitiveHost(nexusServer)[0]
    reactor.connectSSL(nexusHost, 443, nexusFactory, ClientContextFactory())
    return loginDeferred
def addListGroup(self, name):
    """
    Ask the server to create a new list group by sending an C{ADG} line.

    A default callback on the returned Deferred updates the factory's
    C{contacts} attribute (list version and the new group).

    @param name: The desired name of the new group.

    @return: A Deferred, the callback for which will be called when the
             server clarifies that the new group has been created.
             The callback argument will be a tuple with 3 elements:
             the new list version (int), the new group name (str)
             and the new group ID (int).
    """
    transactionID, d = self._createIDMapping()
    self.sendLine("ADG %s %s 0" % (transactionID, quote(name)))

    def _groupAdded(r):
        self.factory.contacts.version = r[0]
        self.factory.contacts.setGroup(r[1], r[2])
        # Hand the tuple on unchanged to later callbacks.
        return r

    d.addCallback(_groupAdded)
    return d
def remListGroup(self, groupID):
    """
    Ask the server to remove a list group by sending an C{RMG} line.

    A default callback on the returned Deferred updates the factory's
    C{contacts} attribute (list version and the removed group).

    @param groupID: the ID of the desired group to be removed.

    @return: A Deferred, the callback for which will be called when the
             server clarifies the deletion of the group.
             The callback argument will be a tuple with 2 elements:
             the new list version (int) and the group ID (int) of
             the removed group.
    """
    transactionID, d = self._createIDMapping()
    self.sendLine("RMG %s %s" % (transactionID, groupID))

    def _groupRemoved(r):
        self.factory.contacts.version = r[0]
        self.factory.contacts.remGroup(r[1])
        # Hand the tuple on unchanged to later callbacks.
        return r

    d.addCallback(_groupRemoved)
    return d
def changeScreenName(self, newName):
    """
    Ask the server to change the current screen name by sending an
    C{REA} line.

    A default callback on the returned Deferred updates the factory's
    C{screenName} attribute and the contact list version.

    @param newName: the new screen name

    @return: A Deferred, the callback for which will be called when the
             server sends an adequate reply.
             The callback argument will be a tuple of 2 elements:
             the new list version and the new screen name.
    """
    transactionID, d = self._createIDMapping()
    line = "REA %s %s %s" % (transactionID, self.factory.userHandle, quote(newName))
    self.sendLine(line)

    def _nameChanged(r):
        self.factory.contacts.version = r[0]
        self.factory.screenName = r[1]
        # Hand the tuple on unchanged to later callbacks.
        return r

    d.addCallback(_nameChanged)
    return d
def inviteUser(self, userHandle):
    """
    Invite a user to the current switchboard server by sending a C{CAL}
    line.

    @param userHandle: the user handle (passport) of the desired user.

    @return: A Deferred, the callback for which will be called when the
             server notifies us that the user has indeed been invited.
             The callback argument will be a tuple with 1 element, the
             sessionID given to the invited user.  I'm not sure if this
             is useful or not.
    """
    transactionID, inviteDeferred = self._createIDMapping()
    line = "CAL %s %s" % (transactionID, userHandle)
    self.sendLine(line)
    return inviteDeferred
def displayhook(self, obj):
    """
    Store C{obj} as C{_} in the local namespace and write a
    representation of it.

    C{None} writes nothing.  A Deferred that has a C{result} attribute,
    and any non-Deferred object, is written with C{repr}.  A Deferred
    without one is written as C{<Deferred #N>}; the first time it is
    seen it is numbered, recorded in C{self._pendingDeferreds} and given
    display callbacks.
    """
    self.locals['_'] = obj

    if not isinstance(obj, defer.Deferred):
        if obj is not None:
            self.write(repr(obj))
        return

    # XXX Ick, where is my "hasFired()" interface?
    if hasattr(obj, "result"):
        self.write(repr(obj))
        return

    pending = self._pendingDeferreds
    key = id(obj)
    if key in pending:
        # Already being tracked: show the number it was given earlier.
        self.write("<Deferred #%d>" % (pending[key][0],))
        return

    number = self.numDeferreds
    pending[key] = (number, obj)
    self.numDeferreds += 1
    obj.addCallbacks(self._cbDisplayDeferred, self._ebDisplayDeferred,
                     callbackArgs=(number, obj), errbackArgs=(number, obj))
    self.write("<Deferred #%d>" % (number,))
def sendGlobalRequest(self, request, data, wantReply=0):
    """
    Send a global request for this connection.  Currently this is only
    used for remote->local TCP forwarding.

    The packet is the NS-encoded request name, one byte that is
    C{'\\xff'} when a reply is wanted and C{'\\x00'} otherwise, and then
    C{data}.

    @type request:      C{str}
    @type data:         C{str}
    @type wantReply:    C{bool}
    @rtype:             C{Deferred}/C{None}
    @return: when C{wantReply} is true, a new Deferred that has been
        appended to C{self.deferreds['global']}; otherwise C{None}.
    """
    encodedRequest = common.NS(request)
    if wantReply:
        replyFlag = '\xff'
    else:
        replyFlag = '\x00'
    self.transport.sendPacket(MSG_GLOBAL_REQUEST,
                              encodedRequest + replyFlag + data)
    if not wantReply:
        return None
    d = defer.Deferred()
    self.deferreds.setdefault('global', []).append(d)
    return d
def _pamConv(self, items):
    """
    Turn PAM conversation items into a C{MSG_USERAUTH_INFO_REQUEST}
    packet and send it.

    @param items: an iterable of C{(message, kind)} pairs.
    @return: a new Deferred, also stored as C{self._pamDeferred}, or an
        already-failed Deferred wrapping L{error.ConchError} when an
        item's kind is not 1 or 2 (nothing is sent in that case).
    """
    prompts = []
    for message, kind in items:
        if kind == 1:  # password
            echoFlag = 0
        elif kind == 2:  # text
            echoFlag = 1
        elif kind in (3, 4):
            return defer.fail(error.ConchError('cannot handle PAM 3 or 4 messages'))
        else:
            return defer.fail(error.ConchError('bad PAM auth kind %i' % kind))
        prompts.append((message, echoFlag))

    # Three empty NS strings, then the prompt count, then each prompt
    # followed by its one-character echo flag.
    packet = NS('') + NS('') + NS('') + struct.pack('>L', len(prompts))
    for prompt, echo in prompts:
        packet += NS(prompt) + chr(echo)

    self.transport.sendPacket(MSG_USERAUTH_INFO_REQUEST, packet)
    self._pamDeferred = defer.Deferred()
    return self._pamDeferred
def modelChanged(self, payload):
    """
    Regenerate this object's node from changed data and notify the
    controller.

    @param payload: a mapping.  Its optional C{'request'} entry supplies
        the request; when missing or C{None}, a L{Dummy} whose C{d}
        attribute is C{document} is used instead.  If it has an entry
        keyed by C{self.submodel} that value is used as the data,
        otherwise the data comes from C{self.getData(request)}.
    """
    request = payload.get('request', None)
    if request is None:
        request = Dummy()
        request.d = document
    oldNode = self.node
    # `in` replaces the deprecated dict.has_key().
    if self.submodel in payload:
        data = payload[self.submodel]
    else:
        data = self.getData(request)
    newNode = self._regenerate(request, oldNode, data)
    returnNode = self.dispatchResult(request, oldNode, newNode)
    # shot in the dark: this seems to make *my* code work.  probably will
    # break if returnNode returns a Deferred, as it's supposed to be able
    # to do -glyph
    # self.viewStack.push(self)
    # self.controller.controllerStack.push(self.controller)
    self.handleNewNode(request, returnNode)
    self.handleOutstanding(request)
    self.controller.domChanged(request, self, returnNode)
def dispatchResult(self, request, node, result):
    """
    Convert the result of handling a node into a new node, or arrange
    for that to happen once a Deferred fires.

    A non-Deferred C{result} is adapted to L{INodeMutator} and replaced
    by what the adapter's C{generate(request, node)} returns.  If the
    value is (or has become) a Deferred, the outstanding-callback count
    is incremented and result/failure handlers are attached.

    @return: the generated value, or the Deferred being waited on.
    @raise NotImplementedError: if no L{INodeMutator} adapter exists for
        a non-Deferred C{result}.
    """
    if not isinstance(result, defer.Deferred):
        mutator = INodeMutator(result, None)
        if mutator is None:
            resultType = getattr(result, "__class__", None) or type(result)
            raise NotImplementedError(
                "Your factory method returned %s, but there is no "
                "INodeMutator adapter registerred for %s." %
                (result, resultType))
        result = mutator.generate(request, node)

    if not isinstance(result, defer.Deferred):
        return result

    # Got to wait until the callback comes in
    self.outstandingCallbacks += 1
    result.addCallback(self.dispatchResultCallback, request, node)
    result.addErrback(self.renderFailure, request)
    return result
def getSubmodel(self, request, name):
    """
    Get the submodel `name' of this model. If I ever return a
    Deferred, then I ought to check for cached values (created by
    L{setSubmodel}) before doing a regular Deferred lookup.

    @return: the cached submodel if C{name} is already in
        C{self.submodels}; C{None} if C{self.submodelCheck} rejects the
        name or C{self.submodelFactory} returns C{None}; otherwise the
        factory's result adapted with L{adaptToIModel}, which is cached
        in C{self.submodels} before being returned.
    """
    # `in` replaces the deprecated dict.has_key().
    if name in self.submodels:
        return self.submodels[name]
    if not self.submodelCheck(request, name):
        return None
    m = self.submodelFactory(request, name)
    if m is None:
        return None
    sm = adaptToIModel(m, self, name)
    self.submodels[name] = sm
    return sm
def dispatchResult(self, request, node, result):
    """
    Put the result of handling a node into the DOM tree, or arrange for
    that to happen once a Deferred fires.

    A non-Deferred C{result} replaces C{node} under C{node.parentNode}.
    For a Deferred, the outstanding-callback count is incremented and
    result/failure handlers are attached.

    @return: C{result}, unchanged.
    @raise RuntimeError: if C{result} is not a Deferred and C{node} has
        no parent node.
    """
    if not isinstance(result, defer.Deferred):
        if node.parentNode is not None:
            node.parentNode.replaceChild(result, node)
        else:
            # Call form of raise: the `raise Class, "message"` statement
            # is Python-2-only syntax.
            raise RuntimeError(
                "We're dying here, please report this immediately")
    else:
        self.outstandingCallbacks += 1
        result.addCallback(self.dispatchResultCallback, request, node)
        result.addErrback(self.renderFailure, request)
        # Got to wait until the callback comes in
    return result
def handleControllerResults(self,
    controllerResult, request, node, controller, view):
    """
    Handle the result, possibly a Deferred, produced by a controller.

    The outstanding-callback count is decremented on entry.  For a
    non-Deferred result the view generates its output, which is
    dispatched and passed to C{handleNewNode}.  For a Deferred, the
    count is incremented again and this method is attached as its
    callback so it runs once more with the fired value.

    @return: C{controllerResult}, unchanged.
    """
    self.outstandingCallbacks -= 1

    if not isinstance(controllerResult, defer.Deferred):
        generated = view.generate(request, node)
        newNode = self.dispatchResult(request, node, generated)
        self.handleNewNode(request, newNode)
        return controllerResult

    # Still waiting on the controller.
    self.outstandingCallbacks += 1
    controllerResult.addCallback(
        self.handleControllerResults, request, node, controller, view)
    controllerResult.addErrback(self.renderFailure, request)
    return controllerResult
def possiblyDeferWidget(widget, request):
    """
    Render C{widget} for C{request}.

    @return: the Deferred of a L{_RequestDeferral} handed to
        L{RenderSession} if any element produced by
        C{widget.display(request)} is a Deferred; otherwise the elements
        joined into a single string.  If anything raises, the formatted
        traceback wrapped with C{html.PRE} is returned instead.
    """
    # web in my head get it out get it out
    try:
        disp = widget.display(request)
        # if this widget wants to defer anything -- well, I guess we've got to
        # defer it.
        for elem in disp:
            if isinstance(elem, defer.Deferred):
                req = _RequestDeferral()
                RenderSession(disp, req)
                return req.deferred
        # str.join replaces the deprecated string.join() function.
        return ''.join(disp)
    except:
        # Deliberately broad: any error is rendered as a traceback
        # rather than propagated to the caller.
        buf = StringIO()
        traceback.print_exc(file=buf)
        return html.PRE(buf.getvalue())
def __init__(self, url, fileOrName,
             method='GET', postdata=None, headers=None,
             agent="Twisted client", supportPartial=0):
    """
    @param url: passed through to C{HTTPClientFactory.__init__}.
    @param fileOrName: a string, stored as C{self.fileName}, or any other
        object, stored as C{self.file}.
    @param method: passed through to C{HTTPClientFactory.__init__}.
    @param postdata: passed through to C{HTTPClientFactory.__init__}.
    @param headers: request headers, or C{None}.  When a partial
        download is requested a C{"range"} entry is added to this
        mapping (a new dict is created if C{None} was given).
    @param agent: passed through to C{HTTPClientFactory.__init__}.
    @param supportPartial: if true, C{fileOrName} is a string and that
        file already exists with a non-zero size, request only the
        bytes from the existing size onwards.
    """
    self.requestedPartial = 0
    if isinstance(fileOrName, types.StringTypes):
        self.fileName = fileOrName
        self.file = None
        if supportPartial and os.path.exists(self.fileName):
            fileLength = os.path.getsize(self.fileName)
            if fileLength:
                self.requestedPartial = fileLength
                # Identity test: `== None` would invoke a custom __eq__.
                if headers is None:
                    headers = {}
                headers["range"] = "bytes=%d-" % fileLength
    else:
        self.file = fileOrName
    HTTPClientFactory.__init__(self, url, method=method, postdata=postdata, headers=headers, agent=agent)
    self.deferred = defer.Deferred()
    self.waiting = 1
def _getFunction(self, functionPath):
    """
    Given a string, return a function, or raise NoSuchFunction.

    The returned function will be called, and should return the result
    of the call, a Deferred, or a Fault instance.

    Override in subclasses if you want your own policy.  By default,
    functionPath 'foo' maps to the method self.xmlrpc_foo, i.e.
    getattr(self, "xmlrpc_" + functionPath).

    If functionPath contains self.separator, the text before the first
    separator names a sub-handler, and the rest of the path is looked
    up on that sub-handler instead.
    """
    if self.separator in functionPath:
        prefix, remainder = functionPath.split(self.separator, 1)
        handler = self.getSubHandler(prefix)
        if handler is None:
            raise NoSuchFunction(self.NOT_FOUND, "no such subHandler %s" % prefix)
        return handler._getFunction(remainder)

    method = getattr(self, "xmlrpc_%s" % functionPath, None)
    if not method:
        raise NoSuchFunction(self.NOT_FOUND, "function %s not found" % functionPath)
    if not callable(method):
        raise NoSuchFunction(self.NOT_FOUND, "function %s not callable" % functionPath)
    return method
def _tick(self):
    """
    Run one scheduler tick: advance each task returned by
    C{self._tasks()} by one step, then reschedule.

    A task whose iterator is exhausted is removed and its Deferred is
    called back with the iterator; one whose iterator raises is removed
    and its Deferred is errbacked.  If a step yields a Deferred, the
    task is removed and passed to C{coiterate} again once that Deferred
    fires.
    """
    self._delayedCall = None
    for task in self._tasks():
        iterator, doneDeferred = task
        try:
            yielded = iterator.next()
        except StopIteration:
            self.iterators.remove(task)
            doneDeferred.callback(iterator)
            continue
        except:
            # errback() takes no argument here, so it must be called
            # while the exception is still being handled.
            self.iterators.remove(task)
            doneDeferred.errback()
            continue

        if isinstance(yielded, defer.Deferred):
            self.iterators.remove(task)

            # Bind the current task as a default argument so the
            # callback does not see a later loop iteration's value.
            def resume(ignored, task=task):
                self.coiterate(*task)

            yielded.addCallbacks(resume, doneDeferred.errback)
    self._reschedule()