python类ConnectionDone()的实例源码

pollreactor.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log, 
        faildict={
            error.ConnectionDone: failure.Failure(error.ConnectionDone()),
            error.ConnectionLost: failure.Failure(error.ConnectionLost())
        }):
        why = None
        inRead = False
        if event & POLL_DISCONNECTED and not (event & POLLIN):
            why = main.CONNECTION_LOST
        else:
            try:
                if event & POLLIN:
                    why = selectable.doRead()
                    inRead = True
                if not why and event & POLLOUT:
                    why = selectable.doWrite()
                    inRead = False
                if not selectable.fileno() == fd:
                    why = error.ConnectionFdescWentAway('Filedescriptor went away')
                    inRead = False
            except:
                log.deferr()
                why = sys.exc_info()[1]
        if why:
            self._disconnectSelectable(selectable, why, inRead)
test_manhole.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testControlBackslash(self):
        self._testwrite("cancelled line")
        partialLine = self.recvlineClient.expect("cancelled line")

        def gotPartialLine(ign):
            self._assertBuffer(
                [">>> cancelled line"])
            self._testwrite(manhole.CTRL_BACKSLASH)

            d = self.recvlineClient.onDisconnection
            return self.assertFailure(d, error.ConnectionDone)

        def gotClearedLine(ign):
            self._assertBuffer(
                [""])

        return partialLine.addCallback(gotPartialLine).addCallback(gotClearedLine)
test_manhole.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testControlD(self):
        self._testwrite("1 + 1")
        helloWorld = self.wfd(self.recvlineClient.expect(r"\+ 1"))
        yield helloWorld
        helloWorld.getResult()
        self._assertBuffer([">>> 1 + 1"])

        self._testwrite(manhole.CTRL_D + " + 1")
        cleared = self.wfd(self.recvlineClient.expect(r"\+ 1"))
        yield cleared
        cleared.getResult()
        self._assertBuffer([">>> 1 + 1 + 1"])

        self._testwrite("\n")
        printed = self.wfd(self.recvlineClient.expect("3\n>>> "))
        yield printed
        printed.getResult()

        self._testwrite(manhole.CTRL_D)
        d = self.recvlineClient.onDisconnection
        disconnected = self.wfd(self.assertFailure(d, error.ConnectionDone))
        yield disconnected
        disconnected.getResult()
posixbase.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _disconnectSelectable(self, selectable, why, isRead, faildict={
        error.ConnectionDone: failure.Failure(error.ConnectionDone()),
        error.ConnectionLost: failure.Failure(error.ConnectionLost())
        }):
        """Utility function for disconnecting a selectable.

        Supports half-close notification, isRead should be boolean indicating
        whether error resulted from doRead().
        """
        self.removeReader(selectable)
        f = faildict.get(why.__class__)
        if f:
            if (isRead and why.__class__ ==  error.ConnectionDone
                and IHalfCloseableDescriptor.providedBy(selectable)):
                selectable.readConnectionLost(f)
            else:
                self.removeWriter(selectable)
                selectable.connectionLost(f)
        else:
            self.removeWriter(selectable)
            selectable.connectionLost(failure.Failure(why))
test_amp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_helloFatalErrorHandling(self):
        """
        Verify that if a known, fatal error type is raised and handled, it will
        be properly relayed to the other end of the connection and translated
        into an exception, no error will be logged, and the connection will be
        terminated.
        """
        L=[]
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)
        HELLO = 'die'
        c.sendHello(HELLO).addErrback(L.append)
        p.flush()
        L.pop().trap(DeathThreat)
        c.sendHello(HELLO).addErrback(L.append)
        p.flush()
        L.pop().trap(error.ConnectionDone)
test_amp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_helloNoErrorHandling(self):
        """
        Verify that if an unknown error type is raised, it will be relayed to
        the other end of the connection and translated into an exception, it
        will be logged, and then the connection will be dropped.
        """
        L=[]
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)
        HELLO = THING_I_DONT_UNDERSTAND
        c.sendHello(HELLO).addErrback(L.append)
        p.flush()
        ure = L.pop()
        ure.trap(amp.UnknownRemoteError)
        c.sendHello(HELLO).addErrback(L.append)
        cl = L.pop()
        cl.trap(error.ConnectionDone)
        # The exception should have been logged.
        self.failUnless(self.flushLoggedErrors(ThingIDontUnderstandError))
