def connect_unix(factory, config):
    """
    Internal implementation that opens unix socket connections for factory
    based on config. See connect() for details.

    ``config`` is first normalised with ``_listify`` so that a single item
    and a list of items are handled the same way.  Entries rejected by
    ``validate_connection_config`` are skipped, as are entries that carry
    no ``'unix'`` key.  The connectors returned by ``reactor.connectUNIX``
    are handed back through ``_unlistify``.
    """
    connectors = [
        reactor.connectUNIX(entry['unix'], factory)
        for entry in _listify(config)
        # invalid configs are skipped rather than treated as fatal
        if validate_connection_config(entry) and 'unix' in entry
    ]
    return _unlistify(connectors)
# Example source code using Python's connectUNIX()
def render(self, request):
    """Render this request, from my server.

    This will always be asynchronous, and therefore return NOT_DONE_YET.

    When a publisher is already available the request is forwarded to it
    right away.  Otherwise the request is queued on ``self.pending`` and,
    unless a connection attempt is already in flight (``self.waiting``),
    a new PB client connection is started: over a UNIX socket when
    ``self.host`` is ``"unix"`` (``self.port`` is then passed as the
    socket address), over TCP otherwise.
    """
    if self.publisher:
        # Already connected: issue the remote request immediately.
        issue = Issue(request)
        remote_call = self.publisher.callRemote('request', request)
        remote_call.addCallbacks(issue.finished, issue.failed)
        return NOT_DONE_YET

    self.pending.append(request)
    if self.waiting:
        # A connection attempt is already under way; just stay queued.
        return NOT_DONE_YET

    self.waiting = 1
    client_factory = pb.PBClientFactory()
    connect_timeout = 10
    if self.host == "unix":
        reactor.connectUNIX(self.port, client_factory, connect_timeout)
    else:
        reactor.connectTCP(self.host, self.port, client_factory,
                           connect_timeout)
    root_deferred = client_factory.getRootObject()
    root_deferred.addCallbacks(self.connected, self.notConnected)
    return NOT_DONE_YET
def testPIDFile(self):
    """
    Listen on a UNIX socket with ``wantPID=1`` and connect to it with
    ``checkPID=1``.

    Asserts that ``filename + ".lock"`` is locked right after
    ``listenUNIX`` returns, and that it is no longer locked once the
    ports have been cleaned up.

    Returns the Deferred that fires after both checks have run.
    """
    filename = self.mktemp()
    f = Factory(self, filename)
    # 0o600 is the octal spelling accepted by both Python 2.6+ and
    # Python 3; the legacy ``0600`` form is a SyntaxError on Python 3.
    l = reactor.listenUNIX(filename, f, mode=0o600, wantPID=1)
    self.assertTrue(lockfile.isLocked(filename + ".lock"))
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf, checkPID=1)
    d = defer.gatherResults([f.deferred, tcf.deferred])

    def _portStuff(ignored):
        # Register every port/transport involved, then tear them down.
        self._addPorts(l, c.transport, tcf.protocol.transport,
                       f.protocol.transport)
        return self.cleanPorts(*self.ports)

    def _check(ignored):
        # Runs after the cleanup above has completed.
        self.assertFalse(lockfile.isLocked(filename + ".lock"), 'locked')

    d.addCallback(_portStuff)
    d.addCallback(_check)
    return d
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket.

    A wrapping factory that always hands out ``server`` (wrapped in
    ``_FireOnClose``) listens on a temporary path, and ``client`` is
    connected to it through a ``LoopbackClientFactory``.  The returned
    Deferred is the client factory's deferred, chained to the server
    wrapper's deferred and finally to ``stopListening()`` on the port.
    """
    from twisted.internet import reactor

    # NOTE(review): tempfile.mktemp() only picks a name and is documented
    # as race-prone; kept because the socket path must not exist yet.
    socket_path = tempfile.mktemp()

    server_factory = policies.WrappingFactory(protocol.Factory())
    server_wrapper = _FireOnClose(server_factory, server)
    server_factory.noisy = noisy

    def _build_server_protocol(addr):
        # Every incoming connection is served by the single wrapped server.
        return server_wrapper

    server_factory.buildProtocol = _build_server_protocol
    listening_port = reactor.listenUNIX(socket_path, server_factory)

    client_factory = LoopbackClientFactory(client)
    client_factory.noisy = noisy
    reactor.connectUNIX(socket_path, client_factory)

    def _wait_for_server(_result):
        return server_wrapper.deferred

    def _stop_listening(_result):
        return listening_port.stopListening()

    finished = client_factory.deferred
    finished.addCallback(_wait_for_server)
    finished.addCallback(_stop_listening)
    return finished
def test_reconnect(self):
    """
    If the connection is lost, the L{RemoteObject} created by the factory
    will transparently handle the reconnection.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    self.client.factor = 0.01  # Try reconnecting very quickly
    unix_connector = reactor.connectUNIX(self.socket, self.client)
    remote_object = yield self.client.getRemoteObject()

    # Drop the connection and wait until the client reports a new one.
    reconnected = Deferred()
    self.client.notifyOnConnect(reconnected.callback)
    unix_connector.disconnect()
    yield reconnected

    # The same remote object keeps answering after the reconnection.
    answer = yield remote_object.method("john")
    self.assertEqual(answer, "John")

    self.client.stopTrying()
    unix_connector.disconnect()
