def test_exposed_preprocessor(self):
    """Process a request through a gateway whose preprocessor is exposed.

    The preprocessor is wrapped with ``gateway.expose_request`` while the
    gateway itself is built with ``expose_request=False``. The test passes
    when the request processor yields a truthy result, and fails with the
    original error otherwise.

    Returns:
        A Deferred that fires with ``None`` on success or with the
        underlying failure on error, for the test runner to wait on.
    """
    d = defer.Deferred()

    def preprocessor(http_request, service_request):
        # NOTE(review): the value returned here is whatever
        # ``reactor.callLater`` returns (a delayed-call handle, not a
        # Deferred) -- confirm this is the intended preprocessor result.
        return reactor.callLater(0, lambda: True)

    # presumably marks the preprocessor so it is also handed the HTTP
    # request -- verify against ``gateway.expose_request``.
    preprocessor = gateway.expose_request(preprocessor)
    gw = twisted.TwistedGateway(
        {'echo': lambda x: x},
        expose_request=False,
        preprocessor=preprocessor,
    )
    proc = twisted.AMF3RequestProcessor(gw)

    request = remoting.Request(
        'null',
        body=[
            messaging.RemotingMessage(body=['spam.eggs'], operation='echo'),
        ],
    )

    def cb(result):
        try:
            self.assertTrue(result)
        except Exception:
            # Inside the ``except`` block a no-argument ``errback()``
            # captures the exception currently being handled.
            d.errback()
        else:
            d.callback(None)

    # Forward the real failure to ``d``. A no-argument ``errback()`` here
    # would run outside any ``except`` block, have no exception to capture,
    # and hide the original error.
    proc(request).addCallback(cb).addErrback(d.errback)

    return d
test_twisted.py 文件源码
python
阅读 24
收藏 0
点赞 0
评论 0
评论列表
文章目录