test_amp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_lateAnswer(self):
        """
        Verify that a command that does not get answered until after the
        connection terminates will not cause any errors.
        """
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)
        L = []
        HELLO = 'world'
        c.callRemote(WaitForever).addErrback(L.append)
        p.flush()
        self.assertEquals(L, [])
        s.transport.loseConnection()
        p.flush()
        L.pop().trap(error.ConnectionDone)
        # Just make sure that it doesn't error...
        s.waiting.callback({})
        return s.waiting
test_amp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_quitBoxQuits(self):
        """
        Verify that commands with a responseType of QuitBox will in fact
        terminate the connection.
        """
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)

        L = []
        HELLO = 'world'
        GOODBYE = 'everyone'
        c.sendHello(HELLO).addCallback(L.append)
        p.flush()
        self.assertEquals(L.pop()['hello'], HELLO)
        c.callRemote(Goodbye).addCallback(L.append)
        p.flush()
        self.assertEquals(L.pop()['goodbye'], GOODBYE)
        c.sendHello(HELLO).addErrback(L.append)
        L.pop().trap(error.ConnectionDone)
test_imap.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testPathelogicalScatteringOfLiterals(self):
        self.server.checker.addUser('testuser', 'password-test')
        transport = StringTransport()
        self.server.makeConnection(transport)

        transport.clear()
        self.server.dataReceived("01 LOGIN {8}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("testuser {13}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("password-test\r\n")
        self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
        self.assertEquals(self.server.state, 'auth')

        self.server.connectionLost(error.ConnectionDone("Connection done."))
e2reactor.py 文件源码 项目:enigma2 作者: Openeight 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log,
        faildict={
            error.ConnectionDone: failure.Failure(error.ConnectionDone()),
            error.ConnectionLost: failure.Failure(error.ConnectionLost())
        }):
        why = None
        inRead = False
        if event & POLL_DISCONNECTED and not (event & POLLIN):
            why = main.CONNECTION_LOST
        else:
            try:
                if event & POLLIN:
                    why = selectable.doRead()
                    inRead = True
                if not why and event & POLLOUT:
                    why = selectable.doWrite()
                    inRead = False
                if not selectable.fileno() == fd:
                    why = error.ConnectionFdescWentAway('Filedescriptor went away')
                    inRead = False
            except:
                log.deferr()
                why = sys.exc_info()[1]
        if why:
            self._disconnectSelectable(selectable, why, inRead)
test_amp.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_retry_without_retry_on_reconnect(self):
        """
        If C{retryOnReconnect} is C{False}, the L{RemoteObject} object won't
        retry to perform requests which failed because the connection was
        lost, however requests made after a reconnection will still succeed.
        """
        self.client.factor = 0.01  # Try reconnecting very quickly
        connector = reactor.connectUNIX(self.socket, self.client)
        remote = yield self.client.getRemoteObject()

        # Disconnect
        deferred = Deferred()
        self.client.notifyOnConnect(deferred.callback)
        connector.disconnect()

        yield self.assertFailure(remote.modt(), ConnectionDone)

        # Wait for reconnection and peform another call
        yield deferred
        result = yield remote.method("john")
        self.assertEqual(result, "John")

        self.client.stopTrying()
        connector.disconnect()
test_manhole.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testControlBackslash(self):
        self._testwrite("cancelled line")
        partialLine = self.recvlineClient.expect("cancelled line")

        def gotPartialLine(ign):
            self._assertBuffer(
                [">>> cancelled line"])
            self._testwrite(manhole.CTRL_BACKSLASH)

            d = self.recvlineClient.onDisconnection
            return self.assertFailure(d, error.ConnectionDone)

        def gotClearedLine(ign):
            self._assertBuffer(
                [""])

        return partialLine.addCallback(gotPartialLine).addCallback(gotClearedLine)
test_manhole.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testControlD(self):
        self._testwrite("1 + 1")
        helloWorld = self.wfd(self.recvlineClient.expect(r"\+ 1"))
        yield helloWorld
        helloWorld.getResult()
        self._assertBuffer([">>> 1 + 1"])

        self._testwrite(manhole.CTRL_D + " + 1")
        cleared = self.wfd(self.recvlineClient.expect(r"\+ 1"))
        yield cleared
        cleared.getResult()
        self._assertBuffer([">>> 1 + 1 + 1"])

        self._testwrite("\n")
        printed = self.wfd(self.recvlineClient.expect("3\n>>> "))
        yield printed
        printed.getResult()

        self._testwrite(manhole.CTRL_D)
        d = self.recvlineClient.onDisconnection
        disconnected = self.wfd(self.assertFailure(d, error.ConnectionDone))
        yield disconnected
        disconnected.getResult()
