python类Protocol()的实例源码

twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        """Fetch ``url`` with a Twisted ``Agent`` driven from a coroutine.

        The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
        called (presumably it runs the loop until the coroutine invokes
        ``self.stop_loop``).  Returns whatever the coroutine stored as the
        response body, or ``None`` if it never stored anything.
        """
        result = [None]

        @gen.coroutine
        def fetch():
            # Grabbing the whole body through readBody keeps this shorter
            # than the non-coroutine variant, at the price of not streaming
            # the response through a Protocol.
            agent = Agent(self.reactor)
            reply = yield agent.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # Twisted 15.0's readBody emits a buggy DeprecationWarning;
                # silence it while the body is read:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                result[0] = yield readBody(reply)
            self.stop_loop()

        self.io_loop.add_callback(fetch)
        runner()
        return result[0]
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        """Fetch ``url`` with a Twisted ``Agent`` driven from a coroutine.

        The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
        called (presumably it runs the loop until the coroutine invokes
        ``self.stop_loop``).  Returns whatever the coroutine stored as the
        response body, or ``None`` if it never stored anything.
        """
        result = [None]

        @gen.coroutine
        def fetch():
            # Grabbing the whole body through readBody keeps this shorter
            # than the non-coroutine variant, at the price of not streaming
            # the response through a Protocol.
            agent = Agent(self.reactor)
            reply = yield agent.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # Twisted 15.0's readBody emits a buggy DeprecationWarning;
                # silence it while the body is read:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                result[0] = yield readBody(reply)
            self.stop_loop()

        self.io_loop.add_callback(fetch)
        runner()
        return result[0]
test_ssl.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testImmediateDisconnect(self):
        """
        Connect an SSL client using L{ImmediatelyDisconnectingProtocol} to a
        listening SSL server, and stop the server port once the client
        factory's C{connectionDisconnected} L{Deferred} fires.

        @return: the C{connectionDisconnected} L{Deferred}, with the port
            shutdown chained onto it.
        """
        org = "twisted.test.test_ssl"
        self.setupServerAndClient(
            (org, org + ", client"), {},
            (org, org + ", server"), {})

        # Set up a server, connect to it with a client, which should work
        # since our verifiers allow anything, then disconnect.
        serverProtocolFactory = protocol.ServerFactory()
        serverProtocolFactory.protocol = protocol.Protocol
        self.serverPort = serverPort = reactor.listenSSL(
            0, serverProtocolFactory, self.serverCtxFactory)

        clientProtocolFactory = protocol.ClientFactory()
        clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
        clientProtocolFactory.connectionDisconnected = defer.Deferred()
        # The connector returned by connectSSL is never used, so it is not
        # bound to a local name.
        reactor.connectSSL(
            '127.0.0.1', serverPort.getHost().port,
            clientProtocolFactory, self.clientCtxFactory)

        return clientProtocolFactory.connectionDisconnected.addCallback(
            lambda ignoredResult: self.serverPort.stopListening())
ftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def receiveFromConnection(self, commands, protocol):
        """
        Run the given FTP commands and feed the data they produce to the
        given protocol.

        @param commands: list of strings of FTP commands to execute then
            receive the results of (e.g. LIST, RETR)
        @param protocol: A L{Protocol} *instance* e.g. an
            L{FTPFileListProtocol}, or something that can be adapted to one.
            Typically this will be an L{IConsumer} implementation.

        @return: L{Deferred}.
        """
        # Adapt whatever was passed in to IProtocol before wrapping it.
        adapted = interfaces.IProtocol(protocol)
        receiver = ProtocolWrapper(adapted, defer.Deferred())
        return self._openDataConnection(commands, receiver)
ftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def list(self, path, protocol):
        """
        Retrieve a file listing into the given protocol instance.

        Issues the 'LIST' FTP command for C{path}; a C{path} of C{None} is
        treated as the empty string.

        @param path: path to get a file listing for.
        @param protocol: a L{Protocol} instance, probably a
            L{FTPFileListProtocol} instance.  It can cope with most common
            file listing formats.

        @return: L{Deferred}
        """
        target = '' if path is None else path
        command = 'LIST ' + self.escapePath(target)
        return self.receiveFromConnection([command], protocol)