def test_retry(self):
    """
    If the connection is lost, the L{RemoteObject} created by the creator
    will transparently retry to perform the L{MethodCall} requests that
    failed due to the broken connection.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    self.client.factor = 0.01  # Try reconnecting very quickly
    self.client.retryOnReconnect = True
    unix_connector = reactor.connectUNIX(self.socket, self.client)
    remote_object = yield self.client.getRemoteObject()

    # Break the connection before issuing the call.
    unix_connector.disconnect()

    # The call is made while disconnected; the test expects it to be
    # retried transparently and to yield the normal result.
    answer = yield remote_object.method("john")
    self.assertEqual(answer, "John")

    self.client.stopTrying()
    unix_connector.disconnect()
def test_retry_with_method_call_error(self):
    """
    If a retried L{MethodCall} request fails due to a L{MethodCallError},
    the L{RemoteObject} will properly propagate the error to the original
    caller.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    # Remove "method" from the fixture's method list before connecting.
    self.methods.remove("method")
    self.client.factor = 0.01  # Try reconnecting very quickly
    self.client.retryOnReconnect = True
    unix_connector = reactor.connectUNIX(self.socket, self.client)
    remote_object = yield self.client.getRemoteObject()

    # Break the connection before issuing the call.
    unix_connector.disconnect()

    # A method call error is not retried
    failing_call = remote_object.method()
    yield self.assertFailure(failing_call, MethodCallError)

    self.client.stopTrying()
    unix_connector.disconnect()
def test_retry_with_many_method_calls(self):
    """
    If several L{MethodCall} requests were issued while disconnected, they
    will be all eventually completed when the connection gets established
    again.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    self.client.factor = 0.01  # Try reconnecting very quickly
    self.client.retryOnReconnect = True
    unix_connector = reactor.connectUNIX(self.socket, self.client)
    remote_object = yield self.client.getRemoteObject()

    # Break the connection before issuing the calls.
    unix_connector.disconnect()

    john_answer = yield remote_object.method("john")
    bill_answer = yield remote_object.method("bill")
    self.assertEqual(john_answer, "John")
    self.assertEqual(bill_answer, "Bill")

    self.client.stopTrying()
    unix_connector.disconnect()
def test_retry_without_retry_on_reconnect(self):
    """
    If C{retryOnReconnect} is C{False}, the L{RemoteObject} object won't
    retry to perform requests which failed because the connection was
    lost, however requests made after a reconnection will still succeed.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    self.client.factor = 0.01  # Try reconnecting very quickly
    unix_connector = reactor.connectUNIX(self.socket, self.client)
    remote_object = yield self.client.getRemoteObject()

    # Ask to be notified of the next connection, then disconnect.
    reconnected = Deferred()
    self.client.notifyOnConnect(reconnected.callback)
    unix_connector.disconnect()

    # NOTE(review): "modt" looks like a typo for "method"; kept as-is
    # because the call is only expected to fail with ConnectionDone.
    lost_call = remote_object.modt()
    yield self.assertFailure(lost_call, ConnectionDone)

    # Wait for reconnection and perform another call
    yield reconnected
    answer = yield remote_object.method("john")
    self.assertEqual(answer, "John")

    self.client.stopTrying()
    unix_connector.disconnect()
