test_endpoints.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码
def test_connectionCancelledBeforeSecure(self):
        """
        If the connection is cancelled before the SSH transport layer has
        finished key exchange (ie, gotten to the point where we may attempt to
        authenticate), the L{Deferred} returned by
        L{SSHCommandClientEndpoint.connect} fires with a L{Failure} wrapping
        L{CancelledError} and the connection is aborted.
        """
        endpoint = SSHCommandClientEndpoint.newConnection(
            self.reactor, b"/bin/ls -l", b"dummy user",
            self.hostname, self.port, knownHosts=self.knownHosts,
            ui=FixedResponseUI(False))

        factory = Factory()
        factory.protocol = Protocol
        d = endpoint.connect(factory)

        transport = AbortableFakeTransport(None, isServer=False)
        factory = self.reactor.tcpClients[0][2]
        client = factory.buildProtocol(None)
        client.makeConnection(transport)
        d.cancel()

        self.failureResultOf(d).trap(CancelledError)
        self.assertTrue(transport.aborted)
        # Make sure the connection closing doesn't result in unexpected
        # behavior when due to cancellation:
        client.connectionLost(Failure(ConnectionDone()))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号