test_ssl.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testImmediateDisconnect(self):
        """
        Connect an SSL client using L{ImmediatelyDisconnectingProtocol} to a
        listening SSL server, and stop the server port once the client
        factory's C{connectionDisconnected} L{Deferred} fires.

        @return: the C{connectionDisconnected} L{Deferred}, with the port
            shutdown chained onto it.
        """
        org = "twisted.test.test_ssl"
        self.setupServerAndClient(
            (org, org + ", client"), {},
            (org, org + ", server"), {})

        # Set up a server, connect to it with a client, which should work
        # since our verifiers allow anything, then disconnect.
        serverProtocolFactory = protocol.ServerFactory()
        serverProtocolFactory.protocol = protocol.Protocol
        self.serverPort = serverPort = reactor.listenSSL(
            0, serverProtocolFactory, self.serverCtxFactory)

        clientProtocolFactory = protocol.ClientFactory()
        clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
        clientProtocolFactory.connectionDisconnected = defer.Deferred()
        # The connector returned by connectSSL is never used, so it is not
        # bound to a local name.
        reactor.connectSSL(
            '127.0.0.1', serverPort.getHost().port,
            clientProtocolFactory, self.clientCtxFactory)

        return clientProtocolFactory.connectionDisconnected.addCallback(
            lambda ignoredResult: self.serverPort.stopListening())
ftp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def receiveFromConnection(self, commands, protocol):
        """
        Run the given FTP commands and feed the data they produce to the
        given protocol.

        @param commands: list of strings of FTP commands to execute then
            receive the results of (e.g. LIST, RETR)
        @param protocol: A L{Protocol} *instance* e.g. an
            L{FTPFileListProtocol}, or something that can be adapted to one.
            Typically this will be an L{IConsumer} implementation.

        @return: L{Deferred}.
        """
        # Adapt whatever was passed in to IProtocol before wrapping it.
        adapted = interfaces.IProtocol(protocol)
        receiver = ProtocolWrapper(adapted, defer.Deferred())
        return self._openDataConnection(commands, receiver)
ftp.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def list(self, path, protocol):
        """
        Retrieve a file listing into the given protocol instance.

        Issues the 'LIST' FTP command for C{path}; a C{path} of C{None} is
        treated as the empty string.

        @param path: path to get a file listing for.
        @param protocol: a L{Protocol} instance, probably a
            L{FTPFileListProtocol} instance.  It can cope with most common
            file listing formats.

        @return: L{Deferred}
        """
        target = '' if path is None else path
        command = 'LIST ' + self.escapePath(target)
        return self.receiveFromConnection([command], protocol)
twisted_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        """Fetch ``url`` with a Twisted ``Agent`` driven from a coroutine.

        The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
        called (presumably it runs the loop until the coroutine invokes
        ``self.stop_loop``).  Returns whatever the coroutine stored as the
        response body, or ``None`` if it never stored anything.
        """
        result = [None]

        @gen.coroutine
        def fetch():
            # Grabbing the whole body through readBody keeps this shorter
            # than the non-coroutine variant, at the price of not streaming
            # the response through a Protocol.
            agent = Agent(self.reactor)
            reply = yield agent.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # Twisted 15.0's readBody emits a buggy DeprecationWarning;
                # silence it while the body is read:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                result[0] = yield readBody(reply)
            self.stop_loop()

        self.io_loop.add_callback(fetch)
        runner()
        return result[0]
twisted_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        """Fetch ``url`` with a Twisted ``Agent`` driven from a coroutine.

        The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
        called (presumably it runs the loop until the coroutine invokes
        ``self.stop_loop``).  Returns whatever the coroutine stored as the
        response body, or ``None`` if it never stored anything.
        """
        result = [None]

        @gen.coroutine
        def fetch():
            # Grabbing the whole body through readBody keeps this shorter
            # than the non-coroutine variant, at the price of not streaming
            # the response through a Protocol.
            agent = Agent(self.reactor)
            reply = yield agent.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # Twisted 15.0's readBody emits a buggy DeprecationWarning;
                # silence it while the body is read:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                result[0] = yield readBody(reply)
            self.stop_loop()

        self.io_loop.add_callback(fetch)
        runner()
        return result[0]
