def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
python类Protocol()的实例源码
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def testImmediateDisconnect(self):
org = "twisted.test.test_ssl"
self.setupServerAndClient(
(org, org + ", client"), {},
(org, org + ", server"), {})
# Set up a server, connect to it with a client, which should work since our verifiers
# allow anything, then disconnect.
serverProtocolFactory = protocol.ServerFactory()
serverProtocolFactory.protocol = protocol.Protocol
self.serverPort = serverPort = reactor.listenSSL(0,
serverProtocolFactory, self.serverCtxFactory)
clientProtocolFactory = protocol.ClientFactory()
clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
clientProtocolFactory.connectionDisconnected = defer.Deferred()
clientConnector = reactor.connectSSL('127.0.0.1',
serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
return clientProtocolFactory.connectionDisconnected.addCallback(
lambda ignoredResult: self.serverPort.stopListening())
def receiveFromConnection(self, commands, protocol):
"""
Retrieves a file or listing generated by the given command,
feeding it to the given protocol.
@param command: list of strings of FTP commands to execute then receive
the results of (e.g. LIST, RETR)
@param protocol: A L{Protocol} *instance* e.g. an
L{FTPFileListProtocol}, or something that can be adapted to one.
Typically this will be an L{IConsumer} implemenation.
@return: L{Deferred}.
"""
protocol = interfaces.IProtocol(protocol)
wrapper = ProtocolWrapper(protocol, defer.Deferred())
return self._openDataConnection(commands, wrapper)
def list(self, path, protocol):
"""
Retrieve a file listing into the given protocol instance.
This method issues the 'LIST' FTP command.
@param path: path to get a file listing for.
@param protocol: a L{Protocol} instance, probably a
L{FTPFileListProtocol} instance. It can cope with most common file
listing formats.
@return: L{Deferred}
"""
if path is None:
path = ''
return self.receiveFromConnection(['LIST ' + self.escapePath(path)], protocol)
def testImmediateDisconnect(self):
org = "twisted.test.test_ssl"
self.setupServerAndClient(
(org, org + ", client"), {},
(org, org + ", server"), {})
# Set up a server, connect to it with a client, which should work since our verifiers
# allow anything, then disconnect.
serverProtocolFactory = protocol.ServerFactory()
serverProtocolFactory.protocol = protocol.Protocol
self.serverPort = serverPort = reactor.listenSSL(0,
serverProtocolFactory, self.serverCtxFactory)
clientProtocolFactory = protocol.ClientFactory()
clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
clientProtocolFactory.connectionDisconnected = defer.Deferred()
clientConnector = reactor.connectSSL('127.0.0.1',
serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
return clientProtocolFactory.connectionDisconnected.addCallback(
lambda ignoredResult: self.serverPort.stopListening())
def receiveFromConnection(self, commands, protocol):
"""
Retrieves a file or listing generated by the given command,
feeding it to the given protocol.
@param command: list of strings of FTP commands to execute then receive
the results of (e.g. LIST, RETR)
@param protocol: A L{Protocol} *instance* e.g. an
L{FTPFileListProtocol}, or something that can be adapted to one.
Typically this will be an L{IConsumer} implemenation.
@return: L{Deferred}.
"""
protocol = interfaces.IProtocol(protocol)
wrapper = ProtocolWrapper(protocol, defer.Deferred())
return self._openDataConnection(commands, wrapper)
def list(self, path, protocol):
"""
Retrieve a file listing into the given protocol instance.
This method issues the 'LIST' FTP command.
@param path: path to get a file listing for.
@param protocol: a L{Protocol} instance, probably a
L{FTPFileListProtocol} instance. It can cope with most common file
listing formats.
@return: L{Deferred}
"""
if path is None:
path = ''
return self.receiveFromConnection(['LIST ' + self.escapePath(path)], protocol)
twisted_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def _setClient(self, client):
"""
Called when the connection was established to the forwarding
destination.
@param client: Client protocol connected to the forwarding destination.
@type client: L{protocol.Protocol}
"""
self.client = client
log.msg("connected to %s:%i" % self.hostport)
if self.clientBuf:
self.client.transport.write(self.clientBuf)
self.clientBuf = None
if self.client.buf[1:]:
self.write(self.client.buf[1:])
self.client.buf = b''
def test_wrapProtocol(self):
"""
L{wrapProtocol}, when passed a L{Protocol} should return something that
has write(), writeSequence(), loseConnection() methods which call the
Protocol's dataReceived() and connectionLost() methods, respectively.
"""
protocol = MockProtocol()
protocol.transport = StubTransport()
protocol.connectionMade()
wrapped = session.wrapProtocol(protocol)
wrapped.dataReceived(b'dataReceived')
self.assertEqual(protocol.transport.buf, b'dataReceived')
wrapped.write(b'data')
wrapped.writeSequence([b'1', b'2'])
wrapped.loseConnection()
self.assertEqual(protocol.data, b'data12')
protocol.reason.trap(error.ConnectionDone)
def test_wrapProcessProtocol_Protocol(self):
"""
L{wrapPRocessProtocol}, when passed a L{Protocol} should return
something that follows the L{IProcessProtocol} interface, with
connectionMade() mapping to connectionMade(), outReceived() mapping to
dataReceived() and processEnded() mapping to connectionLost().
"""
protocol = MockProtocol()
protocol.transport = StubTransport()
process_protocol = session.wrapProcessProtocol(protocol)
process_protocol.connectionMade()
process_protocol.outReceived(b'data')
self.assertEqual(protocol.transport.buf, b'data~')
process_protocol.processEnded(failure.Failure(
error.ProcessTerminated(0, None, None)))
protocol.reason.trap(error.ProcessTerminated)
def test_makeConnection(self):
"""
The L{IProtocol} provider passed to L{Response.deliverBody} has its
C{makeConnection} method called with an L{IPushProducer} provider
hooked up to the response as an argument.
"""
producers = []
transport = StringTransport()
class SomeProtocol(Protocol):
def makeConnection(self, producer):
producers.append(producer)
consumer = SomeProtocol()
response = justTransportResponse(transport)
response.deliverBody(consumer)
[theProducer] = producers
theProducer.pauseProducing()
self.assertEqual(transport.producerState, u'paused')
theProducer.resumeProducing()
self.assertEqual(transport.producerState, u'producing')
def test_dataReceived(self):
"""
The L{IProtocol} provider passed to L{Response.deliverBody} has its
C{dataReceived} method called with bytes received as part of the
response body.
"""
bytes = []
class ListConsumer(Protocol):
def dataReceived(self, data):
bytes.append(data)
consumer = ListConsumer()
response = justTransportResponse(StringTransport())
response.deliverBody(consumer)
response._bodyDataReceived(b'foo')
self.assertEqual(bytes, [b'foo'])
def test_connectionLost(self):
"""
The L{IProtocol} provider passed to L{Response.deliverBody} has its
C{connectionLost} method called with a L{Failure} wrapping
L{ResponseDone} when the response's C{_bodyDataFinished} method is
called.
"""
lost = []
class ListConsumer(Protocol):
def connectionLost(self, reason):
lost.append(reason)
consumer = ListConsumer()
response = justTransportResponse(StringTransport())
response.deliverBody(consumer)
response._bodyDataFinished()
lost[0].trap(ResponseDone)
self.assertEqual(len(lost), 1)
# The protocol reference should be dropped, too, to facilitate GC or
# whatever.
self.assertIdentical(response._bodyProtocol, None)
def test_bufferEarlyData(self):
"""
If data is delivered to the L{Response} before a protocol is registered
with C{deliverBody}, that data is buffered until the protocol is
registered and then is delivered.
"""
bytes = []
class ListConsumer(Protocol):
def dataReceived(self, data):
bytes.append(data)
protocol = ListConsumer()
response = justTransportResponse(StringTransport())
response._bodyDataReceived(b'foo')
response._bodyDataReceived(b'bar')
response.deliverBody(protocol)
response._bodyDataReceived(b'baz')
self.assertEqual(bytes, [b'foo', b'bar', b'baz'])
# Make sure the implementation-detail-byte-buffer is cleared because
# not clearing it wastes memory.
self.assertIdentical(response._bodyBuffer, None)
def test_transportResumed(self):
"""
L{Response.deliverBody} resumes the HTTP connection's transport
after passing it to the consumer's C{makeConnection} method.
"""
transportState = []
class ListConsumer(Protocol):
def makeConnection(self, transport):
transportState.append(transport.producerState)
transport = StringTransport()
transport.pauseProducing()
protocol = ListConsumer()
response = justTransportResponse(transport)
self.assertEqual(transport.producerState, u'paused')
response.deliverBody(protocol)
self.assertEqual(transportState, [u'paused'])
self.assertEqual(transport.producerState, u'producing')
def test_removeReader(self):
"""
Removing a filesystem file reader from a reactor will make sure it is
no longer polled.
"""
reactor = self.buildReactor()
self.addCleanup(self.unbuildReactor, reactor)
path = self.mktemp()
open(path, "wb").close()
with open(path, "rb") as f:
# Have the reader added:
stdio = StandardIO(Protocol(), stdin=f.fileno(),
stdout=self.extraFile.fileno(),
reactor=reactor)
self.assertIn(stdio._reader, reactor.getReaders())
stdio._reader.stopReading()
self.assertNotIn(stdio._reader, reactor.getReaders())
def test_removeWriter(self):
"""
Removing a filesystem file writer from a reactor will make sure it is
no longer polled.
"""
reactor = self.buildReactor()
self.addCleanup(self.unbuildReactor, reactor)
# Cleanup might fail if file is GCed too soon:
self.f = f = open(self.mktemp(), "wb")
# Have the reader added:
protocol = Protocol()
stdio = StandardIO(protocol, stdout=f.fileno(),
stdin=self.extraFile.fileno(),
reactor=reactor)
protocol.transport.write(b"hello")
self.assertIn(stdio._writer, reactor.getWriters())
stdio._writer.stopWriting()
self.assertNotIn(stdio._writer, reactor.getWriters())
def test_removeAll(self):
"""
Calling C{removeAll} on a reactor includes descriptors that are
filesystem files.
"""
reactor = self.buildReactor()
self.addCleanup(self.unbuildReactor, reactor)
path = self.mktemp()
open(path, "wb").close()
# Cleanup might fail if file is GCed too soon:
self.f = f = open(path, "rb")
# Have the reader added:
stdio = StandardIO(Protocol(), stdin=f.fileno(),
stdout=self.extraFile.fileno(), reactor=reactor)
# And then removed:
removed = reactor.removeAll()
self.assertIn(stdio._reader, removed)
self.assertNotIn(stdio._reader, reactor.getReaders())
def test_getReaders(self):
"""
C{reactor.getReaders} includes descriptors that are filesystem files.
"""
reactor = self.buildReactor()
self.addCleanup(self.unbuildReactor, reactor)
path = self.mktemp()
open(path, "wb").close()
# Cleanup might fail if file is GCed too soon:
with open(path, "rb") as f:
# Have the reader added:
stdio = StandardIO(Protocol(), stdin=f.fileno(),
stdout=self.extraFile.fileno(), reactor=reactor)
self.assertIn(stdio._reader, reactor.getReaders())
def test_getWriters(self):
"""
C{reactor.getWriters} includes descriptors that are filesystem files.
"""
reactor = self.buildReactor()
self.addCleanup(self.unbuildReactor, reactor)
# Cleanup might fail if file is GCed too soon:
self.f = f = open(self.mktemp(), "wb")
# Have the reader added:
stdio = StandardIO(Protocol(), stdout=f.fileno(),
stdin=self.extraFile.fileno(), reactor=reactor)
self.assertNotIn(stdio._writer, reactor.getWriters())
stdio._writer.startWriting()
self.assertIn(stdio._writer, reactor.getWriters())
def setUp(self):
"""
Construct a L{StandardIOEndpoint} with a dummy reactor and a fake
L{stdio.StandardIO} like object. Listening on it with a
L{SpecificFactory}.
"""
self.reactor = object()
endpoint = endpoints.StandardIOEndpoint(self.reactor)
self.assertIs(endpoint._stdio, stdio.StandardIO)
endpoint._stdio = FakeStdio
self.specificProtocol = Protocol()
self.fakeStdio = self.successResultOf(
endpoint.listen(SpecificFactory(self.specificProtocol))
)
def test_deferBadEncodingToConnect(self):
"""
Since any client of L{IStreamClientEndpoint} needs to handle Deferred
failures from C{connect}, L{HostnameEndpoint}'s constructor will not
raise exceptions when given bad host names, instead deferring to
returning a failing L{Deferred} from C{connect}.
"""
endpoint = endpoints.HostnameEndpoint(
deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
b'\xff-garbage-\xff', 80
)
deferred = endpoint.connect(Factory.forProtocol(Protocol))
err = self.failureResultOf(deferred, ValueError)
self.assertIn("\\xff-garbage-\\xff", str(err))
endpoint = endpoints.HostnameEndpoint(
deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
u'\u2ff0-garbage-\u2ff0', 80
)
deferred = endpoint.connect(Factory())
err = self.failureResultOf(deferred, ValueError)
self.assertIn("\\u2ff0-garbage-\\u2ff0", str(err))
def test_Year(self):
"""
This example derived from bug description in issue 514.
@return: L{Deferred} of command response
"""
fileList = ftp.FTPFileListProtocol()
exampleLine = (
b'-rw-r--r-- 1 root other 531 Jan 29 2003 README\n')
class PrintLine(protocol.Protocol):
def connectionMade(self):
self.transport.write(exampleLine)
self.transport.loseConnection()
def check(ignored):
file = fileList.files[0]
self.assertTrue(file['size'] == 531, 'misparsed fileitem')
self.assertTrue(file['date'] ==
'Jan 29 2003', 'misparsed fileitem')
self.assertTrue(file['filename'] == 'README', 'misparsed fileitem')
d = loopback.loopbackAsync(PrintLine(), fileList)
return d.addCallback(check)
def testImmediateDisconnect(self):
org = "twisted.test.test_ssl"
self.setupServerAndClient(
(org, org + ", client"), {},
(org, org + ", server"), {})
# Set up a server, connect to it with a client, which should work since our verifiers
# allow anything, then disconnect.
serverProtocolFactory = protocol.ServerFactory()
serverProtocolFactory.protocol = protocol.Protocol
self.serverPort = serverPort = reactor.listenSSL(0,
serverProtocolFactory, self.serverCtxFactory)
clientProtocolFactory = protocol.ClientFactory()
clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
clientProtocolFactory.connectionDisconnected = defer.Deferred()
reactor.connectSSL('127.0.0.1',
serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
return clientProtocolFactory.connectionDisconnected.addCallback(
lambda ignoredResult: self.serverPort.stopListening())
def testConnectionCounting(self):
# Make a basic factory
factory = policies.LimitTotalConnectionsFactory()
factory.protocol = protocol.Protocol
# connectionCount starts at zero
self.assertEqual(0, factory.connectionCount)
# connectionCount increments as connections are made
p1 = factory.buildProtocol(None)
self.assertEqual(1, factory.connectionCount)
p2 = factory.buildProtocol(None)
self.assertEqual(2, factory.connectionCount)
# and decrements as they are lost
p1.connectionLost(None)
self.assertEqual(1, factory.connectionCount)
p2.connectionLost(None)
self.assertEqual(0, factory.connectionCount)