def test_retry_with_timeout(self):
    """
    If a C{retryTimeout} is set, the L{RemoteObject} object will errback
    failed L{MethodCall}s after that amount of seconds, without retrying
    them when the connection established again.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    self.client.retryOnReconnect = True
    self.client.retryTimeout = 0.1
    self.client.factor = 1  # Reconnect slower than timeout
    unix_connector = reactor.connectUNIX(self.socket, self.client)
    remote_object = yield self.client.getRemoteObject()

    # Break the connection before issuing the call.
    unix_connector.disconnect()

    timed_out_call = remote_object.method("foo")
    failure_value = yield self.assertFailure(timed_out_call, MethodCallError)
    self.assertEqual("timeout", str(failure_value))

    self.client.stopTrying()
    unix_connector.disconnect()
def render(self, request):
    """Render this request, from my server.

    This will always be asynchronous, and therefore return NOT_DONE_YET.

    With a publisher at hand the request is sent to it immediately.
    Without one, the request is appended to ``self.pending``; if no
    connection attempt is pending yet (``self.waiting`` is false) a PB
    client factory is connected, via ``connectUNIX`` when ``self.host``
    equals ``"unix"`` and via ``connectTCP`` otherwise.
    """
    publisher = self.publisher
    if publisher:
        pending_issue = Issue(request)
        outcome = publisher.callRemote('request', request)
        outcome.addCallbacks(pending_issue.finished, pending_issue.failed)
    else:
        self.pending.append(request)
        if not self.waiting:
            # Mark the attempt as started so later requests only queue up.
            self.waiting = 1
            factory = pb.PBClientFactory()
            seconds = 10
            if self.host == "unix":
                reactor.connectUNIX(self.port, factory, seconds)
            else:
                reactor.connectTCP(self.host, self.port, factory, seconds)
            factory.getRootObject().addCallbacks(self.connected,
                                                 self.notConnected)
    return NOT_DONE_YET
def testPIDFile(self):
    """
    Exercise the PID/lock file handling of UNIX ports.

    The server listens with ``wantPID=1`` and the client connects with
    ``checkPID=1``.  The test asserts that ``filename + ".lock"`` is
    locked immediately after listening, and unlocked again after all
    registered ports have been cleaned up.

    Returns the Deferred carrying the cleanup and the final check.
    """
    filename = self.mktemp()
    f = Factory(self, filename)
    # Use the ``0o`` octal prefix: valid on Python 2.6+ and Python 3,
    # whereas the old ``0600`` literal does not parse on Python 3.
    l = reactor.listenUNIX(filename, f, mode=0o600, wantPID=1)
    self.assertTrue(lockfile.isLocked(filename + ".lock"))
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf, checkPID=1)
    d = defer.gatherResults([f.deferred, tcf.deferred])

    def _portStuff(ignored):
        # Track every transport so cleanPorts can shut all of them down.
        self._addPorts(l, c.transport, tcf.protocol.transport,
                       f.protocol.transport)
        return self.cleanPorts(*self.ports)

    def _check(ignored):
        # Called once the cleanup Deferred above has fired.
        self.assertFalse(lockfile.isLocked(filename + ".lock"), 'locked')

    d.addCallback(_portStuff)
    d.addCallback(_check)
    return d
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket.

    ``server`` is wrapped in ``_FireOnClose`` and served by a wrapping
    factory listening on a temporary path; ``client`` is connected to the
    same path through ``LoopbackClientFactory``.  ``noisy`` is copied onto
    both factories.  The result is the client factory's Deferred, which
    then waits on the server wrapper's Deferred and stops the listening
    port.
    """
    from twisted.internet import reactor

    # NOTE(review): tempfile.mktemp() is documented as unsafe (the name
    # can be taken by someone else before it is used); left unchanged.
    path = tempfile.mktemp()

    wrapping_factory = policies.WrappingFactory(protocol.Factory())
    wrapped_server = _FireOnClose(wrapping_factory, server)
    wrapping_factory.noisy = noisy

    def _always_wrapped_server(addr):
        return wrapped_server

    wrapping_factory.buildProtocol = _always_wrapped_server
    port = reactor.listenUNIX(path, wrapping_factory)

    loopback_factory = LoopbackClientFactory(client)
    loopback_factory.noisy = noisy
    reactor.connectUNIX(path, loopback_factory)

    def _then_server_closed(_ignored):
        return wrapped_server.deferred

    def _then_stop_port(_ignored):
        return port.stopListening()

    done = loopback_factory.deferred
    done.addCallback(_then_server_closed)
    done.addCallback(_then_stop_port)
    return done