twisted_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        """Fetch ``url`` with a Twisted ``Agent`` driven from a coroutine.

        The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
        called (presumably it runs the loop until the coroutine invokes
        ``self.stop_loop``).  Returns whatever the coroutine stored as the
        response body, or ``None`` if it never stored anything.
        """
        result = [None]

        @gen.coroutine
        def fetch():
            # Grabbing the whole body through readBody keeps this shorter
            # than the non-coroutine variant, at the price of not streaming
            # the response through a Protocol.
            agent = Agent(self.reactor)
            reply = yield agent.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # Twisted 15.0's readBody emits a buggy DeprecationWarning;
                # silence it while the body is read:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                result[0] = yield readBody(reply)
            self.stop_loop()

        self.io_loop.add_callback(fetch)
        runner()
        return result[0]
twisted_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        """Fetch ``url`` with a Twisted ``Agent`` driven from a coroutine.

        The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
        called (presumably it runs the loop until the coroutine invokes
        ``self.stop_loop``).  Returns whatever the coroutine stored as the
        response body, or ``None`` if it never stored anything.
        """
        result = [None]

        @gen.coroutine
        def fetch():
            # Grabbing the whole body through readBody keeps this shorter
            # than the non-coroutine variant, at the price of not streaming
            # the response through a Protocol.
            agent = Agent(self.reactor)
            reply = yield agent.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # Twisted 15.0's readBody emits a buggy DeprecationWarning;
                # silence it while the body is read:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                result[0] = yield readBody(reply)
            self.stop_loop()

        self.io_loop.add_callback(fetch)
        runner()
        return result[0]
forwarding.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _setClient(self, client):
        """
        Called when the connection was established to the forwarding
        destination.

        Stores the client, logs the connection, flushes any data held in
        C{self.clientBuf} to the client's transport, passes on whatever the
        client buffered beyond its first byte via C{self.write}, and finally
        empties the client's buffer.

        @param client: Client protocol connected to the forwarding destination.
        @type  client: L{protocol.Protocol}
        """
        self.client = client
        log.msg("connected to %s:%i" % self.hostport)

        # Flush data that was queued for the client before it connected.
        if self.clientBuf:
            self.client.transport.write(self.clientBuf)
            self.clientBuf = None

        # Everything after the first byte of the client's buffer is relayed;
        # NOTE(review): the reason the first byte is skipped is not visible
        # here -- confirm against the client protocol.
        remainder = self.client.buf[1:]
        if remainder:
            self.write(remainder)
        self.client.buf = b''
test_session.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_wrapProtocol(self):
        """
        L{wrapProtocol}, when passed a L{Protocol}, should return something
        that has write(), writeSequence() and loseConnection() methods which
        call the Protocol's dataReceived() and connectionLost() methods,
        respectively.
        """
        mock = MockProtocol()
        mock.transport = StubTransport()
        mock.connectionMade()
        wrapper = session.wrapProtocol(mock)

        # Data handed to the wrapper is expected in the transport's buffer.
        wrapper.dataReceived(b'dataReceived')
        self.assertEqual(mock.transport.buf, b'dataReceived')

        # Writes on the wrapper are expected to accumulate on the protocol,
        # and closing it should record a ConnectionDone reason.
        wrapper.write(b'data')
        wrapper.writeSequence([b'1', b'2'])
        wrapper.loseConnection()
        self.assertEqual(mock.data, b'data12')
        mock.reason.trap(error.ConnectionDone)
