Python 类 Data() 的实例源码

test_sources.py 文件源码 项目:duct 作者: ducted 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start_fake_riak_server(self, stats):
        """
        Start a local HTTP server that answers with ``stats`` encoded as JSON.

        ``stats`` is serialised with ``json.dumps`` and served through a
        ``static.Data`` resource with content type ``application/json``.  The
        server listens on TCP port 0 and a test cleanup is registered to stop
        the listener once it is up.

        :param stats: JSON-serialisable object used as the response body.
        :return: the Deferred produced by ``endpoint.listen``; it fires with
            the listening port object.
        """
        payload = json.dumps(stats).encode()
        resource = static.Data(payload, 'application/json')
        # Flag the resource as a leaf; in Twisted this means no child lookup
        # is attempted and this resource renders the request itself.
        resource.isLeaf = True
        factory = server.Site(resource)

        def register_cleanup(listening_port):
            # Ensure the port is closed when the test finishes, then hand the
            # port on to whoever chained onto the Deferred.
            self.addCleanup(listening_port.stopListening)
            return listening_port

        tcp_endpoint = endpoints.TCP4ServerEndpoint(reactor, 0)
        listening = tcp_endpoint.listen(factory)
        listening.addCallback(register_cleanup)
        return listening
test_network.py 文件源码 项目:txkube 作者: LeastAuthority 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_tls_config(self, ca_key, ca_cert, get_kubernetes):
        """
        Check that the Kubernetes client produced by ``get_kubernetes`` can
        complete an HTTPS request against a TLS server configured with the
        given key and certificate.

        :param ca_key: key object whose ``.original`` is used as the server's
            private key.
        :param ca_cert: certificate object whose ``.original`` is used as the
            server certificate; it is also the server's only trust root.
        :param get_kubernetes: callable taking a reactor and returning an
            object with a ``client()`` method; that client's ``agent``
            issues the request.
        :return: a Deferred that fires with the result of the ``GET``
            request issued once the server is listening.
        """
        from twisted.internet import reactor

        # A real HTTPS server that demands the certificate chain from the
        # configuration file is used here because a Context cannot be opened
        # up for inspection, and there is no easy way to have Agent speak
        # over an in-memory transport.
        tls_options = CertificateOptions(
            privateKey=ca_key.original,
            certificate=ca_cert.original,
            trustRoot=trustRootFromCertificates([ca_cert]),
        )
        endpoint = SSL4ServerEndpoint(reactor, 0, tls_options)

        root = Resource()
        root.putChild(b"", Data(b"success", b"text/plain"))

        # The client objects are built on a Redirectable reactor: the URL
        # handed to the Agent has to match the configuration file that has
        # already been written (otherwise the right client certificate is
        # not selected).  One more reason real networking is unfortunate
        # here.
        redirectable = Redirectable(reactor)
        agent = get_kubernetes(redirectable).client().agent

        def issue_request(port):
            # Stop the listener at test teardown.
            self.addCleanup(port.stopListening)
            # Point the redirectable reactor at the address the server
            # actually bound to, then request the fixed URL.
            address = port.getHost()
            redirectable.set_redirect(address.host, address.port)
            return agent.request(b"GET", b"https://127.0.0.1:8443/")

        listening = endpoint.listen(Site(root))
        listening.addCallback(issue_request)
        return listening


问题


面经


文章

微信
公众号

扫码关注公众号