def listen_unix(factory, config):
    '''
    Internal implementation that opens unix socket listeners for factory
    based on config. See listen() for details.

    config may be a single item or a list of items; it is normalised with
    _listify() before processing.  An item is skipped when
    validate_connection_config() rejects it (this function itself emits no
    warning for such items) or when it has no 'unix' key.  For every
    remaining item, reactor.listenUNIX() is called with the item's 'unix'
    value as the socket path.  The collected listeners are passed through
    _unlistify() before being returned.
    '''
    entries = _listify(config)
    ports = [
        reactor.listenUNIX(entry['unix'], factory)
        for entry in entries
        if validate_connection_config(entry) and 'unix' in entry
    ]
    return _unlistify(ports)
# Python: example source code for listenUNIX()
def setService(self, service):
    """
    Switch this client transport to service and react to the switch.

    The change is logged and delegated to
    transport.SSHClientTransport.setService().  When the new service is not
    'ssh-userauth' and the factory still holds a pending deferred in
    factory.d, that deferred is detached from the factory and fired with
    None.  When the new service is 'ssh-connection' and the 'nocache' option
    is not set, a UNIX socket named ~/.conch-<user>-<host>-<port> is opened
    for an SSHUnixServerFactory wrapping the service; failures while doing
    so are logged rather than raised.
    """
    log.msg('setting client server to %s' % service)
    transport.SSHClientTransport.setService(self, service)
    if service.name != 'ssh-userauth' and self.factory.d:
        # Clear the factory's reference before firing so the deferred
        # cannot be fired a second time by a later service change.
        d = self.factory.d
        self.factory.d = None
        d.callback(None)
    if service.name == 'ssh-connection':
        # listen for UNIX
        if not self.factory.options['nocache']:
            user = self.factory.userAuthObject.user
            peer = self.transport.getPeer()
            filename = os.path.expanduser(
                "~/.conch-%s-%s-%i" % (user, peer.host, peer.port))
            try:
                u = unix.SSHUnixServerFactory(service)
                try:
                    # Best effort: drop any socket file left at this path.
                    os.unlink(filename)
                except OSError:
                    pass
                # 0o600 (rather than the Python-2-only 0600 spelling):
                # owner read/write only.
                self.unixServer = reactor.listenUNIX(
                    filename, u, mode=0o600, wantPID=1)
            except Exception as e:
                log.msg('error trying to listen on %s' % filename)
                log.err(e)
def testPeerBind(self):
    """assert the remote endpoint (getPeer) on the receiving end matches
    the local endpoint (bind) on the connecting end, for unix sockets"""
    serverPath = self.mktemp()
    clientPath = self.mktemp()
    factory = Factory(self, serverPath, peername=clientPath)
    port = reactor.listenUNIX(serverPath, factory)

    # Connect with a plain socket so the client side can be bound to an
    # explicit path before connecting.
    self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self._sock.bind(clientPath)
    self._sock.connect(serverPath)

    def cleanup(result):
        # Runs on success and on failure; the result is passed through.
        self._addPorts(port)
        self._sock.close()
        del self._sock
        return result

    finished = factory.deferred
    finished.addBoth(cleanup)
    return finished