pollreactor.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log, 
        faildict={
            error.ConnectionDone: failure.Failure(error.ConnectionDone()),
            error.ConnectionLost: failure.Failure(error.ConnectionLost())
        }):
        why = None
        inRead = False
        if event & POLL_DISCONNECTED and not (event & POLLIN):
            why = main.CONNECTION_LOST
        else:
            try:
                if event & POLLIN:
                    why = selectable.doRead()
                    inRead = True
                if not why and event & POLLOUT:
                    why = selectable.doWrite()
                    inRead = False
                if not selectable.fileno() == fd:
                    why = error.ConnectionFdescWentAway('Filedescriptor went away')
                    inRead = False
            except:
                log.deferr()
                why = sys.exc_info()[1]
        if why:
            self._disconnectSelectable(selectable, why, inRead)
posixbase.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _disconnectSelectable(self, selectable, why, isRead, faildict={
        error.ConnectionDone: failure.Failure(error.ConnectionDone()),
        error.ConnectionLost: failure.Failure(error.ConnectionLost())
        }):
        """Utility function for disconnecting a selectable.

        Supports half-close notification, isRead should be boolean indicating
        whether error resulted from doRead().
        """
        self.removeReader(selectable)
        f = faildict.get(why.__class__)
        if f:
            if (isRead and why.__class__ ==  error.ConnectionDone
                and IHalfCloseableDescriptor.providedBy(selectable)):
                selectable.readConnectionLost(f)
            else:
                self.removeWriter(selectable)
                selectable.connectionLost(f)
        else:
            self.removeWriter(selectable)
            selectable.connectionLost(failure.Failure(why))
test_amp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_helloFatalErrorHandling(self):
        """
        Verify that if a known, fatal error type is raised and handled, it will
        be properly relayed to the other end of the connection and translated
        into an exception, no error will be logged, and the connection will be
        terminated.
        """
        L=[]
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)
        HELLO = 'die'
        c.sendHello(HELLO).addErrback(L.append)
        p.flush()
        L.pop().trap(DeathThreat)
        c.sendHello(HELLO).addErrback(L.append)
        p.flush()
        L.pop().trap(error.ConnectionDone)
test_amp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_helloNoErrorHandling(self):
        """
        Verify that if an unknown error type is raised, it will be relayed to
        the other end of the connection and translated into an exception, it
        will be logged, and then the connection will be dropped.
        """
        L=[]
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)
        HELLO = THING_I_DONT_UNDERSTAND
        c.sendHello(HELLO).addErrback(L.append)
        p.flush()
        ure = L.pop()
        ure.trap(amp.UnknownRemoteError)
        c.sendHello(HELLO).addErrback(L.append)
        cl = L.pop()
        cl.trap(error.ConnectionDone)
        # The exception should have been logged.
        self.failUnless(self.flushLoggedErrors(ThingIDontUnderstandError))
test_amp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_lateAnswer(self):
        """
        Verify that a command that does not get answered until after the
        connection terminates will not cause any errors.
        """
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)
        L = []
        HELLO = 'world'
        c.callRemote(WaitForever).addErrback(L.append)
        p.flush()
        self.assertEquals(L, [])
        s.transport.loseConnection()
        p.flush()
        L.pop().trap(error.ConnectionDone)
        # Just make sure that it doesn't error...
        s.waiting.callback({})
        return s.waiting
test_amp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_quitBoxQuits(self):
        """
        Verify that commands with a responseType of QuitBox will in fact
        terminate the connection.
        """
        c, s, p = connectedServerAndClient(
            ServerClass=SimpleSymmetricCommandProtocol,
            ClientClass=SimpleSymmetricCommandProtocol)

        L = []
        HELLO = 'world'
        GOODBYE = 'everyone'
        c.sendHello(HELLO).addCallback(L.append)
        p.flush()
        self.assertEquals(L.pop()['hello'], HELLO)
        c.callRemote(Goodbye).addCallback(L.append)
        p.flush()
        self.assertEquals(L.pop()['goodbye'], GOODBYE)
        c.sendHello(HELLO).addErrback(L.append)
        L.pop().trap(error.ConnectionDone)
