def testWriter(self):
f = protocol.Factory()
f.protocol = WriterProtocol
f.done = 0
f.problem = 0
wrappedF = WiredFactory(f)
p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
n = p.getHost().port
self.ports.append(p)
clientF = WriterClientFactory()
wrappedClientF = WiredFactory(clientF)
reactor.connectTCP("127.0.0.1", n, wrappedClientF)
def check(ignored):
self.failUnless(f.done, "writer didn't finish, it probably died")
self.failUnless(f.problem == 0, "writer indicated an error")
self.failUnless(clientF.done,
"client didn't see connection dropped")
expected = "".join(["Hello Cleveland!\n",
"Goodbye", " cruel", " world", "\n"])
self.failUnless(clientF.data == expected,
"client didn't receive all the data it expected")
d = defer.gatherResults([wrappedF.onDisconnect,
wrappedClientF.onDisconnect])
return d.addCallback(check)
python类Factory()的实例源码
def testWriter(self):
f = protocol.Factory()
f.protocol = LargeBufferWriterProtocol
f.done = 0
f.problem = 0
f.len = self.datalen
wrappedF = FireOnCloseFactory(f)
p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
n = p.getHost().port
self.ports.append(p)
clientF = LargeBufferReaderClientFactory()
wrappedClientF = FireOnCloseFactory(clientF)
reactor.connectTCP("127.0.0.1", n, wrappedClientF)
d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred])
def check(ignored):
self.failUnless(f.done, "writer didn't finish, it probably died")
self.failUnless(clientF.len == self.datalen,
"client didn't receive all the data it expected "
"(%d != %d)" % (clientF.len, self.datalen))
self.failUnless(clientF.done,
"client didn't see connection dropped")
return d.addCallback(check)
def testPeerBind(self):
"""assert the remote endpoint (getPeer) on the receiving end matches
the local endpoint (bind) on the connecting end, for unix sockets"""
filename = self.mktemp()
peername = self.mktemp()
f = Factory(self, filename, peername=peername)
l = reactor.listenUNIX(filename, f)
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._sock.bind(peername)
self._sock.connect(filename)
d = f.deferred
def done(x):
self._addPorts(l)
self._sock.close()
del self._sock
return x
d.addBoth(done)
return d
def testPORTCannotConnect(self):
# Login
d = self._anonymousLogin()
# Listen on a port, and immediately stop listening as a way to find a
# port number that is definitely closed.
def loggedIn(ignored):
port = reactor.listenTCP(0, protocol.Factory(),
interface='127.0.0.1')
portNum = port.getHost().port
d = port.stopListening()
d.addCallback(lambda _: portNum)
return d
d.addCallback(loggedIn)
# Tell the server to connect to that port with a PORT command, and
# verify that it fails with the right error.
def gotPortNum(portNum):
return self.assertCommandFailed(
'PORT ' + ftp.encodeHostPort('127.0.0.1', portNum),
["425 Can't open data connection."])
return d.addCallback(gotPortNum)
# -- Client Tests -----------------------------------------------------------
def loopbackUNIX(server, client, noisy=True):
"""Run session between server and client protocol instances over UNIX socket."""
path = tempfile.mktemp()
from twisted.internet import reactor
f = policies.WrappingFactory(protocol.Factory())
serverWrapper = _FireOnClose(f, server)
f.noisy = noisy
f.buildProtocol = lambda addr: serverWrapper
serverPort = reactor.listenUNIX(path, f)
clientF = LoopbackClientFactory(client)
clientF.noisy = noisy
reactor.connectUNIX(path, clientF)
d = clientF.deferred
d.addCallback(lambda x: serverWrapper.deferred)
d.addCallback(lambda x: serverPort.stopListening())
return d
def testWriter(self):
f = protocol.Factory()
f.protocol = WriterProtocol
f.done = 0
f.problem = 0
wrappedF = WiredFactory(f)
p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
n = p.getHost().port
self.ports.append(p)
clientF = WriterClientFactory()
wrappedClientF = WiredFactory(clientF)
reactor.connectTCP("127.0.0.1", n, wrappedClientF)
def check(ignored):
self.failUnless(f.done, "writer didn't finish, it probably died")
self.failUnless(f.problem == 0, "writer indicated an error")
self.failUnless(clientF.done,
"client didn't see connection dropped")
expected = "".join(["Hello Cleveland!\n",
"Goodbye", " cruel", " world", "\n"])
self.failUnless(clientF.data == expected,
"client didn't receive all the data it expected")
d = defer.gatherResults([wrappedF.onDisconnect,
wrappedClientF.onDisconnect])
return d.addCallback(check)
def testWriter(self):
f = protocol.Factory()
f.protocol = LargeBufferWriterProtocol
f.done = 0
f.problem = 0
f.len = self.datalen
wrappedF = FireOnCloseFactory(f)
p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
n = p.getHost().port
self.ports.append(p)
clientF = LargeBufferReaderClientFactory()
wrappedClientF = FireOnCloseFactory(clientF)
reactor.connectTCP("127.0.0.1", n, wrappedClientF)
d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred])
def check(ignored):
self.failUnless(f.done, "writer didn't finish, it probably died")
self.failUnless(clientF.len == self.datalen,
"client didn't receive all the data it expected "
"(%d != %d)" % (clientF.len, self.datalen))
self.failUnless(clientF.done,
"client didn't see connection dropped")
return d.addCallback(check)
def testPeerBind(self):
"""assert the remote endpoint (getPeer) on the receiving end matches
the local endpoint (bind) on the connecting end, for unix sockets"""
filename = self.mktemp()
peername = self.mktemp()
f = Factory(self, filename, peername=peername)
l = reactor.listenUNIX(filename, f)
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._sock.bind(peername)
self._sock.connect(filename)
d = f.deferred
def done(x):
self._addPorts(l)
self._sock.close()
del self._sock
return x
d.addBoth(done)
return d
def testPORTCannotConnect(self):
# Login
d = self._anonymousLogin()
# Listen on a port, and immediately stop listening as a way to find a
# port number that is definitely closed.
def loggedIn(ignored):
port = reactor.listenTCP(0, protocol.Factory(),
interface='127.0.0.1')
portNum = port.getHost().port
d = port.stopListening()
d.addCallback(lambda _: portNum)
return d
d.addCallback(loggedIn)
# Tell the server to connect to that port with a PORT command, and
# verify that it fails with the right error.
def gotPortNum(portNum):
return self.assertCommandFailed(
'PORT ' + ftp.encodeHostPort('127.0.0.1', portNum),
["425 Can't open data connection."])
return d.addCallback(gotPortNum)
# -- Client Tests -----------------------------------------------------------
def loopbackUNIX(server, client, noisy=True):
"""Run session between server and client protocol instances over UNIX socket."""
path = tempfile.mktemp()
from twisted.internet import reactor
f = policies.WrappingFactory(protocol.Factory())
serverWrapper = _FireOnClose(f, server)
f.noisy = noisy
f.buildProtocol = lambda addr: serverWrapper
serverPort = reactor.listenUNIX(path, f)
clientF = LoopbackClientFactory(client)
clientF.noisy = noisy
reactor.connectUNIX(path, clientF)
d = clientF.deferred
d.addCallback(lambda x: serverWrapper.deferred)
d.addCallback(lambda x: serverPort.stopListening())
return d
def test_simpleSuccess(self):
"""
If C{getaddrinfo} gives one L{GAIEndpoint.connect}.
"""
gaiendpoint = self.makeEndpoint()
protos = []
f = Factory()
f.protocol = Protocol
gaiendpoint.connect(f).addCallback(protos.append)
WHO_CARES = 0
WHAT_EVER = ""
self.gaiResult(AF_INET, SOCK_STREAM, WHO_CARES, WHAT_EVER,
("1.2.3.4", 4321))
self.clock.advance(1.0)
attempt = self.fakeRealEndpoints[0]._attempt
attempt.callback(self.fakeRealEndpoints[0]._factory.buildProtocol(None))
self.assertEqual(len(protos), 1)
def buildProtocol(self, addr):
"""
Create an instance of the server side of the SSH protocol.
@type addr: L{twisted.internet.interfaces.IAddress} provider
@param addr: The address at which the server will listen.
@rtype: L{twisted.conch.ssh.transport.SSHServerTransport}
@return: The built transport.
"""
t = protocol.Factory.buildProtocol(self, addr)
t.supportedPublicKeys = self.privateKeys.keys()
if not self.primes:
log.msg('disabling non-fixed-group key exchange algorithms '
'because we cannot find moduli file')
t.supportedKeyExchanges = [
kexAlgorithm for kexAlgorithm in t.supportedKeyExchanges
if _kex.isFixedGroup(kexAlgorithm) or
_kex.isEllipticCurve(kexAlgorithm)]
return t
def connectToAgent(self, endpoint):
"""
Set up a connection to the authentication agent and trigger its
initialization.
@param endpoint: An endpoint which can be used to connect to the
authentication agent.
@type endpoint: L{IStreamClientEndpoint} provider
@return: A L{Deferred} which fires when the agent connection is ready
for use.
"""
factory = Factory()
factory.protocol = SSHAgentClient
d = endpoint.connect(factory)
def connected(agent):
self.agent = agent
return agent.getPublicKeys()
d.addCallback(connected)
return d
def connect(self, protocolFactory):
"""
Set up an SSH connection, use a channel from that connection to launch
a command, and hook the stdin and stdout of that command up as a
transport for a protocol created by the given factory.
@param protocolFactory: A L{Factory} to use to create the protocol
which will be connected to the stdin and stdout of the command on
the SSH server.
@return: A L{Deferred} which will fire with an error if the connection
cannot be set up for any reason or with the protocol instance
created by C{protocolFactory} once it has been connected to the
command.
"""
d = self._creator.secureConnection()
d.addCallback(self._executeCommand, protocolFactory)
return d
def test_processAddress(self):
"""
The address passed to the factory's buildProtocol in the endpoint is a
_ProcessAddress instance.
"""
class TestAddrFactory(protocol.Factory):
protocol = StubApplicationProtocol
address = None
def buildProtocol(self, addr):
self.address = addr
p = self.protocol()
p.factory = self
return p
myFactory = TestAddrFactory()
d = self.ep.connect(myFactory)
self.successResultOf(d)
self.assertIsInstance(myFactory.address, _ProcessAddress)
def test_deferBadEncodingToConnect(self):
"""
Since any client of L{IStreamClientEndpoint} needs to handle Deferred
failures from C{connect}, L{HostnameEndpoint}'s constructor will not
raise exceptions when given bad host names, instead deferring to
returning a failing L{Deferred} from C{connect}.
"""
endpoint = endpoints.HostnameEndpoint(
deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
b'\xff-garbage-\xff', 80
)
deferred = endpoint.connect(Factory.forProtocol(Protocol))
err = self.failureResultOf(deferred, ValueError)
self.assertIn("\\xff-garbage-\\xff", str(err))
endpoint = endpoints.HostnameEndpoint(
deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
u'\u2ff0-garbage-\u2ff0', 80
)
deferred = endpoint.connect(Factory())
err = self.failureResultOf(deferred, ValueError)
self.assertIn("\\u2ff0-garbage-\\u2ff0", str(err))
def test_connectProtocolCreatesFactory(self):
"""
C{endpoints.connectProtocol} calls the given endpoint's C{connect()}
method with a factory that will build the given protocol.
"""
reactor = MemoryReactor()
endpoint = endpoints.TCP4ClientEndpoint(reactor, "127.0.0.1", 0)
theProtocol = object()
endpoints.connectProtocol(endpoint, theProtocol)
# A TCP connection was made via the given endpoint:
self.assertEqual(len(reactor.tcpClients), 1)
# TCP4ClientEndpoint uses a _WrapperFactory around the underlying
# factory, so we need to unwrap it:
factory = reactor.tcpClients[0][2]._wrappedFactory
self.assertIsInstance(factory, protocol.Factory)
self.assertIs(factory.buildProtocol(None), theProtocol)
def testWriter(self):
f = protocol.Factory()
f.protocol = LargeBufferWriterProtocol
f.done = 0
f.problem = 0
f.len = self.datalen
wrappedF = FireOnCloseFactory(f)
p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1")
self.addCleanup(p.stopListening)
n = p.getHost().port
clientF = LargeBufferReaderClientFactory()
wrappedClientF = FireOnCloseFactory(clientF)
reactor.connectTCP("127.0.0.1", n, wrappedClientF)
d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred])
def check(ignored):
self.assertTrue(f.done, "writer didn't finish, it probably died")
self.assertTrue(clientF.len == self.datalen,
"client didn't receive all the data it expected "
"(%d != %d)" % (clientF.len, self.datalen))
self.assertTrue(clientF.done,
"client didn't see connection dropped")
return d.addCallback(check)
def test_service(self):
"""
L{strports.service} returns a L{StreamServerEndpointService}
constructed with an endpoint produced from
L{endpoint.serverFromString}, using the same syntax.
"""
reactor = object() # the cake is a lie
aFactory = Factory()
aGoodPort = 1337
svc = strports.service(
'tcp:' + str(aGoodPort), aFactory, reactor=reactor)
self.assertIsInstance(svc, internet.StreamServerEndpointService)
# See twisted.application.test.test_internet.EndpointServiceTests.
# test_synchronousRaiseRaisesSynchronously
self.assertTrue(svc._raiseSynchronously)
self.assertIsInstance(svc.endpoint, TCP4ServerEndpoint)
# Maybe we should implement equality for endpoints.
self.assertEqual(svc.endpoint._port, aGoodPort)
self.assertIs(svc.factory, aFactory)
self.assertIs(svc.endpoint._reactor, reactor)
def test_listenDefaultHost(self):
"""
L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
will have a default host of C{'0.0.0.0'}, and a port that reflects the
value passed, and C{listenUNIX} will have a name that reflects the path
passed.
"""
memoryReactor = MemoryReactor()
for port in [memoryReactor.listenTCP(8242, Factory()),
memoryReactor.listenSSL(8242, Factory(), None)]:
verifyObject(IListeningPort, port)
address = port.getHost()
verifyObject(IAddress, address)
self.assertEqual(address.host, '0.0.0.0')
self.assertEqual(address.port, 8242)
port = memoryReactor.listenUNIX(b"/path/to/socket", Factory())
verifyObject(IListeningPort, port)
address = port.getHost()
verifyObject(IAddress, address)
self.assertEqual(address.name, b"/path/to/socket")
def test_portRange(self):
"""
L{FTP.passivePortRange} should determine the ports which
L{FTP.getDTPPort} attempts to bind. If no port from that iterator can
be bound, L{error.CannotListenError} should be raised, otherwise the
first successful result from L{FTP.listenFactory} should be returned.
"""
def listenFactory(portNumber, factory):
if portNumber in (22032, 22033, 22034):
raise error.CannotListenError('localhost', portNumber, 'error')
return portNumber
self.serverProtocol.listenFactory = listenFactory
port = self.serverProtocol.getDTPPort(protocol.Factory())
self.assertEqual(port, 0)
self.serverProtocol.passivePortRange = xrange(22032, 65536)
port = self.serverProtocol.getDTPPort(protocol.Factory())
self.assertEqual(port, 22035)
self.serverProtocol.passivePortRange = xrange(22032, 22035)
self.assertRaises(error.CannotListenError,
self.serverProtocol.getDTPPort,
protocol.Factory())
def test_PORTCannotConnect(self):
"""
Listen on a port, and immediately stop listening as a way to find a
port number that is definitely closed.
"""
# Login
d = self._anonymousLogin()
def loggedIn(ignored):
port = reactor.listenTCP(0, protocol.Factory(),
interface='127.0.0.1')
portNum = port.getHost().port
d = port.stopListening()
d.addCallback(lambda _: portNum)
return d
d.addCallback(loggedIn)
# Tell the server to connect to that port with a PORT command, and
# verify that it fails with the right error.
def gotPortNum(portNum):
return self.assertCommandFailed(
'PORT ' + ftp.encodeHostPort('127.0.0.1', portNum),
["425 Can't open data connection."])
return d.addCallback(gotPortNum)
def loopbackUNIX(server, client, noisy=True):
"""Run session between server and client protocol instances over UNIX socket."""
path = tempfile.mktemp()
from twisted.internet import reactor
f = policies.WrappingFactory(protocol.Factory())
serverWrapper = _FireOnClose(f, server)
f.noisy = noisy
f.buildProtocol = lambda addr: serverWrapper
serverPort = reactor.listenUNIX(path, f)
clientF = LoopbackClientFactory(client)
clientF.noisy = noisy
reactor.connectUNIX(path, clientF)
d = clientF.deferred
d.addCallback(lambda x: serverWrapper.deferred)
d.addCallback(lambda x: serverPort.stopListening())
return d
def test_listen_starts_service(self):
"""
``AutoTLSEndpoint.listen`` starts an ``AcmeIssuingService``. Stopping
the port stops the service.
"""
factory = Factory()
d = self.endpoint.listen(factory)
self.assertThat(
d,
succeeded(
MatchesPredicate(
IListeningPort.providedBy,
'%r does not provide IListeningPort')))
port = d.result
self.assertThat(
self.endpoint.service,
MatchesStructure(running=Equals(True)))
self.assertThat(port.stopListening(), succeeded(Always()))
self.assertThat(
self.endpoint.service,
MatchesStructure(running=Equals(False)))
def connect(sdata, command, username, host, port=22, key_file=None, password=None):
"""
Connect to an SSH host (as it happens, persistently).
"""
sdata.set_conn_state('connecting')
try:
keys = [Key.fromFile(key_file)] if key_file else None
except exceptions.IOError as e:
print('### key load error:', str(e))
push_failure_message(str(e), sdata)
return
endpoint = SSHCommandClientEndpoint.newConnection(
reactor, command, username, host, port=int(port),
keys=keys, password=password, ui=None,
knownHosts=PermissiveKnownHosts())
factory = Factory()
factory.protocol = LineProtocol
factory.sdata = sdata
d = endpoint.connect(factory)
# Very small race condition between here and the replacement
# in connectionMade() above, but I've never managed to hit it.
def disconnect():
sdata.log('Disconnecting while still attempting to connect, by request')
d.cancel()
sdata.transport_drop_cb = disconnect
d.addErrback(lambda reason: push_failure_message(reason, sdata))
return d
def testDumber(self):
filename = self.mktemp()
f = Factory(self, filename)
l = reactor.listenUNIX(filename, f)
tcf = TestClientFactory(self, filename)
c = reactor.connectUNIX(filename, tcf)
d = defer.gatherResults([f.deferred, tcf.deferred])
d.addCallback(lambda x : self._addPorts(l, c.transport,
tcf.protocol.transport,
f.protocol.transport))
return d
def testMode(self):
filename = self.mktemp()
f = Factory(self, filename)
l = reactor.listenUNIX(filename, f, mode = 0600)
self.assertEquals(stat.S_IMODE(os.stat(filename)[0]), 0600)
tcf = TestClientFactory(self, filename)
c = reactor.connectUNIX(filename, tcf)
self._addPorts(l, c.transport)
def testSocketLocking(self):
filename = self.mktemp()
f = Factory(self, filename)
l = reactor.listenUNIX(filename, f, wantPID=True)
self.assertRaises(
error.CannotListenError,
reactor.listenUNIX, filename, f, wantPID=True)
def stoppedListening(ign):
l = reactor.listenUNIX(filename, f, wantPID=True)
return l.stopListening()
return l.stopListening().addCallback(stoppedListening)
def testUncleanServerSocketLocking(self):
def ranStupidChild(ign):
# If this next call succeeds, our lock handling is correct.
p = reactor.listenUNIX(self.filename, Factory(self, self.filename), wantPID=True)
return p.stopListening()
return self._uncleanSocketTest(ranStupidChild)
def testRepr(self):
filename = self.mktemp()
f = Factory(self, filename)
p = reactor.listenUNIX(filename, f)
self.failIf(str(p).find(filename) == -1)
def stoppedListening(ign):
self.failIf(str(p).find(filename) != -1)
return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)