def test_ssl_verification_positive(self):
"""
The client transport should complete an upload of messages to
a host which provides SSL data which can be verified by the
public key specified.
"""
resource = DataCollectingResource()
context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
port = reactor.listenSSL(0, server.Site(resource), context_factory,
interface="127.0.0.1")
self.ports.append(port)
transport = HTTPTransport(
None, "https://localhost:%d/" % (port.getHost().port,), PUBKEY)
result = deferToThread(transport.exchange, "HI", computer_id="34",
message_api="X.Y")
def got_result(ignored):
try:
get_header = resource.request.requestHeaders.getRawHeaders
except AttributeError:
# For backwards compatibility with Twisted versions
# without requestHeaders
def get_header(header):
return [resource.request.received_headers[header]]
self.assertEqual(get_header("x-computer-id"), ["34"])
self.assertEqual(
get_header("user-agent"), ["landscape-client/%s" % (VERSION,)])
self.assertEqual(get_header("x-message-api"), ["X.Y"])
self.assertEqual(bpickle.loads(resource.content), "HI")
result.addCallback(got_result)
return result
评论列表
文章目录