def request_with_payload(self, payload):
    """Exchange ``payload`` with a local HTTP server and verify the request.

    A ``DataCollectingResource`` is served on 127.0.0.1 using port 0, and
    the port actually bound is read back to build the transport URL.  The
    listening port is appended to ``self.ports`` (presumably so the test
    fixture can stop it later -- confirm against the test case's teardown).

    ``HTTPTransport.exchange`` is run in a worker thread via
    ``deferToThread``.  Once it completes, a callback asserts that the
    request captured by the resource carried the expected
    ``x-computer-id``, ``x-exchange-token``, ``user-agent`` and
    ``x-message-api`` headers, and that the captured body decodes with
    ``bpickle.loads`` back to ``payload``.

    Returns the Deferred from ``deferToThread`` with the checking callback
    attached.
    """
    collector = DataCollectingResource()
    listening_port = reactor.listenTCP(
        0, server.Site(collector), interface="127.0.0.1")
    self.ports.append(listening_port)

    url = "http://localhost:%d/" % (listening_port.getHost().port,)
    transport = HTTPTransport(None, url)
    deferred = deferToThread(
        transport.exchange, payload,
        computer_id="34", exchange_token="abcd-efgh", message_api="X.Y")

    def check_request(_ignored):
        # Pick a header accessor that returns a list of values for a name.
        try:
            get_header = collector.request.requestHeaders.getRawHeaders
        except AttributeError:
            # Fallback for Twisted versions whose request object has no
            # requestHeaders: wrap the single received value in a list so
            # both accessors return the same shape.
            def get_header(header):
                return [collector.request.received_headers[header]]

        expected_headers = (
            (u"x-computer-id", ["34"]),
            ("x-exchange-token", ["abcd-efgh"]),
            ("user-agent", ["landscape-client/%s" % (VERSION,)]),
            ("x-message-api", ["X.Y"]),
        )
        for name, expected in expected_headers:
            self.assertEqual(get_header(name), expected)

        # The body sent over the wire must round-trip to the original payload.
        self.assertEqual(bpickle.loads(collector.content), payload)

    deferred.addCallback(check_request)
    return deferred
评论列表
文章目录