test_session.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_wrapProcessProtocol_Protocol(self):
        """
        L{wrapProcessProtocol}, when passed a L{Protocol}, should return
        something that follows the L{IProcessProtocol} interface, with
        connectionMade() mapping to connectionMade(), outReceived() mapping to
        dataReceived() and processEnded() mapping to connectionLost().
        """
        mock = MockProtocol()
        mock.transport = StubTransport()
        wrapped = session.wrapProcessProtocol(mock)

        wrapped.connectionMade()
        wrapped.outReceived(b'data')
        self.assertEqual(mock.transport.buf, b'data~')

        # Ending the process should surface as the protocol's lost reason.
        exitFailure = failure.Failure(error.ProcessTerminated(0, None, None))
        wrapped.processEnded(exitFailure)
        mock.reason.trap(error.ProcessTerminated)
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_makeConnection(self):
        """
        The L{IProtocol} provider passed to L{Response.deliverBody} has its
        C{makeConnection} method called with an L{IPushProducer} provider
        hooked up to the response as an argument.
        """
        seenProducers = []
        stringTransport = StringTransport()

        class RecordingProtocol(Protocol):
            # Remember every producer handed to makeConnection.
            def makeConnection(self, producer):
                seenProducers.append(producer)

        bodyProtocol = RecordingProtocol()
        response = justTransportResponse(stringTransport)
        response.deliverBody(bodyProtocol)

        # Exactly one producer must have been delivered.
        (theProducer,) = seenProducers
        theProducer.pauseProducing()
        self.assertEqual(stringTransport.producerState, u'paused')
        theProducer.resumeProducing()
        self.assertEqual(stringTransport.producerState, u'producing')
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_dataReceived(self):
        """
        The L{IProtocol} provider passed to L{Response.deliverBody} has its
        C{dataReceived} method called with bytes received as part of the
        response body.
        """
        chunks = []

        class ChunkCollector(Protocol):
            # Record each piece of body data as it arrives.
            def dataReceived(self, data):
                chunks.append(data)

        collector = ChunkCollector()
        response = justTransportResponse(StringTransport())
        response.deliverBody(collector)

        response._bodyDataReceived(b'foo')
        self.assertEqual(chunks, [b'foo'])
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_connectionLost(self):
        """
        The L{IProtocol} provider passed to L{Response.deliverBody} has its
        C{connectionLost} method called with a L{Failure} wrapping
        L{ResponseDone} when the response's C{_bodyDataFinished} method is
        called.
        """
        reasons = []

        class ReasonCollector(Protocol):
            # Record the reason given for every connectionLost call.
            def connectionLost(self, reason):
                reasons.append(reason)

        collector = ReasonCollector()
        response = justTransportResponse(StringTransport())
        response.deliverBody(collector)

        response._bodyDataFinished()
        reasons[0].trap(ResponseDone)
        self.assertEqual(len(reasons), 1)

        # The response is also expected to let go of the protocol, so that
        # it can be garbage collected.
        self.assertIdentical(response._bodyProtocol, None)
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_bufferEarlyData(self):
        """
        If data is delivered to the L{Response} before a protocol is registered
        with C{deliverBody}, that data is buffered until the protocol is
        registered and then is delivered.
        """
        chunks = []

        class ChunkCollector(Protocol):
            # Record each piece of body data as it arrives.
            def dataReceived(self, data):
                chunks.append(data)

        collector = ChunkCollector()
        response = justTransportResponse(StringTransport())

        # Two chunks arrive before any protocol is registered ...
        response._bodyDataReceived(b'foo')
        response._bodyDataReceived(b'bar')
        response.deliverBody(collector)
        # ... and one more afterwards.
        response._bodyDataReceived(b'baz')
        self.assertEqual(chunks, [b'foo', b'bar', b'baz'])

        # The internal byte buffer must be dropped once flushed; keeping it
        # around would waste memory.
        self.assertIdentical(response._bodyBuffer, None)
test_newclient.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_transportResumed(self):
        """
        L{Response.deliverBody} resumes the HTTP connection's transport
        after passing it to the consumer's C{makeConnection} method.
        """
        statesAtConnect = []

        class StateRecorder(Protocol):
            # Capture the producer state seen at makeConnection time.
            def makeConnection(self, transport):
                statesAtConnect.append(transport.producerState)

        stringTransport = StringTransport()
        stringTransport.pauseProducing()
        recorder = StateRecorder()
        response = justTransportResponse(stringTransport)
        self.assertEqual(stringTransport.producerState, u'paused')

        response.deliverBody(recorder)
        # Still paused while makeConnection ran, producing afterwards.
        self.assertEqual(statesAtConnect, [u'paused'])
        self.assertEqual(stringTransport.producerState, u'producing')
