def test_error_deferred_body(self):
    """
    An exception raised inside a Deferred returned by a service must be
    reported as an AMF3 error response.

    The ``echo`` service returns a Deferred whose callback raises
    ``IndexError``. The request processor is expected to answer with a
    ``remoting.Response`` whose status is ``remoting.STATUS_ERROR`` and
    whose body is a ``messaging.ErrorMessage`` with fault code
    ``'IndexError'``.

    Returns the Deferred ``d``, which fires with ``None`` when every
    assertion passes and with a failure otherwise (presumably the test
    runner waits on it -- confirm against the enclosing TestCase).
    """
    d = defer.Deferred()

    def echo(x):
        # Service under test: its result Deferred fails with IndexError.
        d2 = defer.Deferred()

        def cb(result):
            raise IndexError

        # Fire d2 from the reactor instead of synchronously, so the error
        # only surfaces after echo() has already returned the Deferred.
        reactor.callLater(0, lambda: d2.callback(None))
        d2.addCallback(cb)

        return d2

    gw = twisted.TwistedGateway({'echo': echo}, expose_request=False)
    proc = twisted.AMF3RequestProcessor(gw)

    request = remoting.Request(
        'null',
        body=[
            messaging.RemotingMessage(body=['spam.eggs'], operation='echo')
        ])

    def cb(result):
        try:
            self.assertTrue(isinstance(result, remoting.Response))
            # assertEqual, not assertTrue: with assertTrue the second
            # argument is only the failure message, so the status value
            # would never actually be compared.
            self.assertEqual(result.status, remoting.STATUS_ERROR)
            self.assertTrue(
                isinstance(result.body, messaging.ErrorMessage)
            )
            self.assertEqual(result.body.faultCode, 'IndexError')
        except Exception:
            # Called without arguments inside an except block, errback()
            # wraps the exception currently being handled.
            d.errback()
        else:
            d.callback(None)

    # Forward the received failure itself. Calling d.errback() with no
    # argument here would run outside any except block, where there is no
    # current exception to wrap, and the original failure would be lost.
    proc(request).addCallback(cb).addErrback(d.errback)

    return d
test_twisted.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录