test_imap.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testPathelogicalScatteringOfLiterals(self):
        self.server.checker.addUser('testuser', 'password-test')
        transport = StringTransport()
        self.server.makeConnection(transport)

        transport.clear()
        self.server.dataReceived("01 LOGIN {8}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("testuser {13}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("password-test\r\n")
        self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
        self.assertEquals(self.server.state, 'auth')

        self.server.connectionLost(error.ConnectionDone("Connection done."))
e2reactor.py 文件源码 项目:enigma2 作者: BlackHole 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log, faildict=None):
        if not faildict: faildict = {
        error.ConnectionDone: failure.Failure(error.ConnectionDone()),
        error.ConnectionLost: failure.Failure(error.ConnectionLost())
        }
        why = None
        inRead = False
        if event & POLL_DISCONNECTED and not (event & POLLIN):
            why = main.CONNECTION_LOST
        else:
            try:
                if event & POLLIN:
                    why = selectable.doRead()
                    inRead = True
                if not why and event & POLLOUT:
                    why = selectable.doWrite()
                    inRead = False
                if not selectable.fileno() == fd:
                    why = error.ConnectionFdescWentAway('Filedescriptor went away')
                    inRead = False
            except:
                log.deferr()
                why = sys.exc_info()[1]
        if why:
            self._disconnectSelectable(selectable, why, inRead)
test_brokerclient.py 文件源码 项目:afkak 作者: ciena 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_stopTryingWhenConnected(self):
        """
        test_stopTryingWhenConnected
        If a L{KafkaBrokerClient} has C{stopTrying} called while it is
        connected, it does not subsequently attempt to reconnect if the
        connection is later lost.
        """
        class NoConnectConnector(object):
            def stopConnecting(self):
                raise ClientError("Shouldn't be called, "
                                  "we're connected.")  # pragma: no cover

            def connect(self):
                raise ClientError(
                    "Shouldn't be reconnecting.")  # pragma: no cover

        c = KafkaBrokerClient('broker')
        c.protocol = Protocol
        # Let's pretend we've connected:
        c.buildProtocol(None)
        # Now we stop trying, then disconnect:
        c.stopTrying()
        c.clientConnectionLost(NoConnectConnector(), Failure(ConnectionDone()))
        self.assertFalse(c.continueTrying)
test_brokerclient.py 文件源码 项目:afkak 作者: ciena 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_close(self):
        reactor = MemoryReactorClock()
        c = KafkaBrokerClient('test_close', reactor=reactor)
        c._connect()  # Force a connection attempt
        c.connector.factory = c  # MemoryReactor doesn't make this connection.
        c.connector.state = 'connected'  # set the connector to connected state
        dd = c.close()
        self.assertIsInstance(dd, Deferred)
        self.assertNoResult(dd)
        f = Failure(ConnectionDone('test_close'))
        c.clientConnectionLost(c.connector, f)
        self.assertNoResult(dd)
        # Advance the clock so the notify() call fires
        reactor.advance(0.1)
        r = self.successResultOf(dd)
        self.assertIs(r, None)
test_manhole.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_ControlBackslash(self):
        """
        Evaluate cancelling with CTRL-\.
        """
        self._testwrite(b"cancelled line")
        partialLine = self.recvlineClient.expect(b"cancelled line")

        def gotPartialLine(ign):
            self._assertBuffer(
                [b">>> cancelled line"])
            self._testwrite(manhole.CTRL_BACKSLASH)

            d = self.recvlineClient.onDisconnection
            return self.assertFailure(d, error.ConnectionDone)

        def gotClearedLine(ign):
            self._assertBuffer(
                [b""])

        return partialLine.addCallback(gotPartialLine).addCallback(
            gotClearedLine)
test_manhole.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_controlD(self):
        """
        A CTRL+D in the middle of a line doesn't close a connection,
        but at the beginning of a line it does.
        """
        self._testwrite(b"1 + 1")
        yield self.recvlineClient.expect(br"\+ 1")
        self._assertBuffer([b">>> 1 + 1"])

        self._testwrite(manhole.CTRL_D + b" + 1")
        yield self.recvlineClient.expect(br"\+ 1")
        self._assertBuffer([b">>> 1 + 1 + 1"])

        self._testwrite(b"\n")
        yield self.recvlineClient.expect(b"3\n>>> ")

        self._testwrite(manhole.CTRL_D)
        d = self.recvlineClient.onDisconnection
        yield self.assertFailure(d, error.ConnectionDone)
