python类Protocol()的实例源码

ftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def receiveFromConnection(self, commands, protocol):
        """
        Retrieves a file or listing generated by the given command,
        feeding it to the given protocol.

        @param commands: list of strings of FTP commands to execute then receive
            the results of (e.g. C{LIST}, C{RETR})
        @param protocol: A L{Protocol} B{instance} e.g. an
            L{FTPFileListProtocol}, or something that can be adapted to one.
            Typically this will be an L{IConsumer} implementation.

        @return: L{Deferred}.
        """
        protocol = interfaces.IProtocol(protocol)
        wrapper = ProtocolWrapper(protocol, defer.Deferred())
        return self._openDataConnection(commands, wrapper)
ftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def retrieveFile(self, path, protocol, offset=0):
        """
        Retrieve a file from the given path

        This method issues the 'RETR' FTP command.

        The file is fed into the given Protocol instance.  The data connection
        will be passive if self.passive is set.

        @param path: path to file that you wish to receive.
        @param protocol: a L{Protocol} instance.
        @param offset: offset to start downloading from

        @return: L{Deferred}
        """
        cmds = ['RETR ' + self.escapePath(path)]
        if offset:
            cmds.insert(0, ('REST ' + str(offset)))
        return self.receiveFromConnection(cmds, protocol)
ftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def list(self, path, protocol):
        """
        Retrieve a file listing into the given protocol instance.

        This method issues the 'LIST' FTP command.

        @param path: path to get a file listing for.
        @param protocol: a L{Protocol} instance, probably a
            L{FTPFileListProtocol} instance.  It can cope with most common file
            listing formats.

        @return: L{Deferred}
        """
        if path is None:
            path = ''
        return self.receiveFromConnection(['LIST ' + self.escapePath(path)], protocol)
test_tls.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_wrappedProtocolInterfaces(self):
        """
        L{TLSMemoryBIOProtocol} instances provide the interfaces provided by
        the transport they wrap.
        """
        class ITransport(Interface):
            pass

        class MyTransport(object):
            def write(self, data):
                pass

        clientFactory = ClientFactory()
        contextFactory = ClientTLSContext()
        wrapperFactory = TLSMemoryBIOFactory(
            contextFactory, True, clientFactory)

        transport = MyTransport()
        directlyProvides(transport, ITransport)
        tlsProtocol = TLSMemoryBIOProtocol(wrapperFactory, Protocol())
        tlsProtocol.makeConnection(transport)
        self.assertTrue(ITransport.providedBy(tlsProtocol))
test_tls.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_getHandle(self):
        """
        L{TLSMemoryBIOProtocol.getHandle} returns the L{OpenSSL.SSL.Connection}
        instance it uses to actually implement TLS.

        This may seem odd.  In fact, it is.  The L{OpenSSL.SSL.Connection} is
        not actually the "system handle" here, nor even an object the reactor
        knows about directly.  However, L{twisted.internet.ssl.Certificate}'s
        C{peerFromTransport} and C{hostFromTransport} methods depend on being
        able to get an L{OpenSSL.SSL.Connection} object in order to work
        properly.  Implementing L{ISystemHandle.getHandle} like this is the
        easiest way for those APIs to be made to work.  If they are changed,
        then it may make sense to get rid of this implementation of
        L{ISystemHandle} and return the underlying socket instead.
        """
        factory = ClientFactory()
        contextFactory = ClientTLSContext()
        wrapperFactory = TLSMemoryBIOFactory(contextFactory, True, factory)
        proto = TLSMemoryBIOProtocol(wrapperFactory, Protocol())
        transport = StringTransport()
        proto.makeConnection(transport)
        self.assertIsInstance(proto.getHandle(), ConnectionType)
test_tls.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_makeConnection(self):
        """
        When L{TLSMemoryBIOProtocol} is connected to a transport, it connects
        the protocol it wraps to a transport.
        """
        clientProtocol = Protocol()
        clientFactory = ClientFactory()
        clientFactory.protocol = lambda: clientProtocol

        contextFactory = ClientTLSContext()
        wrapperFactory = TLSMemoryBIOFactory(
            contextFactory, True, clientFactory)
        sslProtocol = wrapperFactory.buildProtocol(None)

        transport = StringTransport()
        sslProtocol.makeConnection(transport)

        self.assertIsNotNone(clientProtocol.transport)
        self.assertIsNot(clientProtocol.transport, transport)
        self.assertIs(clientProtocol.transport, sslProtocol)