def render(self, request):
    """Render this request, from my server.

    This will always be asynchronous, and therefore return NOT_DONE_YET.

    If a publisher exists the request is issued against it at once.  If
    not, the request joins ``self.pending`` and, when no connection
    attempt has been started yet, a PB client connection is opened
    (UNIX socket when ``self.host`` is ``"unix"``, TCP otherwise) whose
    root object is delivered to ``self.connected`` / ``self.notConnected``.
    """
    if self.publisher:
        issue = Issue(request)
        remote_result = self.publisher.callRemote('request', request)
        remote_result.addCallbacks(issue.finished, issue.failed)
        return server.NOT_DONE_YET

    self.pending.append(request)
    if not self.waiting:
        # Only the first queued request triggers the connection attempt.
        self.waiting = 1
        pb_factory = pb.PBClientFactory()
        timeout_seconds = 10
        if self.host == "unix":
            reactor.connectUNIX(self.port, pb_factory, timeout_seconds)
        else:
            reactor.connectTCP(self.host, self.port, pb_factory,
                               timeout_seconds)
        root = pb_factory.getRootObject()
        root.addCallbacks(self.connected, self.notConnected)
    return server.NOT_DONE_YET
def connect(host, port, options, verifyHostKey, userAuthObject):
    """
    Try to reach a cached connection over a per-user UNIX socket.

    When ``options['nocache']`` is set, an already-failed Deferred
    carrying a ``ConchError`` is returned.  Otherwise a UNIX connection
    is attempted to ``~/.conch-<user>-<host>-<port>`` (home-expanded)
    with a 2 second timeout and PID checking, and a fresh Deferred,
    handed to ``SSHUnixClientFactory``, is returned.

    ``verifyHostKey`` is accepted but not used by this function.
    """
    if options['nocache']:
        return defer.fail(ConchError('not using connection caching'))

    result = defer.Deferred()
    socket_name = "~/.conch-%s-%s-%i" % (userAuthObject.user, host, port)
    socket_path = os.path.expanduser(socket_name)
    client_factory = SSHUnixClientFactory(result, options, userAuthObject)
    reactor.connectUNIX(socket_path, client_factory, timeout=2, checkPID=1)
    return result