def testPIDFile(self):
    """
    Listening with wantPID=1 locks "<filename>.lock" while the port is
    open, and the lock is gone once all ports have been cleaned up.
    """
    filename = self.mktemp()
    lockPath = filename + ".lock"
    f = Factory(self, filename)
    # 0o600 replaces the Python-2-only 0600 spelling of the same mode.
    l = reactor.listenUNIX(filename, f, mode=0o600, wantPID=1)
    self.failUnless(lockfile.isLocked(lockPath))
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf, checkPID=1)
    d = defer.gatherResults([f.deferred, tcf.deferred])

    def _portStuff(ignored):
        # Register the port and transports, then wait for their cleanup.
        self._addPorts(l, c.transport, tcf.protocol.transport,
                       f.protocol.transport)
        return self.cleanPorts(*self.ports)

    def _check(ignored):
        self.failIf(lockfile.isLocked(lockPath), 'locked')

    d.addCallback(_portStuff)
    d.addCallback(_check)
    return d
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket.

    server is wrapped in _FireOnClose and handed out by the listening
    factory's buildProtocol; client is connected through a
    LoopbackClientFactory.  noisy is copied onto both factories.

    Returns the client factory's deferred, chained so that it waits for the
    server wrapper's deferred, then stops the listening port, then removes
    the temporary socket directory.
    """
    import os
    import shutil
    from twisted.internet import reactor

    # tempfile.mktemp() only produces a name, which another process may
    # claim before the socket is bound.  Create a private directory instead
    # and place the socket inside it.
    socketDir = tempfile.mkdtemp()
    path = os.path.join(socketDir, "loopback.sock")

    def removeSocketDir(result):
        shutil.rmtree(socketDir, ignore_errors=True)
        return result

    try:
        f = policies.WrappingFactory(protocol.Factory())
        serverWrapper = _FireOnClose(f, server)
        f.noisy = noisy
        f.buildProtocol = lambda addr: serverWrapper
        serverPort = reactor.listenUNIX(path, f)
        clientF = LoopbackClientFactory(client)
        clientF.noisy = noisy
        reactor.connectUNIX(path, clientF)
    except Exception:
        # Nothing will ever fire the cleanup callback; do it now.
        removeSocketDir(None)
        raise
    d = clientF.deferred
    d.addCallback(lambda x: serverWrapper.deferred)
    d.addCallback(lambda x: serverPort.stopListening())
    # Success path only: on failure the port is left listening, as before,
    # so its socket directory is left in place too.
    d.addCallback(removeSocketDir)
    return d
def setService(self, service):
    """
    Switch this client transport to service and react to the switch.

    After logging and delegating to
    transport.SSHClientTransport.setService(), two things may happen:

    - if service is not 'ssh-userauth' and factory.d is still set, that
      deferred is removed from the factory and fired with None;
    - if service is 'ssh-connection' and the 'nocache' option is false, a
      UNIX socket at ~/.conch-<user>-<host>-<port> is opened for an
      SSHUnixServerFactory built from the service.  Any exception raised
      while setting that up is logged and swallowed.
    """
    log.msg('setting client server to %s' % service)
    transport.SSHClientTransport.setService(self, service)

    if service.name != 'ssh-userauth' and self.factory.d:
        # Detach before firing so the deferred is only ever called once.
        pending = self.factory.d
        self.factory.d = None
        pending.callback(None)

    # Only an 'ssh-connection' service with caching enabled gets a socket.
    if service.name != 'ssh-connection' or self.factory.options['nocache']:
        return

    # listen for UNIX
    user = self.factory.userAuthObject.user
    peer = self.transport.getPeer()
    filename = os.path.expanduser(
        "~/.conch-%s-%s-%i" % (user, peer.host, peer.port))
    try:
        serverFactory = unix.SSHUnixServerFactory(service)
        try:
            # Best effort: remove whatever is already at the socket path.
            os.unlink(filename)
        except OSError:
            pass
        # Octal written as 0o600 so the module also parses on Python 3.
        self.unixServer = reactor.listenUNIX(
            filename, serverFactory, mode=0o600, wantPID=1)
    except Exception as e:
        log.msg('error trying to listen on %s' % filename)
        log.err(e)
def testPeerBind(self):
    """assert the remote endpoint (getPeer) on the receiving end matches
    the local endpoint (bind) on the connecting end, for unix sockets"""
    listenPath = self.mktemp()
    boundPath = self.mktemp()
    serverFactory = Factory(self, listenPath, peername=boundPath)
    listeningPort = reactor.listenUNIX(listenPath, serverFactory)

    # A raw socket is used on the client side so it can be bound to
    # boundPath before it connects.
    clientSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self._sock = clientSocket
    self._sock.bind(boundPath)
    self._sock.connect(listenPath)

    def releaseResources(outcome):
        # Called for both success and failure; outcome is returned as is.
        self._addPorts(listeningPort)
        self._sock.close()
        del self._sock
        return outcome

    done = serverFactory.deferred
    done.addBoth(releaseResources)
    return done
def testPIDFile(self):
    """
    A port opened with wantPID=1 holds the lock file "<filename>.lock"
    while listening; after the ports are cleaned up the lock is released.
    """
    filename = self.mktemp()
    f = Factory(self, filename)
    # Mode spelled 0o600: the bare 0600 form is a syntax error on Python 3.
    l = reactor.listenUNIX(filename, f, mode=0o600, wantPID=1)
    self.failUnless(lockfile.isLocked(filename + ".lock"))
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf, checkPID=1)

    def _portStuff(ignored):
        # Track everything that was opened, then wait for it to be closed.
        self._addPorts(l, c.transport, tcf.protocol.transport,
                       f.protocol.transport)
        return self.cleanPorts(*self.ports)

    def _check(ignored):
        self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')

    d = defer.gatherResults([f.deferred, tcf.deferred])
    d.addCallback(_portStuff)
    d.addCallback(_check)
    return d
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket.

    The server protocol is wrapped in _FireOnClose and returned by the
    listening factory's buildProtocol; the client protocol is connected via
    a LoopbackClientFactory.  noisy is assigned to both factories.

    The returned deferred is the client factory's deferred with callbacks
    added that wait for the server wrapper's deferred, stop the listening
    port and finally delete the temporary socket directory.
    """
    import os
    import shutil
    from twisted.internet import reactor

    # The socket is created inside a freshly made private directory because
    # tempfile.mktemp() merely returns an unused name, leaving a window in
    # which something else can create that path first.
    socketDir = tempfile.mkdtemp()
    path = os.path.join(socketDir, "loopback.sock")

    def removeSocketDir(result):
        shutil.rmtree(socketDir, ignore_errors=True)
        return result

    try:
        f = policies.WrappingFactory(protocol.Factory())
        serverWrapper = _FireOnClose(f, server)
        f.noisy = noisy
        f.buildProtocol = lambda addr: serverWrapper
        serverPort = reactor.listenUNIX(path, f)
        clientF = LoopbackClientFactory(client)
        clientF.noisy = noisy
        reactor.connectUNIX(path, clientF)
    except Exception:
        # Setup failed before any callback could clean up; do it here.
        removeSocketDir(None)
        raise
    d = clientF.deferred
    d.addCallback(lambda x: serverWrapper.deferred)
    d.addCallback(lambda x: serverPort.stopListening())
    # Cleanup is attached to the success path only; on failure the port is
    # not stopped (unchanged behaviour), so its directory is kept as well.
    d.addCallback(removeSocketDir)
    return d