test_tls.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def handshakeProtocols(self):
        """
        Start handshake between TLS client and server.
        """
        clientFactory = ClientFactory()
        clientFactory.protocol = Protocol

        clientContextFactory, handshakeDeferred = (
            HandshakeCallbackContextFactory.factoryAndDeferred())
        wrapperFactory = TLSMemoryBIOFactory(
            clientContextFactory, True, clientFactory)
        sslClientProtocol = wrapperFactory.buildProtocol(None)

        serverFactory = ServerFactory()
        serverFactory.protocol = Protocol

        serverContextFactory = ServerTLSContext()
        wrapperFactory = TLSMemoryBIOFactory(
            serverContextFactory, False, serverFactory)
        sslServerProtocol = wrapperFactory.buildProtocol(None)

        connectionDeferred = loopbackAsync(sslServerProtocol, sslClientProtocol)
        return (sslClientProtocol, sslServerProtocol, handshakeDeferred,
                connectionDeferred)
test_tls.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_writeUnicodeRaisesTypeError(self):
        """
        Writing C{unicode} to L{TLSMemoryBIOProtocol} throws a C{TypeError}.
        """
        notBytes = u"hello"
        result = []
        class SimpleSendingProtocol(Protocol):
            def connectionMade(self):
                try:
                    self.transport.write(notBytes)
                except TypeError:
                    result.append(True)
                self.transport.write(b"bytes")
                self.transport.loseConnection()
        d = self.writeBeforeHandshakeTest(SimpleSendingProtocol, b"bytes")
        return d.addCallback(lambda ign: self.assertEqual(result, [True]))
twisted_test.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        body = [None]

        @gen.coroutine
        def f():
            # This is simpler than the non-coroutine version, but it cheats
            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
            self.stop_loop()
        self.io_loop.add_callback(f)
        runner()
        return body[0]
sse_protocol.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _abortConnection(self):
        """
        We need a way to close the connection when an event line is too long
        or if we time out waiting for an event. This is normally done by
        calling :meth:`~twisted.internet.interfaces.ITransport.loseConnection``
        or :meth:`~twisted.internet.interfaces.ITCPTransport.abortConnection`,
        but newer versions of Twisted make this complicated.

        Despite what the documentation says for
        :class:`twisted.internet.protocol.Protocol`, the ``transport``
        attribute is not necessarily a
        :class:`twisted.internet.interfaces.ITransport`. Looking at the
        documentation for :class:`twisted.internet.interfaces.IProtocol`, the
        ``transport`` attribute is actually not defined and neither is the
        type of the ``transport`` parameter to
        :meth:`~twisted.internet.interfaces.IProtocol.makeConnection`.

        ``SseProtocol`` will most often be used with HTTP requests initiated
        with :class:`twisted.web.client.Agent` which, in newer versions of
        Twisted, ends up giving us a
        :class:`twisted.web._newclient.TransportProxyProducer` for our
        ``transport``. This is just a
        :class:`twisted.internet.interfaces.IPushProducer` that wraps the
        actual transport. If our transport is one of these, try call
        ``abortConnection()`` on the underlying transport.
        """
        transport = self.transport
        if isinstance(transport, TransportProxyProducer):
            transport = transport._producer

        if hasattr(transport, 'abortConnection'):
            transport.abortConnection()
        else:
            self.log.error(
                'Transport {} has no abortConnection method'.format(transport))
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
vnc.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def do_ProtocolVersion(self):
        # Remove the servers version and return our own
        print('Protocol version recieved "%s"' % (''.join(self.read_buffer[:11].tostring()), ))
        self.read_buffer = self.read_buffer[12:]
        self.transport.write('RFB 003.003\n')
        self.state = 'Authentication'
basesupport.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, account, chatui, logonDeferred):
        for base in self.__class__.__bases__:
            if issubclass(base, Protocol):
                self.__class__._protoBase = base
                break
        else:
            pass
        self.account = account
        self.chat = chatui
        self._logonDeferred = logonDeferred
xmlstream.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _initializeStream(self):
        """ Sets up XML Parser. """
        self.stream = domish.elementStream()
        self.stream.DocumentStartEvent = self.onDocumentStart
        self.stream.ElementEvent = self.onElement
        self.stream.DocumentEndEvent = self.onDocumentEnd

    ### --------------------------------------------------------------
    ###
    ### Protocol events
    ###
    ### --------------------------------------------------------------
irc.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def set_overwrite(self, boolean):
        """May I overwrite existing files?
        """
        self.overwrite = boolean


    # Protocol-level methods.
session.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def wrapProcessProtocol(inst):
    if isinstance(inst, protocol.Protocol):
        return _ProtocolWrapper(inst)
    else:
        return inst
test_tcp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def makeConnection(self, transport):
        """
        Save the platform-specific socket handle for future
        introspection.
        """
        self.handle = transport.getHandle()
        return protocol.Protocol.makeConnection(self, transport)


问题


面经


文章

微信
公众号

扫码关注公众号