test_stdio.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_removeReader(self):
        """
        Removing a filesystem file reader from a reactor will make sure it is
        no longer polled.
        """
        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Create an empty file to serve as stdin.
        path = self.mktemp()
        open(path, "wb").close()

        with open(path, "rb") as inFile:
            # Constructing StandardIO registers the reader with the reactor.
            stdio = StandardIO(
                Protocol(),
                stdin=inFile.fileno(),
                stdout=self.extraFile.fileno(),
                reactor=reactor)
            self.assertIn(stdio._reader, reactor.getReaders())
            stdio._reader.stopReading()
            self.assertNotIn(stdio._reader, reactor.getReaders())
test_stdio.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_removeWriter(self):
        """
        Removing a filesystem file writer from a reactor will make sure it is
        no longer polled.
        """
        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Keep a reference on the test case: cleanup might fail if the file
        # is garbage collected too soon.
        outFile = open(self.mktemp(), "wb")
        self.f = outFile

        # Writing through the transport is what gets the writer added.
        proto = Protocol()
        stdio = StandardIO(
            proto,
            stdout=outFile.fileno(),
            stdin=self.extraFile.fileno(),
            reactor=reactor)
        proto.transport.write(b"hello")
        self.assertIn(stdio._writer, reactor.getWriters())
        stdio._writer.stopWriting()
        self.assertNotIn(stdio._writer, reactor.getWriters())
test_stdio.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_removeAll(self):
        """
        Calling C{removeAll} on a reactor includes descriptors that are
        filesystem files.
        """
        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Create an empty file to serve as stdin.
        path = self.mktemp()
        open(path, "wb").close()

        # Keep a reference on the test case: cleanup might fail if the file
        # is garbage collected too soon.
        inFile = open(path, "rb")
        self.f = inFile

        # Constructing StandardIO registers the reader with the reactor ...
        stdio = StandardIO(
            Protocol(),
            stdin=inFile.fileno(),
            stdout=self.extraFile.fileno(),
            reactor=reactor)
        # ... and removeAll must both report it and unregister it.
        removed = reactor.removeAll()
        self.assertIn(stdio._reader, removed)
        self.assertNotIn(stdio._reader, reactor.getReaders())
test_stdio.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_getReaders(self):
        """
        C{reactor.getReaders} includes descriptors that are filesystem files.
        """
        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Create an empty file to serve as stdin.
        path = self.mktemp()
        open(path, "wb").close()

        # The file stays open for as long as the reader is inspected.
        with open(path, "rb") as inFile:
            # Constructing StandardIO registers the reader with the reactor.
            stdio = StandardIO(
                Protocol(),
                stdin=inFile.fileno(),
                stdout=self.extraFile.fileno(),
                reactor=reactor)
            self.assertIn(stdio._reader, reactor.getReaders())
test_stdio.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_getWriters(self):
        """
        C{reactor.getWriters} includes descriptors that are filesystem files.
        """
        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Keep a reference on the test case: cleanup might fail if the file
        # is garbage collected too soon.
        outFile = open(self.mktemp(), "wb")
        self.f = outFile

        stdio = StandardIO(
            Protocol(),
            stdout=outFile.fileno(),
            stdin=self.extraFile.fileno(),
            reactor=reactor)
        # The writer is only registered once writing has been started.
        self.assertNotIn(stdio._writer, reactor.getWriters())
        stdio._writer.startWriting()
        self.assertIn(stdio._writer, reactor.getWriters())