def test_peerBind(self):
    """
    The address passed to the server factory's C{buildProtocol} method and
    the address returned by the connected protocol's transport's C{getPeer}
    method match the address the client socket is bound to.
    """
    serverPath = self.mktemp()
    clientPath = self.mktemp()

    serverFactory = MyServerFactory()
    connMade = defer.Deferred()
    serverFactory.protocolConnectionMade = connMade

    port = reactor.listenUNIX(serverPath, serverFactory)
    self.addCleanup(port.stopListening)

    # Bind the client end to a known path so the server sees a named peer.
    clientSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self.addCleanup(clientSocket.close)
    clientSocket.bind(clientPath)
    clientSocket.connect(serverPath)

    def checkPeer(proto):
        boundAddress = address.UNIXAddress(clientPath)
        self.assertEqual(serverFactory.peerAddresses, [boundAddress])
        self.assertEqual(proto.transport.getPeer(), boundAddress)

    connMade.addCallback(checkPeer)
    return connMade
def test_socketLocking(self):
    """
    L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
    the name of a file on which a server is already listening.

    Once the first port has stopped listening, the same name can be
    listened on again.
    """
    path = self.mktemp()
    factory = MyServerFactory()
    firstPort = reactor.listenUNIX(path, factory, wantPID=True)

    # A second listener on the same path must be refused while the first
    # one is still active.
    self.assertRaises(
        error.CannotListenError,
        reactor.listenUNIX, path, factory, wantPID=True)

    def listenAgain(ign):
        secondPort = reactor.listenUNIX(path, factory, wantPID=True)
        return secondPort.stopListening()

    stopped = firstPort.stopListening()
    return stopped.addCallback(listenAgain)
def _reprTest(self, serverFactory, factoryName):
    """
    Test the C{__str__} and C{__repr__} implementations of a UNIX port when
    used with the given factory.

    While listening, both must equal "<factoryName on 'filename'>"; after
    the port has stopped listening, both must equal
    "<factoryName (not listening)>".
    """
    path = self.mktemp()
    port = reactor.listenUNIX(path, serverFactory)

    listeningText = "<%s on %r>" % (factoryName, path)
    for render in (repr, str):
        self.assertEqual(render(port), listeningText)

    def checkStopped(ign):
        stoppedText = "<%s (not listening)>" % (factoryName,)
        for render in (repr, str):
            self.assertEqual(render(port), stoppedText)

    # stopListening may or may not return a Deferred; normalise it.
    stopping = defer.maybeDeferred(port.stopListening)
    stopping.addCallback(checkStopped)
    return stopping