def testDumber(self):
    """
    Listen on a temporary UNIX socket path, connect a client to it and,
    once both factories' Deferreds have fired, register the port and the
    transports involved through ``self._addPorts``.

    Returns the gathered Deferred.
    """
    socket_path = self.mktemp()
    server_factory = Factory(self, socket_path)
    listening_port = reactor.listenUNIX(socket_path, server_factory)
    client_factory = TestClientFactory(self, socket_path)
    connector = reactor.connectUNIX(socket_path, client_factory)

    def _register_ports(_results):
        return self._addPorts(listening_port,
                              connector.transport,
                              client_factory.protocol.transport,
                              server_factory.protocol.transport)

    both_connected = defer.gatherResults([server_factory.deferred,
                                          client_factory.deferred])
    both_connected.addCallback(_register_ports)
    return both_connected
def testMode(self):
    """
    Listen on a UNIX socket with ``mode=0o600`` and assert that the
    permission bits of the created socket file are exactly ``0o600``.

    A client is then connected so that the port and the connector's
    transport can be registered through ``self._addPorts``.
    """
    filename = self.mktemp()
    f = Factory(self, filename)
    # ``0o600`` parses on Python 2.6+ and Python 3; the legacy ``0600``
    # spelling is a SyntaxError on Python 3.
    port = reactor.listenUNIX(filename, f, mode=0o600)
    self.assertEqual(stat.S_IMODE(os.stat(filename).st_mode), 0o600)
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf)
    self._addPorts(port, c.transport)
def testStoppingServer(self):
    """
    Start and immediately stop a ``UNIXServer`` on ``'echo.skt'``, assert
    that the service is no longer running, then try to connect to the
    same path.

    The returned Deferred fires when the client factory's
    ``clientConnectionFailed`` hook is invoked.

    Raises ``unittest.SkipTest`` when the reactor does not provide
    ``IReactorUNIX``.
    """
    if not interfaces.IReactorUNIX(reactor, None):
        # Call-style raise works on both Python 2 and Python 3; the
        # ``raise X, "msg"`` form is Python 2-only syntax.
        raise unittest.SkipTest(
            "This reactor does not support UNIX domain sockets")
    factory = protocol.ServerFactory()
    factory.protocol = wire.Echo
    t = internet.UNIXServer('echo.skt', factory)
    t.startService()
    t.stopService()
    self.assertFalse(t.running)
    factory = protocol.ClientFactory()
    d = defer.Deferred()
    # The test only completes if the connection attempt fails.
    factory.clientConnectionFailed = lambda *args: d.callback(None)
    reactor.connectUNIX('echo.skt', factory)
    return d
