def receiveFromConnection(self, commands, protocol):
"""
Retrieves a file or listing generated by the given command,
feeding it to the given protocol.
@param commands: list of strings of FTP commands to execute then receive
the results of (e.g. C{LIST}, C{RETR})
@param protocol: A L{Protocol} B{instance} e.g. an
L{FTPFileListProtocol}, or something that can be adapted to one.
Typically this will be an L{IConsumer} implementation.
@return: L{Deferred}.
"""
protocol = interfaces.IProtocol(protocol)
wrapper = ProtocolWrapper(protocol, defer.Deferred())
return self._openDataConnection(commands, wrapper)
python类Protocol()的实例源码
def retrieveFile(self, path, protocol, offset=0):
"""
Retrieve a file from the given path
This method issues the 'RETR' FTP command.
The file is fed into the given Protocol instance. The data connection
will be passive if self.passive is set.
@param path: path to file that you wish to receive.
@param protocol: a L{Protocol} instance.
@param offset: offset to start downloading from
@return: L{Deferred}
"""
cmds = ['RETR ' + self.escapePath(path)]
if offset:
cmds.insert(0, ('REST ' + str(offset)))
return self.receiveFromConnection(cmds, protocol)
def list(self, path, protocol):
"""
Retrieve a file listing into the given protocol instance.
This method issues the 'LIST' FTP command.
@param path: path to get a file listing for.
@param protocol: a L{Protocol} instance, probably a
L{FTPFileListProtocol} instance. It can cope with most common file
listing formats.
@return: L{Deferred}
"""
if path is None:
path = ''
return self.receiveFromConnection(['LIST ' + self.escapePath(path)], protocol)
def test_wrappedProtocolInterfaces(self):
"""
L{TLSMemoryBIOProtocol} instances provide the interfaces provided by
the transport they wrap.
"""
class ITransport(Interface):
pass
class MyTransport(object):
def write(self, data):
pass
clientFactory = ClientFactory()
contextFactory = ClientTLSContext()
wrapperFactory = TLSMemoryBIOFactory(
contextFactory, True, clientFactory)
transport = MyTransport()
directlyProvides(transport, ITransport)
tlsProtocol = TLSMemoryBIOProtocol(wrapperFactory, Protocol())
tlsProtocol.makeConnection(transport)
self.assertTrue(ITransport.providedBy(tlsProtocol))
def test_getHandle(self):
"""
L{TLSMemoryBIOProtocol.getHandle} returns the L{OpenSSL.SSL.Connection}
instance it uses to actually implement TLS.
This may seem odd. In fact, it is. The L{OpenSSL.SSL.Connection} is
not actually the "system handle" here, nor even an object the reactor
knows about directly. However, L{twisted.internet.ssl.Certificate}'s
C{peerFromTransport} and C{hostFromTransport} methods depend on being
able to get an L{OpenSSL.SSL.Connection} object in order to work
properly. Implementing L{ISystemHandle.getHandle} like this is the
easiest way for those APIs to be made to work. If they are changed,
then it may make sense to get rid of this implementation of
L{ISystemHandle} and return the underlying socket instead.
"""
factory = ClientFactory()
contextFactory = ClientTLSContext()
wrapperFactory = TLSMemoryBIOFactory(contextFactory, True, factory)
proto = TLSMemoryBIOProtocol(wrapperFactory, Protocol())
transport = StringTransport()
proto.makeConnection(transport)
self.assertIsInstance(proto.getHandle(), ConnectionType)
def test_makeConnection(self):
"""
When L{TLSMemoryBIOProtocol} is connected to a transport, it connects
the protocol it wraps to a transport.
"""
clientProtocol = Protocol()
clientFactory = ClientFactory()
clientFactory.protocol = lambda: clientProtocol
contextFactory = ClientTLSContext()
wrapperFactory = TLSMemoryBIOFactory(
contextFactory, True, clientFactory)
sslProtocol = wrapperFactory.buildProtocol(None)
transport = StringTransport()
sslProtocol.makeConnection(transport)
self.assertIsNotNone(clientProtocol.transport)
self.assertIsNot(clientProtocol.transport, transport)
self.assertIs(clientProtocol.transport, sslProtocol)
def handshakeProtocols(self):
"""
Start handshake between TLS client and server.
"""
clientFactory = ClientFactory()
clientFactory.protocol = Protocol
clientContextFactory, handshakeDeferred = (
HandshakeCallbackContextFactory.factoryAndDeferred())
wrapperFactory = TLSMemoryBIOFactory(
clientContextFactory, True, clientFactory)
sslClientProtocol = wrapperFactory.buildProtocol(None)
serverFactory = ServerFactory()
serverFactory.protocol = Protocol
serverContextFactory = ServerTLSContext()
wrapperFactory = TLSMemoryBIOFactory(
serverContextFactory, False, serverFactory)
sslServerProtocol = wrapperFactory.buildProtocol(None)
connectionDeferred = loopbackAsync(sslServerProtocol, sslClientProtocol)
return (sslClientProtocol, sslServerProtocol, handshakeDeferred,
connectionDeferred)
def test_writeUnicodeRaisesTypeError(self):
"""
Writing C{unicode} to L{TLSMemoryBIOProtocol} throws a C{TypeError}.
"""
notBytes = u"hello"
result = []
class SimpleSendingProtocol(Protocol):
def connectionMade(self):
try:
self.transport.write(notBytes)
except TypeError:
result.append(True)
self.transport.write(b"bytes")
self.transport.loseConnection()
d = self.writeBeforeHandshakeTest(SimpleSendingProtocol, b"bytes")
return d.addCallback(lambda ign: self.assertEqual(result, [True]))
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def _abortConnection(self):
"""
We need a way to close the connection when an event line is too long
or if we time out waiting for an event. This is normally done by
calling :meth:`~twisted.internet.interfaces.ITransport.loseConnection``
or :meth:`~twisted.internet.interfaces.ITCPTransport.abortConnection`,
but newer versions of Twisted make this complicated.
Despite what the documentation says for
:class:`twisted.internet.protocol.Protocol`, the ``transport``
attribute is not necessarily a
:class:`twisted.internet.interfaces.ITransport`. Looking at the
documentation for :class:`twisted.internet.interfaces.IProtocol`, the
``transport`` attribute is actually not defined and neither is the
type of the ``transport`` parameter to
:meth:`~twisted.internet.interfaces.IProtocol.makeConnection`.
``SseProtocol`` will most often be used with HTTP requests initiated
with :class:`twisted.web.client.Agent` which, in newer versions of
Twisted, ends up giving us a
:class:`twisted.web._newclient.TransportProxyProducer` for our
``transport``. This is just a
:class:`twisted.internet.interfaces.IPushProducer` that wraps the
actual transport. If our transport is one of these, try call
``abortConnection()`` on the underlying transport.
"""
transport = self.transport
if isinstance(transport, TransportProxyProducer):
transport = transport._producer
if hasattr(transport, 'abortConnection'):
transport.abortConnection()
else:
self.log.error(
'Transport {} has no abortConnection method'.format(transport))
def twisted_fetch(self, url, runner):
# http://twistedmatrix.com/documents/current/web/howto/client.html
chunks = []
client = Agent(self.reactor)
d = client.request(b'GET', utf8(url))
class Accumulator(Protocol):
def __init__(self, finished):
self.finished = finished
def dataReceived(self, data):
chunks.append(data)
def connectionLost(self, reason):
self.finished.callback(None)
def callback(response):
finished = Deferred()
response.deliverBody(Accumulator(finished))
return finished
d.addCallback(callback)
def shutdown(failure):
if hasattr(self, 'stop_loop'):
self.stop_loop()
elif failure is not None:
# loop hasn't been initialized yet; try our best to
# get an error message out. (the runner() interaction
# should probably be refactored).
try:
failure.raiseException()
except:
logging.error('exception before starting loop', exc_info=True)
d.addBoth(shutdown)
runner()
self.assertTrue(chunks)
return ''.join(chunks)
def twisted_fetch(self, url, runner):
# http://twistedmatrix.com/documents/current/web/howto/client.html
chunks = []
client = Agent(self.reactor)
d = client.request(b'GET', utf8(url))
class Accumulator(Protocol):
def __init__(self, finished):
self.finished = finished
def dataReceived(self, data):
chunks.append(data)
def connectionLost(self, reason):
self.finished.callback(None)
def callback(response):
finished = Deferred()
response.deliverBody(Accumulator(finished))
return finished
d.addCallback(callback)
def shutdown(failure):
if hasattr(self, 'stop_loop'):
self.stop_loop()
elif failure is not None:
# loop hasn't been initialized yet; try our best to
# get an error message out. (the runner() interaction
# should probably be refactored).
try:
failure.raiseException()
except:
logging.error('exception before starting loop', exc_info=True)
d.addBoth(shutdown)
runner()
self.assertTrue(chunks)
return ''.join(chunks)
def twisted_fetch(self, url, runner):
# http://twistedmatrix.com/documents/current/web/howto/client.html
chunks = []
client = Agent(self.reactor)
d = client.request(b'GET', utf8(url))
class Accumulator(Protocol):
def __init__(self, finished):
self.finished = finished
def dataReceived(self, data):
chunks.append(data)
def connectionLost(self, reason):
self.finished.callback(None)
def callback(response):
finished = Deferred()
response.deliverBody(Accumulator(finished))
return finished
d.addCallback(callback)
def shutdown(failure):
if hasattr(self, 'stop_loop'):
self.stop_loop()
elif failure is not None:
# loop hasn't been initialized yet; try our best to
# get an error message out. (the runner() interaction
# should probably be refactored).
try:
failure.raiseException()
except:
logging.error('exception before starting loop', exc_info=True)
d.addBoth(shutdown)
runner()
self.assertTrue(chunks)
return ''.join(chunks)
def do_ProtocolVersion(self):
# Remove the servers version and return our own
print('Protocol version recieved "%s"' % (''.join(self.read_buffer[:11].tostring()), ))
self.read_buffer = self.read_buffer[12:]
self.transport.write('RFB 003.003\n')
self.state = 'Authentication'
def __init__(self, account, chatui, logonDeferred):
for base in self.__class__.__bases__:
if issubclass(base, Protocol):
self.__class__._protoBase = base
break
else:
pass
self.account = account
self.chat = chatui
self._logonDeferred = logonDeferred
def _initializeStream(self):
""" Sets up XML Parser. """
self.stream = domish.elementStream()
self.stream.DocumentStartEvent = self.onDocumentStart
self.stream.ElementEvent = self.onElement
self.stream.DocumentEndEvent = self.onDocumentEnd
### --------------------------------------------------------------
###
### Protocol events
###
### --------------------------------------------------------------
def set_overwrite(self, boolean):
"""May I overwrite existing files?
"""
self.overwrite = boolean
# Protocol-level methods.
def wrapProcessProtocol(inst):
if isinstance(inst, protocol.Protocol):
return _ProtocolWrapper(inst)
else:
return inst
def makeConnection(self, transport):
"""
Save the platform-specific socket handle for future
introspection.
"""
self.handle = transport.getHandle()
return protocol.Protocol.makeConnection(self, transport)