def test_reprWithClassicFactory(self):
    """
    The two string representations of the L{IListeningPort} returned by
    L{IReactorUNIX.listenUNIX} contains the name of the classic factory
    class being used and the filename on which the port is listening or
    indicates that the port is not listening.
    """
    # The class name is part of the expected string, so it must stay
    # exactly "ClassicFactory".
    class ClassicFactory:
        def doStart(self):
            pass

        def doStop(self):
            pass

    # Sanity check
    self.assertIsInstance(ClassicFactory, types.ClassType)

    factory = ClassicFactory()
    expectedName = "twisted.test.test_unix.ClassicFactory"
    return self._reprTest(factory, expectedName)
def test_reprWithNewStyleFactory(self):
    """
    The two string representations of the L{IListeningPort} returned by
    L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
    class being used and the filename on which the port is listening or
    indicates that the port is not listening.
    """
    # The class name is part of the expected string, so it must stay
    # exactly "NewStyleFactory".
    class NewStyleFactory(object):
        def doStart(self):
            pass

        def doStop(self):
            pass

    # Sanity check
    self.assertIsInstance(NewStyleFactory, type)

    factory = NewStyleFactory()
    expectedName = "twisted.test.test_unix.NewStyleFactory"
    return self._reprTest(factory, expectedName)
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket.

    server is served (wrapped in _FireOnClose) to whatever connects to the
    listening socket; client is connected to that socket through a
    LoopbackClientFactory.  Both factories get their noisy attribute set
    from the noisy argument.

    Returns the client factory's deferred.  Callbacks added here make it
    wait for the server wrapper's deferred, stop the listening port and
    then remove the temporary directory that held the socket.
    """
    import os
    import shutil
    from twisted.internet import reactor

    # Avoid tempfile.mktemp(): it hands back a path without creating
    # anything, so the name can be taken by someone else before
    # listenUNIX() binds it.  A private mkdtemp() directory closes that gap.
    socketDir = tempfile.mkdtemp()
    path = os.path.join(socketDir, "loopback.sock")

    def removeSocketDir(result):
        shutil.rmtree(socketDir, ignore_errors=True)
        return result

    try:
        f = policies.WrappingFactory(protocol.Factory())
        serverWrapper = _FireOnClose(f, server)
        f.noisy = noisy
        f.buildProtocol = lambda addr: serverWrapper
        serverPort = reactor.listenUNIX(path, f)
        clientF = LoopbackClientFactory(client)
        clientF.noisy = noisy
        reactor.connectUNIX(path, clientF)
    except Exception:
        # No deferred will run the cleanup if setup itself blew up.
        removeSocketDir(None)
        raise
    d = clientF.deferred
    d.addCallback(lambda x: serverWrapper.deferred)
    d.addCallback(lambda x: serverPort.stopListening())
    # Added with addCallback on purpose: when the chain fails the port is
    # still listening (as it always was), so the directory is left alone.
    d.addCallback(removeSocketDir)
    return d
def testDumber(self):
    """
    Connect a client to a listening UNIX port and, once both factories'
    deferreds have fired, register the port and all transports with
    _addPorts.
    """
    path = self.mktemp()
    serverFactory = Factory(self, path)
    port = reactor.listenUNIX(path, serverFactory)
    clientFactory = TestClientFactory(self, path)
    connector = reactor.connectUNIX(path, clientFactory)

    def registerPorts(results):
        # The transports are looked up only now, after both sides fired.
        return self._addPorts(port, connector.transport,
                              clientFactory.protocol.transport,
                              serverFactory.protocol.transport)

    bothDone = defer.gatherResults(
        [serverFactory.deferred, clientFactory.deferred])
    bothDone.addCallback(registerPorts)
    return bothDone
def testMode(self):
    """
    The socket file created by listenUNIX has exactly the permission bits
    passed as mode.
    """
    filename = self.mktemp()
    f = Factory(self, filename)
    # 0o600 is the spelling of octal 600 accepted by Python 2.6+ and 3;
    # the bare 0600 form is a syntax error on Python 3.
    expectedMode = 0o600
    l = reactor.listenUNIX(filename, f, mode=expectedMode)
    self.assertEquals(stat.S_IMODE(os.stat(filename)[0]), expectedMode)
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf)
    self._addPorts(l, c.transport)
def _uncleanSocketTest(self, callback):
    """
    Run a child Python process whose whole script is a listenUNIX call with
    wantPID=True on a fresh path (stored in self.filename), then pass the
    child's output to callback.

    Returns the deferred from utils.getProcessOutput with callback added.
    """
    self.filename = self.mktemp()
    script = ("from twisted.internet import protocol, reactor\n"
              "reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,)
    # Give the child the same import path as this process.
    childEnv = {'PYTHONPATH': os.pathsep.join(sys.path)}
    childArgs = ("-u", "-c", script)
    finished = utils.getProcessOutput(sys.executable, childArgs, env=childEnv)
    finished.addCallback(callback)
    return finished
def testUncleanServerSocketLocking(self):
    """
    After the child process from _uncleanSocketTest has run, this process
    can still listen on the same path with wantPID=True.
    """
    def listenAfterChild(ign):
        # If this next call succeeds, our lock handling is correct.
        factory = Factory(self, self.filename)
        port = reactor.listenUNIX(self.filename, factory, wantPID=True)
        return port.stopListening()

    return self._uncleanSocketTest(listenAfterChild)
def testRepr(self):
    """
    str() of a listening UNIX port contains the socket filename; once the
    port has stopped listening it no longer does.
    """
    path = self.mktemp()
    factory = Factory(self, path)
    port = reactor.listenUNIX(path, factory)
    self.failIf(path not in str(port))

    def checkStopped(ign):
        self.failIf(path in str(port))

    stopping = defer.maybeDeferred(port.stopListening)
    return stopping.addCallback(checkStopped)
def autostart(reason, **kwargs):
    """
    Start listening for Hotplug connections on /tmp/hotplug.socket when
    reason is 0; do nothing for any other reason.

    Any existing file at the socket path is removed first (errors from the
    removal are ignored).  Extra keyword arguments are accepted and unused.
    """
    if reason != 0:
        return

    from twisted.internet import reactor

    # Best effort: clear whatever is already at the socket path.
    try:
        os.remove("/tmp/hotplug.socket")
    except OSError:
        pass

    hotplugFactory = Factory()
    hotplugFactory.protocol = Hotplug
    reactor.listenUNIX("/tmp/hotplug.socket", hotplugFactory)
def autostart(reason, **kwargs):
    """
    When reason is 0, open a UNIX listener on /tmp/hotplug.socket whose
    factory uses the Hotplug protocol; otherwise return without doing
    anything.

    A file already present at that path is removed beforehand, ignoring
    OSError.  kwargs is accepted but not used.
    """
    if reason != 0:
        return

    from twisted.internet import reactor

    try:
        # Remove the previous socket file, if there is one.
        os.remove("/tmp/hotplug.socket")
    except OSError:
        pass

    listenerFactory = Factory()
    listenerFactory.protocol = Hotplug
    reactor.listenUNIX("/tmp/hotplug.socket", listenerFactory)
def setUp(self):
    """
    Build the fixture: a DummyObject exposing a single "method" that
    capitalizes its argument, a MethodCallServerFactory listening on a
    fresh UNIX socket path, and a MethodCallClientFactory.
    """
    super(MethodCallFunctionalTest, self).setUp()
    self.methods = ["method"]

    def capitalize(word):
        return word.capitalize()

    self.object = DummyObject()
    self.object.method = capitalize

    self.socket = self.mktemp()
    self.server = MethodCallServerFactory(self.object, self.methods)
    self.client = MethodCallClientFactory(reactor)
    self.port = reactor.listenUNIX(self.socket, self.server)
def testDumber(self):
    """
    A client connects to a listening UNIX port; after the server and client
    factory deferreds have both fired, the port and the transports involved
    are handed to _addPorts.
    """
    socketPath = self.mktemp()
    server = Factory(self, socketPath)
    listener = reactor.listenUNIX(socketPath, server)
    client = TestClientFactory(self, socketPath)
    connector = reactor.connectUNIX(socketPath, client)

    def trackPorts(ignored):
        # Transport attributes are read at callback time, not before.
        return self._addPorts(listener, connector.transport,
                              client.protocol.transport,
                              server.protocol.transport)

    allFired = defer.gatherResults([server.deferred, client.deferred])
    allFired.addCallback(trackPorts)
    return allFired
def testMode(self):
    """
    listenUNIX creates the socket file with the permission bits given as
    mode.
    """
    filename = self.mktemp()
    f = Factory(self, filename)
    # Octal literals are written 0o600 so the file parses on Python 3 as
    # well; the value is the same as the old 0600 spelling.
    l = reactor.listenUNIX(filename, f, mode=0o600)
    self.assertEquals(stat.S_IMODE(os.stat(filename)[0]), 0o600)
    tcf = TestClientFactory(self, filename)
    c = reactor.connectUNIX(filename, tcf)
    self._addPorts(l, c.transport)
def _uncleanSocketTest(self, callback):
    """
    Spawn a child interpreter that only calls listenUNIX(..., wantPID=True)
    on a new path, remembered as self.filename, and add callback to the
    deferred that fires with the child's output.
    """
    self.filename = self.mktemp()
    template = ("from twisted.internet import protocol, reactor\n"
                "reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n")
    source = template % (self.filename,)
    # The child needs this process's sys.path to import the same modules.
    env = {'PYTHONPATH': os.pathsep.join(sys.path)}
    outputDeferred = utils.getProcessOutput(
        sys.executable, ("-u", "-c", source), env=env)
    outputDeferred.addCallback(callback)
    return outputDeferred
def testUncleanServerSocketLocking(self):
    """
    Listening with wantPID=True on the path used by the child process of
    _uncleanSocketTest succeeds once that child has finished.
    """
    def relisten(ign):
        # If this next call succeeds, our lock handling is correct.
        serverFactory = Factory(self, self.filename)
        listener = reactor.listenUNIX(
            self.filename, serverFactory, wantPID=True)
        return listener.stopListening()

    return self._uncleanSocketTest(relisten)
def testRepr(self):
    """
    The filename appears in str() of the port while it is listening and is
    absent from it after the port has stopped listening.
    """
    socketPath = self.mktemp()
    listener = reactor.listenUNIX(socketPath, Factory(self, socketPath))
    self.failIf(socketPath not in str(listener))

    def afterStop(ign):
        self.failIf(socketPath in str(listener))

    stopped = defer.maybeDeferred(listener.stopListening)
    return stopped.addCallback(afterStop)
def autostart(reason, **kwargs):
    """
    Start the hotplug handler when reason is 0; do nothing otherwise.

    Starting means: announce it on stdout, remove any file already present
    at /tmp/hotplug.socket (ignoring OSError), and listen on that path with
    a factory whose protocol is Hotplug.  kwargs is accepted but not used.
    """
    if reason == 0:
        # Single-argument print(...) emits the same text on Python 2 and 3;
        # the bare print statement is a syntax error on Python 3.
        print("[Hotplug] starting hotplug handler")
        from twisted.internet import reactor
        import os
        try:
            # Best effort: clear a socket file left at this path.
            os.remove("/tmp/hotplug.socket")
        except OSError:
            pass
        factory = Factory()
        factory.protocol = Hotplug
        reactor.listenUNIX("/tmp/hotplug.socket", factory)
def test_dumber(self):
    """
    L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
    started with L{IReactorUNIX.listenUNIX}.
    """
    path = self.mktemp()

    serverFactory = MyServerFactory()
    serverFactory.protocolConnectionMade = defer.Deferred()
    port = reactor.listenUNIX(path, serverFactory)
    self.addCleanup(port.stopListening)

    clientFactory = MyClientFactory()
    clientFactory.protocolConnectionMade = defer.Deferred()
    reactor.connectUNIX(path, clientFactory)

    def disconnectBoth(protocols):
        serverProtocol, clientProtocol = protocols
        # Incidental assertion which may or may not be redundant with some
        # other test.  This probably deserves its own test method.
        expectedPeers = [address.UNIXAddress(path)]
        self.assertEqual(clientFactory.peerAddresses, expectedPeers)
        clientProtocol.transport.loseConnection()
        serverProtocol.transport.loseConnection()

    # Results arrive in list order: server protocol first, then client.
    connected = defer.gatherResults([
        serverFactory.protocolConnectionMade,
        clientFactory.protocolConnectionMade,
    ])
    connected.addCallback(disconnectBoth)
    return connected