def test_connect(self):
    """
    The L{RemoteObject} resulting from the deferred returned by
    L{MethodCallClientFactory.getRemoteObject} is properly connected
    to the remote peer.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    unix_connector = reactor.connectUNIX(self.socket, self.client)
    remote_object = yield self.client.getRemoteObject()

    answer = yield remote_object.method("john")
    self.assertEqual(answer, "John")

    self.client.stopTrying()
    unix_connector.disconnect()
def test_connect_with_max_retries(self):
    """
    If L{MethodCallClientFactory.maxRetries} is set, then the factory
    will give up trying to connect after that amount of times.
    """
    # NOTE(review): generator-style test; presumably wrapped by an
    # inlineCallbacks-like decorator that is outside this view.
    # Stop the fixture's listening port before the client connects.
    self.port.stopListening()
    self.client.maxRetries = 0
    reactor.connectUNIX(self.socket, self.client)

    remote_object_request = self.client.getRemoteObject()
    yield self.assertFailure(remote_object_request, ConnectError)
def connect(host, port, options, verifyHostKey, userAuthObject):
    """
    Attempt a connection through a per-user UNIX socket.

    Returns a failed Deferred (``ConchError``) when ``options['nocache']``
    is true.  Otherwise connects ``SSHUnixClientFactory`` to the
    home-expanded path ``~/.conch-<user>-<host>-<port>`` using
    ``timeout=2`` and ``checkPID=1`` and returns the Deferred that was
    given to the factory.

    ``verifyHostKey`` is part of the signature but is not read here.
    """
    if options['nocache']:
        return defer.fail(ConchError('not using connection caching'))

    connected = defer.Deferred()
    unexpanded = "~/.conch-%s-%s-%i" % (userAuthObject.user, host, port)
    cache_socket = os.path.expanduser(unexpanded)
    unix_factory = SSHUnixClientFactory(connected, options, userAuthObject)
    reactor.connectUNIX(cache_socket, unix_factory, timeout=2, checkPID=1)
    return connected
def testDumber(self):
    """
    Plain listen/connect round trip over a UNIX socket.

    After the server and client factory Deferreds have both fired, the
    listening port and the three transports are passed to
    ``self._addPorts``.  The gathered Deferred is returned.
    """
    path = self.mktemp()
    srv_factory = Factory(self, path)
    port = reactor.listenUNIX(path, srv_factory)
    cli_factory = TestClientFactory(self, path)
    conn = reactor.connectUNIX(path, cli_factory)

    def _track_everything(_ignored):
        return self._addPorts(port,
                              conn.transport,
                              cli_factory.protocol.transport,
                              srv_factory.protocol.transport)

    gathered = defer.gatherResults([srv_factory.deferred,
                                    cli_factory.deferred])
    gathered.addCallback(_track_everything)
    return gathered
def testMode(self):
    """
    Check that ``listenUNIX(..., mode=0o600)`` produces a socket file
    whose permission bits equal ``0o600``.

    Afterwards a client connects and the port plus the connector's
    transport are registered with ``self._addPorts``.
    """
    filename = self.mktemp()
    f = Factory(self, filename)
    # Octal literals need the ``0o`` prefix to parse on Python 3; it is
    # also accepted by Python 2.6+.
    port = reactor.listenUNIX(filename, f, mode=0o600)
    self.assertEqual(stat.S_IMODE(os.stat(filename).st_mode), 0o600)
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf)
    self._addPorts(port, c.transport)
def testStoppingServer(self):
    """
    A ``UNIXServer`` on ``'echo.skt'`` is started and stopped; the test
    asserts ``running`` is false afterwards and then connects a client
    to the same path.

    Returns a Deferred fired from the client factory's
    ``clientConnectionFailed`` hook.

    Raises ``unittest.SkipTest`` if the reactor does not provide
    ``IReactorUNIX``.
    """
    if not interfaces.IReactorUNIX(reactor, None):
        # Instantiate the exception explicitly: ``raise X, "msg"`` does
        # not parse on Python 3.
        raise unittest.SkipTest(
            "This reactor does not support UNIX domain sockets")
    factory = protocol.ServerFactory()
    factory.protocol = wire.Echo
    t = internet.UNIXServer('echo.skt', factory)
    t.startService()
    t.stopService()
    self.assertFalse(t.running)
    factory = protocol.ClientFactory()
    d = defer.Deferred()
    # Completion of the test depends on the connection attempt failing.
    factory.clientConnectionFailed = lambda *args: d.callback(None)
    reactor.connectUNIX('echo.skt', factory)
    return d
def test_dumber(self):
    """
    L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
    started with L{IReactorUNIX.listenUNIX}.
    """
    socket_path = self.mktemp()

    # Server side: a Deferred is attached to the factory as
    # protocolConnectionMade.
    server_factory = MyServerFactory()
    server_connected = defer.Deferred()
    server_factory.protocolConnectionMade = server_connected
    listening_port = reactor.listenUNIX(socket_path, server_factory)
    self.addCleanup(listening_port.stopListening)

    # Client side: same arrangement.
    client_factory = MyClientFactory()
    client_connected = defer.Deferred()
    client_factory.protocolConnectionMade = client_connected
    reactor.connectUNIX(socket_path, client_factory)

    def _both_connected(protocols):
        server_proto, client_proto = protocols
        # Incidental assertion which may or may not be redundant with some
        # other test.  This probably deserves its own test method.
        expected_peers = [address.UNIXAddress(socket_path)]
        self.assertEqual(client_factory.peerAddresses, expected_peers)

        client_proto.transport.loseConnection()
        server_proto.transport.loseConnection()

    gathered = defer.gatherResults([server_connected, client_connected])
    gathered.addCallback(_both_connected)
    return gathered
def test_pidFile(self):
    """
    A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
    called and released when the Deferred returned by the L{IListeningPort}
    provider's C{stopListening} method is called back.
    """
    socket_path = self.mktemp()
    lock_path = socket_path + ".lock"

    server_factory = MyServerFactory()
    server_connected = defer.Deferred()
    server_factory.protocolConnectionMade = server_connected
    listening_port = reactor.listenUNIX(socket_path, server_factory,
                                        wantPID=True)
    self.assertTrue(lockfile.isLocked(lock_path))

    # XXX This part would test something about the checkPID parameter, but
    # it doesn't actually.  It should be rewritten to test the several
    # different possible behaviors.  -exarkun
    client_factory = MyClientFactory()
    client_connected = defer.Deferred()
    client_factory.protocolConnectionMade = client_connected
    reactor.connectUNIX(socket_path, client_factory, checkPID=1)

    def _close_everything(protocols):
        server_proto, client_proto = protocols
        # Incidental assertion which may or may not be redundant with some
        # other test.  This probably deserves its own test method.
        expected_peers = [address.UNIXAddress(socket_path)]
        self.assertEqual(client_factory.peerAddresses, expected_peers)

        client_proto.transport.loseConnection()
        server_proto.transport.loseConnection()
        return listening_port.stopListening()

    def _lock_released(_ignored):
        self.assertFalse(lockfile.isLocked(lock_path), 'locked')

    gathered = defer.gatherResults([server_connected, client_connected])
    gathered.addCallback(_close_everything)
    gathered.addCallback(_lock_released)
    return gathered
def test_connectToUncleanServer(self):
    """
    If passed C{True} for the C{checkPID} parameter, a client connection
    attempt made with L{IReactorUNIX.connectUNIX} fails with
    L{error.BadFileError}.
    """
    def _attempt_connection(_ignored):
        # Connect with PID checking on and expect the factory's Deferred
        # to fail with BadFileError.
        connection_result = defer.Deferred()
        failing_factory = FailedConnectionClientFactory(connection_result)
        reactor.connectUNIX(self.filename, failing_factory, checkPID=True)
        return self.assertFailure(connection_result, error.BadFileError)

    return self._uncleanSocketTest(_attempt_connection)
def testStoppingServer(self):
    """
    Start a ``UNIXServer`` on ``'echo.skt'``, stop it right away and
    assert that it is no longer running; then connect a client to the
    same path.

    Returns a Deferred that fires when the client factory's
    ``clientConnectionFailed`` hook is called.
    """
    echo_factory = protocol.ServerFactory()
    echo_factory.protocol = wire.Echo
    service = internet.UNIXServer('echo.skt', echo_factory)
    service.startService()
    service.stopService()
    self.assertFalse(service.running)

    connection_failed = defer.Deferred()

    def _on_connection_failed(*args):
        return connection_failed.callback(None)

    client_factory = protocol.ClientFactory()
    client_factory.clientConnectionFailed = _on_connection_failed
    reactor.connectUNIX('echo.skt', client_factory)
    return connection_failed
def connect(self):
    """
    Connect this factory to the node described by ``self._node``.

    ``self._node["type"]`` selects the transport: ``"tcp"`` uses the
    node's ``"host"``/``"port"``, ``"unix"`` uses its ``"addr"``.  Any
    other type is reported through ``log.err``.  In every case the root
    object Deferred gets ``self._proxy.insert`` added as a callback with
    the node's ``"name"`` as extra argument.
    """
    node = self._node
    kind = node["type"]
    if kind == "tcp":
        reactor.connectTCP(node["host"], node["port"], self)
    elif kind == "unix":
        reactor.connectUNIX(node["addr"], self)
    else:
        log.err("Unknown connect type")

    # NOTE(review): this also runs for an unknown type, where no
    # connection was started - confirm that is intended.
    root = self.getRootObject()
    root.addCallback(self._proxy.insert, node["name"])