def test_reactorParametrizationInClientMultipleStart(self):
"""
Like L{test_reactorParametrizationInClient}, but stop and restart the
service and check that the given reactor is still used.
"""
reactor = MemoryReactor()
factory = protocol.ClientFactory()
t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
t.startService()
self.assertEqual(
reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
t.stopService()
t.startService()
self.assertEqual(
reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
python类ClientFactory()的实例源码
def test_openSSLBuffering(self):
serverProto = self.serverProto = SingleLineServerProtocol()
clientProto = self.clientProto = RecordingClientProtocol()
server = protocol.ServerFactory()
client = self.client = protocol.ClientFactory()
server.protocol = lambda: serverProto
client.protocol = lambda: clientProto
sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
cCTX = ssl.ClientContextFactory()
port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
self.addCleanup(port.stopListening)
clientConnector = reactor.connectSSL('127.0.0.1', port.getHost().port,
client, cCTX)
self.addCleanup(clientConnector.disconnect)
return clientProto.deferred.addCallback(
self.assertEqual, b"+OK <some crap>\r\n")
def testImmediateDisconnect(self):
org = "twisted.test.test_ssl"
self.setupServerAndClient(
(org, org + ", client"), {},
(org, org + ", server"), {})
# Set up a server, connect to it with a client, which should work since our verifiers
# allow anything, then disconnect.
serverProtocolFactory = protocol.ServerFactory()
serverProtocolFactory.protocol = protocol.Protocol
self.serverPort = serverPort = reactor.listenSSL(0,
serverProtocolFactory, self.serverCtxFactory)
clientProtocolFactory = protocol.ClientFactory()
clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
clientProtocolFactory.connectionDisconnected = defer.Deferred()
reactor.connectSSL('127.0.0.1',
serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
return clientProtocolFactory.connectionDisconnected.addCallback(
lambda ignoredResult: self.serverPort.stopListening())
def loopback(self, serverCertOpts, clientCertOpts,
onServerLost=None, onClientLost=None, onData=None):
if onServerLost is None:
self.onServerLost = onServerLost = defer.Deferred()
if onClientLost is None:
self.onClientLost = onClientLost = defer.Deferred()
if onData is None:
onData = defer.Deferred()
serverFactory = protocol.ServerFactory()
serverFactory.protocol = DataCallbackProtocol
serverFactory.onLost = onServerLost
serverFactory.onData = onData
clientFactory = protocol.ClientFactory()
clientFactory.protocol = WritingProtocol
clientFactory.onLost = onClientLost
self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
self.clientConn = reactor.connectSSL('127.0.0.1',
self.serverPort.getHost().port, clientFactory, clientCertOpts)
def clientConnectionFailed(self, connector, reason):
"""
Fail all pending TCP DNS queries if the TCP connection attempt
fails.
@see: L{twisted.internet.protocol.ClientFactory}
@param connector: Not used.
@type connector: L{twisted.internet.interfaces.IConnector}
@param reason: A C{Failure} containing information about the
cause of the connection failure. This will be passed as the
argument to C{errback} on every pending TCP query
C{deferred}.
@type reason: L{twisted.python.failure.Failure}
"""
# Copy the current pending deferreds then reset the master
# pending list. This prevents triggering new deferreds which
# may be added by callback or errback functions on the current
# deferreds.
pending = self.controller.pending[:]
del self.controller.pending[:]
for d, query, timeout in pending:
d.errback(reason)
def __init__(self, connection):
# ClientFactory does not define __init__() in parent classes
# and does not inherit from object.
self.conn = connection
def buildProtocol(self, addr):
"""
Twisted function that defines which kind of protocol to use
in the ClientFactory.
"""
return TwistedConnectionProtocol()
def buildProtocol(self, addr):
p = protocol.ClientFactory.buildProtocol(self, addr)
if self.timeout:
timeoutCall = reactor.callLater(self.timeout, p.timeout)
self.deferred.addBoth(self._cancelTimeout, timeoutCall)
return p
def testStoppingServer(self):
if not interfaces.IReactorUNIX(reactor, None):
raise unittest.SkipTest, "This reactor does not support UNIX domain sockets"
factory = protocol.ServerFactory()
factory.protocol = wire.Echo
t = internet.UNIXServer('echo.skt', factory)
t.startService()
t.stopService()
self.failIf(t.running)
factory = protocol.ClientFactory()
d = defer.Deferred()
factory.clientConnectionFailed = lambda *args: d.callback(None)
reactor.connectUNIX('echo.skt', factory)
return d
def _runTest(self, clientProto, serverProto, clientIsServer=False):
self.clientProto = clientProto
cf = self.clientFactory = protocol.ClientFactory()
cf.protocol = lambda: clientProto
if clientIsServer:
cf.server = 0
else:
cf.client = 1
self.serverProto = serverProto
sf = self.serverFactory = protocol.ServerFactory()
sf.protocol = lambda: serverProto
if clientIsServer:
sf.client = 0
else:
sf.server = 1
if clientIsServer:
inCharge = cf
else:
inCharge = sf
inCharge.done = 0
port = self.port = reactor.listenTCP(0, sf, interface="127.0.0.1")
portNo = port.getHost().port
reactor.connectTCP('127.0.0.1', portNo, cf)
i = 0
while i < 1000 and not inCharge.done:
reactor.iterate(0.01)
i += 1
self.failUnless(
inCharge.done,
"Never finished reading all lines: %s" % (inCharge.lines,))
def testFailedVerify(self):
org = "twisted.test.test_ssl"
self.setupServerAndClient(
(org, org + ", client"), {},
(org, org + ", server"), {})
def verify(*a):
return False
self.clientCtxFactory.getContext().set_verify(SSL.VERIFY_PEER, verify)
serverConnLost = defer.Deferred()
serverProtocol = protocol.Protocol()
serverProtocol.connectionLost = serverConnLost.callback
serverProtocolFactory = protocol.ServerFactory()
serverProtocolFactory.protocol = lambda: serverProtocol
self.serverPort = serverPort = reactor.listenSSL(0,
serverProtocolFactory, self.serverCtxFactory)
clientConnLost = defer.Deferred()
clientProtocol = protocol.Protocol()
clientProtocol.connectionLost = clientConnLost.callback
clientProtocolFactory = protocol.ClientFactory()
clientProtocolFactory.protocol = lambda: clientProtocol
clientConnector = reactor.connectSSL('127.0.0.1',
serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
dl = defer.DeferredList([serverConnLost, clientConnLost], consumeErrors=True)
return dl.addCallback(self._cbLostConns)
def testPortforward(self):
"""
Test port forwarding through Echo protocol.
"""
realServerFactory = protocol.ServerFactory()
realServerFactory.protocol = lambda: self.serverProtocol
realServerPort = reactor.listenTCP(0, realServerFactory,
interface='127.0.0.1')
self.openPorts.append(realServerPort)
proxyServerFactory = portforward.ProxyFactory('127.0.0.1',
realServerPort.getHost().port)
proxyServerPort = reactor.listenTCP(0, proxyServerFactory,
interface='127.0.0.1')
self.openPorts.append(proxyServerPort)
nBytes = 1000
received = []
d = defer.Deferred()
def testDataReceived(data):
received.extend(data)
if len(received) >= nBytes:
self.assertEquals(''.join(received), 'x' * nBytes)
d.callback(None)
self.clientProtocol.dataReceived = testDataReceived
def testConnectionMade():
self.clientProtocol.transport.write('x' * nBytes)
self.clientProtocol.connectionMade = testConnectionMade
clientFactory = protocol.ClientFactory()
clientFactory.protocol = lambda: self.clientProtocol
reactor.connectTCP(
'127.0.0.1', proxyServerPort.getHost().port, clientFactory)
return d
def buildProtocol(self, *args, **kw):
prot = protocol.ClientFactory.buildProtocol(self, *args, **kw)
prot.setPeer(self.server)
return prot
def client_factory(deferred, error_buffer):
factory = protocol.ClientFactory()
factory.deferred = deferred
factory.error_buffer = error_buffer
factory.protocol = VNCClient
return factory
def closeConnection(self):
"""
Disconnect from the remote SMB/CIFS server. The TCP connection will be closed at the earliest opportunity after this method returns.
:return: None
"""
if not self.instance:
raise NotConnectedError('Not connected to server')
self.instance.transport.loseConnection()
#
# ClientFactory methods
# (Do not touch these unless you know what you are doing)
#
def clientConnectionFailed(self, connector, reason): # ?????ClientFactory???
print("Connection failed goodbye!")
reactor.stop() # ????
def clientConnectionLost(self, connector, reason): # ?????ClientFactory???
print("Connection lost goodbye")
reactor.stop() # ????
def closeConnection(self):
"""
Disconnect from the remote SMB/CIFS server. The TCP connection will be closed at the earliest opportunity after this method returns.
:return: None
"""
if not self.instance:
raise NotConnectedError('Not connected to server')
self.instance.transport.loseConnection()
#
# ClientFactory methods
# (Do not touch these unless you know what you are doing)
#
def buildProtocol(self, addr):
p = protocol.ClientFactory.buildProtocol(self, addr)
if self.timeout:
timeoutCall = reactor.callLater(self.timeout, p.timeout)
self.deferred.addBoth(self._cancelTimeout, timeoutCall)
return p