test_endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_connectionClosedBeforeSecure(self):
        """
        If the connection closes at any point before the SSH transport layer
        has finished key exchange (ie, gotten to the point where we may attempt
        to authenticate), the L{Deferred} returned by
        L{SSHCommandClientEndpoint.connect} fires with a L{Failure} wrapping
        the reason for the lost connection.
        """
        endpoint = SSHCommandClientEndpoint.newConnection(
            self.reactor, b"/bin/ls -l", b"dummy user",
            self.hostname, self.port, knownHosts=self.knownHosts,
            ui=FixedResponseUI(False))

        factory = Factory()
        factory.protocol = Protocol
        d = endpoint.connect(factory)

        transport = StringTransport()
        factory = self.reactor.tcpClients[0][2]
        client = factory.buildProtocol(None)
        client.makeConnection(transport)

        client.connectionLost(Failure(ConnectionDone()))
        self.failureResultOf(d).trap(ConnectionDone)
test_session.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_wrapProtocol(self):
        """
        L{wrapProtocol}, when passed a L{Protocol} should return something that
        has write(), writeSequence(), loseConnection() methods which call the
        Protocol's dataReceived() and connectionLost() methods, respectively.
        """
        protocol = MockProtocol()
        protocol.transport = StubTransport()
        protocol.connectionMade()
        wrapped = session.wrapProtocol(protocol)
        wrapped.dataReceived(b'dataReceived')
        self.assertEqual(protocol.transport.buf, b'dataReceived')
        wrapped.write(b'data')
        wrapped.writeSequence([b'1', b'2'])
        wrapped.loseConnection()
        self.assertEqual(protocol.data, b'data12')
        protocol.reason.trap(error.ConnectionDone)
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_unknownContentLength(self):
        """
        If a response does not include a I{Transfer-Encoding} or a
        I{Content-Length}, the end of response body is indicated by the
        connection being closed.
        """
        finished = []
        protocol = HTTPClientParser(
            Request(b'GET', b'/', _boringHeaders, None), finished.append)
        transport = StringTransport()
        protocol.makeConnection(transport)
        protocol.dataReceived(b'HTTP/1.1 200 OK\r\n')

        body = []
        protocol.response._bodyDataReceived = body.append

        protocol.dataReceived(b'\r\n')
        protocol.dataReceived(b'foo')
        protocol.dataReceived(b'bar')
        self.assertEqual(body, [b'foo', b'bar'])
        protocol.connectionLost(ConnectionDone(u"simulated end of connection"))
        self.assertEqual(finished, [b''])
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_failedWriteTo(self):
        """
        If the L{Deferred} returned by L{Request.writeTo} fires with a
        L{Failure}, L{HTTP11ClientProtocol.request} disconnects its transport
        and returns a L{Deferred} which fires with a L{Failure} of
        L{RequestGenerationFailed} wrapping the underlying failure.
        """
        class BrokenRequest:
            persistent = False
            def writeTo(self, transport):
                return fail(ArbitraryException())

        d = self.protocol.request(BrokenRequest())
        def cbFailed(ignored):
            self.assertTrue(self.transport.disconnecting)
            # Simulate what would happen if the protocol had a real transport
            # and make sure no exception is raised.
            self.protocol.connectionLost(
                Failure(ConnectionDone(u"you asked for it")))
        d = assertRequestGenerationFailed(self, d, [ArbitraryException])
        d.addCallback(cbFailed)
        return d
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_parserDataReceivedException(self):
        """
        If the parser L{HTTP11ClientProtocol} delivers bytes to in
        C{dataReceived} raises an exception, the exception is wrapped in a
        L{Failure} and passed to the parser's C{connectionLost} and then the
        L{HTTP11ClientProtocol}'s transport is disconnected.
        """
        requestDeferred = self.protocol.request(Request(b'GET', b'/',
            _boringHeaders, None))
        self.protocol.dataReceived(b'unparseable garbage goes here\r\n')
        d = assertResponseFailed(self, requestDeferred, [ParseError])
        def cbFailed(exc):
            self.assertTrue(self.transport.disconnecting)
            self.assertEqual(
                exc.reasons[0].value.data, b'unparseable garbage goes here')

            # Now do what StringTransport doesn't do but a real transport would
            # have, call connectionLost on the HTTP11ClientProtocol.  Nothing
            # is asserted about this, but it's important for it to not raise an
            # exception.
            self.protocol.connectionLost(Failure(ConnectionDone(u"it is done")))

        d.addCallback(cbFailed)
        return d


问题


面经


文章

微信
公众号

扫码关注公众号