test_endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Construct a L{StandardIOEndpoint} with a dummy reactor and a fake
        L{stdio.StandardIO} like object.  Listening on it with a
        L{SpecificFactory}.
        """
        self.reactor = object()
        stdioEndpoint = endpoints.StandardIOEndpoint(self.reactor)
        # By default the endpoint must use the real StandardIO.
        self.assertIs(stdioEndpoint._stdio, stdio.StandardIO)

        # Swap in the fake before listening.
        stdioEndpoint._stdio = FakeStdio
        self.specificProtocol = Protocol()

        listening = stdioEndpoint.listen(
            SpecificFactory(self.specificProtocol))
        self.fakeStdio = self.successResultOf(listening)
test_endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_deferBadEncodingToConnect(self):
        """
        Since any client of L{IStreamClientEndpoint} needs to handle Deferred
        failures from C{connect}, L{HostnameEndpoint}'s constructor will not
        raise exceptions when given bad host names, instead deferring to
        returning a failing L{Deferred} from C{connect}.
        """
        # Undecodable bytes host name.
        bytesEndpoint = endpoints.HostnameEndpoint(
            deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
            b'\xff-garbage-\xff', 80
        )
        bytesResult = bytesEndpoint.connect(Factory.forProtocol(Protocol))
        bytesFailure = self.failureResultOf(bytesResult, ValueError)
        self.assertIn("\\xff-garbage-\\xff", str(bytesFailure))

        # Unencodable text host name.
        textEndpoint = endpoints.HostnameEndpoint(
            deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
            u'\u2ff0-garbage-\u2ff0', 80
        )
        textResult = textEndpoint.connect(Factory())
        textFailure = self.failureResultOf(textResult, ValueError)
        self.assertIn("\\u2ff0-garbage-\\u2ff0", str(textFailure))
test_ftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_Year(self):
        """
        This example derived from bug description in issue 514.

        A listing line whose date field carries a year is sent over a
        loopback connection to an L{ftp.FTPFileListProtocol}, and the first
        parsed entry's size, date and filename are checked.

        @return: L{Deferred} of command response
        """
        fileList = ftp.FTPFileListProtocol()
        exampleLine = (
            b'-rw-r--r--   1 root     other        531 Jan 29 2003 README\n')

        class PrintLine(protocol.Protocol):
            # Send the example line as soon as connected, then hang up.
            def connectionMade(self):
                self.transport.write(exampleLine)
                self.transport.loseConnection()

        def check(ignored):
            entry = fileList.files[0]
            # assertEqual (rather than assertTrue on an == expression) lets
            # a failure show the value that was actually parsed.
            self.assertEqual(entry['size'], 531, 'misparsed fileitem')
            self.assertEqual(entry['date'], 'Jan 29 2003',
                             'misparsed fileitem')
            self.assertEqual(entry['filename'], 'README',
                             'misparsed fileitem')

        d = loopback.loopbackAsync(PrintLine(), fileList)
        return d.addCallback(check)
test_ssl.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testImmediateDisconnect(self):
        """
        Connect an SSL client using L{ImmediatelyDisconnectingProtocol} to a
        listening SSL server, and stop the server port once the client
        factory's C{connectionDisconnected} L{Deferred} fires.

        @return: the C{connectionDisconnected} L{Deferred}, with the port
            shutdown chained onto it.
        """
        org = "twisted.test.test_ssl"
        self.setupServerAndClient(
            (org, org + ", client"), {},
            (org, org + ", server"), {})

        # Server side: a plain Protocol behind an SSL listening port.  The
        # verifiers accept anything, so the client connection should succeed.
        serverFactory = protocol.ServerFactory()
        serverFactory.protocol = protocol.Protocol
        serverPort = reactor.listenSSL(
            0, serverFactory, self.serverCtxFactory)
        self.serverPort = serverPort

        # Client side: the factory carries the Deferred this test waits on.
        clientFactory = protocol.ClientFactory()
        clientFactory.protocol = ImmediatelyDisconnectingProtocol
        clientFactory.connectionDisconnected = defer.Deferred()
        reactor.connectSSL(
            '127.0.0.1', serverPort.getHost().port,
            clientFactory, self.clientCtxFactory)

        def stopServer(ignoredResult):
            return self.serverPort.stopListening()

        return clientFactory.connectionDisconnected.addCallback(stopServer)
test_policies.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def testConnectionCounting(self):
        """
        C{connectionCount} on a L{policies.LimitTotalConnectionsFactory}
        goes up by one for each protocol built and down by one for each
        built protocol whose connection is lost.
        """
        limiter = policies.LimitTotalConnectionsFactory()
        limiter.protocol = protocol.Protocol

        # Nothing has been built yet.
        self.assertEqual(0, limiter.connectionCount)

        # Every buildProtocol call bumps the count ...
        first = limiter.buildProtocol(None)
        self.assertEqual(1, limiter.connectionCount)
        second = limiter.buildProtocol(None)
        self.assertEqual(2, limiter.connectionCount)

        # ... and every lost connection brings it back down.
        first.connectionLost(None)
        self.assertEqual(1, limiter.connectionCount)
        second.connectionLost(None)
        self.assertEqual(0, limiter.connectionCount)


问题


面经


文章

微信
公众号

扫码关注公众号