test_transport.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码
def request_with_payload(self, payload):
        """Exchange C{payload} with a local test server and verify the request.

        A L{DataCollectingResource} is served on 127.0.0.1 (port 0 is passed
        to C{listenTCP}, and the actual port is read back from the listening
        port object).  The listening port is recorded in C{self.ports}.
        C{HTTPTransport.exchange} is then run in a thread via
        C{deferToThread}, and once it completes the headers and body captured
        by the resource are compared against the values that were sent.

        @param payload: The object handed to C{transport.exchange}; the
            request body received by the resource is expected to
            C{bpickle}-decode back to an equal object.
        @return: The C{Deferred} from C{deferToThread}, with the verification
            callback attached.
        """
        collector = DataCollectingResource()
        listening_port = reactor.listenTCP(
            0, server.Site(collector), interface="127.0.0.1")
        self.ports.append(listening_port)

        url = "http://localhost:%d/" % (listening_port.getHost().port,)
        transport = HTTPTransport(None, url)
        deferred = deferToThread(
            transport.exchange, payload,
            computer_id="34",
            exchange_token="abcd-efgh",
            message_api="X.Y")

        def verify_request(_):
            try:
                header_values = (
                    collector.request.requestHeaders.getRawHeaders)
            except AttributeError:
                # Fallback for Twisted versions whose request objects lack
                # requestHeaders: wrap the single received value in a list
                # so both code paths yield the same shape.
                def header_values(name):
                    return [collector.request.received_headers[name]]

            # (header name, expected list of raw values) pairs, checked in
            # order.
            expected_headers = (
                (u"x-computer-id", ["34"]),
                ("x-exchange-token", ["abcd-efgh"]),
                ("user-agent", ["landscape-client/%s" % (VERSION,)]),
                ("x-message-api", ["X.Y"]),
            )
            for name, expected in expected_headers:
                self.assertEqual(header_values(name), expected)

            # The body must round-trip through bpickle to the sent payload.
            self.assertEqual(bpickle.loads(collector.content), payload)

        deferred.addCallback(verify_request)
        return deferred
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号