def _reprTest(self, serverProto, protocolName):
    """
    Check both string forms of a UNIX datagram port, first while it is
    listening and again once it has been stopped.

    @param serverProto: The protocol object handed to
        C{reactor.listenUNIXDatagram}.
    @param protocolName: The name expected to appear in the port's
        C{repr()} and C{str()} output.

    @return: The L{Deferred} for stopping the port, with the
        "not listening" checks attached as a callback.
    """
    path = self.mktemp()
    port = reactor.listenUNIXDatagram(path, serverProto)

    # While listening, repr() and str() must both name the protocol and path.
    listeningText = "<%s on %r>" % (protocolName, path)
    for render in (repr, str):
        self.assertEqual(render(port), listeningText)

    def checkStopped(ignored):
        # After shutdown the path is no longer mentioned.
        stoppedText = "<%s (not listening)>" % (protocolName,)
        for render in (repr, str):
            self.assertEqual(render(port), stoppedText)

    stopping = defer.maybeDeferred(port.stopListening)
    stopping.addCallback(checkStopped)
    return stopping
python类listenUNIXDatagram()的实例源码
def test_reprWithClassicProtocol(self):
    """
    The two string representations of the L{IListeningPort} returned by
    L{IReactorUNIXDatagram.listenUNIXDatagram} contain the name of the
    classic protocol class in use and either the filename on which the
    port is listening or an indication that the port is not listening.
    """
    class ClassicProtocol:
        def doStop(self):
            pass

        def makeConnection(self, transport):
            pass

    # Sanity check: the class above must really be a classic class.
    self.assertIsInstance(ClassicProtocol, types.ClassType)

    protocol = ClassicProtocol()
    expectedName = "twisted.test.test_unix.ClassicProtocol"
    return self._reprTest(protocol, expectedName)
def test_reprWithNewStyleProtocol(self):
    """
    The two string representations of the L{IListeningPort} returned by
    L{IReactorUNIXDatagram.listenUNIXDatagram} contain the name of the
    new-style protocol class in use and either the filename on which the
    port is listening or an indication that the port is not listening.
    """
    class NewStyleProtocol(object):
        def doStop(self):
            pass

        def makeConnection(self, transport):
            pass

    # Sanity check: the class above must really be a new-style class.
    self.assertIsInstance(NewStyleProtocol, type)

    protocol = NewStyleProtocol()
    expectedName = "twisted.test.test_unix.NewStyleProtocol"
    return self._reprTest(protocol, expectedName)
def testExchange(self):
    """
    Write a datagram from a connected client port to a listening server
    port and check what each protocol recorded.

    Both ports are stopped and their socket files removed once the
    exchange has finished, whether the exchange and its assertions
    succeeded or failed.

    @return: A L{Deferred} that fires after the assertions and the cleanup
        have both completed.  It fails with the original failure if the
        exchange or an assertion failed.
    """
    clientaddr = self.mktemp()
    serveraddr = self.mktemp()
    sp = ServerProto()
    cp = ClientProto()
    s = reactor.listenUNIXDatagram(serveraddr, sp)
    c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)

    def write(ignored):
        cp.transport.write("hi")
        return defer.gatherResults([sp.deferredGotWhat,
                                    cp.deferredGotBack])

    def _cbTestExchange(ignored):
        self.assertEqual("hi", sp.gotwhat)
        self.assertEqual(clientaddr, sp.gotfrom)
        self.assertEqual("hi back", cp.gotback)

    def cleanup(result):
        # Each socket file is removed only after the port bound to it has
        # stopped: the server owns serveraddr, the client owns clientaddr.
        d1 = defer.maybeDeferred(s.stopListening)
        d1.addCallback(lambda x: os.unlink(serveraddr))
        d2 = defer.maybeDeferred(c.stopListening)
        d2.addCallback(lambda x: os.unlink(clientaddr))
        done = defer.gatherResults([d1, d2])
        # Pass the incoming result (possibly a Failure) through so an
        # earlier error is still reported after cleanup.
        done.addCallback(lambda x: result)
        return done

    d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
    d.addCallback(write)
    d.addCallback(_cbTestExchange)
    # addBoth, not addCallback: the ports must be released on failure too.
    d.addBoth(cleanup)
    return d
def testCannotListen(self):
    """
    Listening a second time on a UNIX datagram address that is already in
    use raises L{error.CannotListenError}.

    The first port is stopped and its socket file removed afterwards, even
    when the assertion fails.

    @return: A L{Deferred} that fires once the port has stopped and the
        socket file has been removed.  It fails with the assertion failure
        if the expected exception was not raised.
    """
    addr = self.mktemp()
    p = ServerProto()
    s = reactor.listenUNIXDatagram(addr, p)

    def cleanup(result):
        # Wait for the port to stop before removing its socket file.
        stopped = defer.maybeDeferred(s.stopListening)
        stopped.addCallback(lambda ign: os.unlink(addr))
        # Pass a Failure through so an assertion error is still reported;
        # a successful assertRaises result is of no interest to the caller.
        stopped.addCallback(
            lambda ign: result if isinstance(result, failure.Failure)
            else None)
        return stopped

    # Function-scope import: this block did not previously need Failure.
    from twisted.python import failure

    d = defer.maybeDeferred(
        self.assertRaises,
        error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
    d.addBoth(cleanup)
    return d
# test connecting to bound and connected (somewhere else) address
def testRepr(self):
    """
    The C{str()} of a UNIX datagram port contains the socket filename while
    the port is listening and no longer contains it after the port has been
    stopped.

    @return: The L{Deferred} for stopping the port, with the post-stop
        check attached as a callback.
    """
    path = self.mktemp()
    port = reactor.listenUNIXDatagram(path, ServerProto())
    self.failIf(path not in str(port))

    def checkStopped(ignored):
        self.failIf(path in str(port))

    stopping = defer.maybeDeferred(port.stopListening)
    stopping.addCallback(checkStopped)
    return stopping
def openServerMode(self, iface):
    """
    Start listening for UNIX datagrams on C{iface}, using this object as
    the protocol, and keep the resulting port in C{self._lport}.

    @param iface: The address passed to C{reactor.listenUNIXDatagram}.

    @raise error.CarrierError: Wrapping whatever exception the listen
        attempt raised.

    @return: This object.
    """
    try:
        self._lport = reactor.listenUNIXDatagram(iface, self)
    except Exception:
        # Re-raise as the transport-level error, carrying the original
        # exception instance as its argument.
        raise error.CarrierError(sys.exc_info()[1])
    else:
        return self
def testExchange(self):
    """
    Write a datagram from a connected client port to a listening server
    port and check what each protocol recorded.

    Both ports are stopped and their socket files removed once the
    exchange has finished, whether the exchange and its assertions
    succeeded or failed.

    @return: A L{Deferred} that fires after the assertions and the cleanup
        have both completed.  It fails with the original failure if the
        exchange or an assertion failed.
    """
    clientaddr = self.mktemp()
    serveraddr = self.mktemp()
    sp = ServerProto()
    cp = ClientProto()
    s = reactor.listenUNIXDatagram(serveraddr, sp)
    c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)

    def write(ignored):
        cp.transport.write("hi")
        return defer.gatherResults([sp.deferredGotWhat,
                                    cp.deferredGotBack])

    def _cbTestExchange(ignored):
        self.assertEqual("hi", sp.gotwhat)
        self.assertEqual(clientaddr, sp.gotfrom)
        self.assertEqual("hi back", cp.gotback)

    def cleanup(result):
        # Each socket file is removed only after the port bound to it has
        # stopped: the server owns serveraddr, the client owns clientaddr.
        d1 = defer.maybeDeferred(s.stopListening)
        d1.addCallback(lambda x: os.unlink(serveraddr))
        d2 = defer.maybeDeferred(c.stopListening)
        d2.addCallback(lambda x: os.unlink(clientaddr))
        done = defer.gatherResults([d1, d2])
        # Pass the incoming result (possibly a Failure) through so an
        # earlier error is still reported after cleanup.
        done.addCallback(lambda x: result)
        return done

    d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
    d.addCallback(write)
    d.addCallback(_cbTestExchange)
    # addBoth, not addCallback: the ports must be released on failure too.
    d.addBoth(cleanup)
    return d
def testCannotListen(self):
    """
    Listening a second time on a UNIX datagram address that is already in
    use raises L{error.CannotListenError}.

    The first port is stopped and its socket file removed afterwards, even
    when the assertion fails.

    @return: A L{Deferred} that fires once the port has stopped and the
        socket file has been removed.  It fails with the assertion failure
        if the expected exception was not raised.
    """
    addr = self.mktemp()
    p = ServerProto()
    s = reactor.listenUNIXDatagram(addr, p)

    def cleanup(result):
        # Wait for the port to stop before removing its socket file.
        stopped = defer.maybeDeferred(s.stopListening)
        stopped.addCallback(lambda ign: os.unlink(addr))
        # Pass a Failure through so an assertion error is still reported;
        # a successful assertRaises result is of no interest to the caller.
        stopped.addCallback(
            lambda ign: result if isinstance(result, failure.Failure)
            else None)
        return stopped

    # Function-scope import: this block did not previously need Failure.
    from twisted.python import failure

    d = defer.maybeDeferred(
        self.assertRaises,
        error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
    d.addBoth(cleanup)
    return d
# test connecting to bound and connected (somewhere else) address
def testRepr(self):
    """
    The C{str()} of a UNIX datagram port contains the socket filename while
    the port is listening and no longer contains it after the port has been
    stopped.

    @return: The L{Deferred} for stopping the port, with the post-stop
        check attached as a callback.
    """
    path = self.mktemp()
    port = reactor.listenUNIXDatagram(path, ServerProto())
    self.failIf(path not in str(port))

    def checkStopped(ignored):
        self.failIf(path in str(port))

    stopping = defer.maybeDeferred(port.stopListening)
    stopping.addCallback(checkStopped)
    return stopping
def openServerMode(self, iface=None):
    """
    Start listening for UNIX datagrams on C{iface}, using this object as
    the protocol, and keep the resulting port in C{self._lport}.

    @param iface: The address passed to C{reactor.listenUNIXDatagram};
        defaults to C{None}, which is forwarded unchanged.

    @raise error.CarrierError: Wrapping whatever exception the listen
        attempt raised.

    @return: This object.
    """
    try:
        self._lport = reactor.listenUNIXDatagram(iface, self)
    except Exception:
        # Re-raise as the transport-level error, carrying the original
        # exception instance as its argument.
        raise error.CarrierError(sys.exc_info()[1])
    else:
        return self
def test_exchange(self):
    """
    A datagram written by a connected client reaches the listening server,
    and the client records a reply.

    @return: A L{Deferred} that fires once the recorded datagrams and
        sender address have been checked.
    """
    clientPath = self.mktemp()
    serverPath = self.mktemp()
    server = ServerProto()
    client = ClientProto()

    serverPort = reactor.listenUNIXDatagram(serverPath, server)
    self.addCleanup(serverPort.stopListening)
    clientPort = reactor.connectUNIXDatagram(
        serverPath, client, bindAddress=clientPath)
    self.addCleanup(clientPort.stopListening)

    def sendGreeting(ignored):
        client.transport.write(b"hi")
        pending = [server.deferredGotWhat, client.deferredGotBack]
        return defer.gatherResults(pending)

    def checkExchange(ignored):
        self.assertEqual(b"hi", server.gotwhat)
        self.assertEqual(clientPath, server.gotfrom)
        self.assertEqual(b"hi back", client.gotback)

    # Nothing is written until both protocols report that they have started.
    started = defer.gatherResults(
        [server.deferredStarted, client.deferredStarted])
    started.addCallback(sendGreeting)
    started.addCallback(checkExchange)
    return started
def test_cannotListen(self):
    """
    L{IReactorUNIXDatagram.listenUNIXDatagram} raises
    L{error.CannotListenError} if the unix socket specified is already in
    use.

    The first port is stopped and its socket file removed by registered
    cleanups, so they run even when the assertion fails.
    """
    addr = self.mktemp()
    p = ServerProto()
    s = reactor.listenUNIXDatagram(addr, p)
    # Cleanups run in reverse order of registration: the port is stopped
    # (and any Deferred it returns is waited on) before the file is removed.
    self.addCleanup(os.unlink, addr)
    self.addCleanup(s.stopListening)
    self.assertRaises(error.CannotListenError,
                      reactor.listenUNIXDatagram, addr, p)
# test connecting to bound and connected (somewhere else) address
def openServerMode(self, iface=None):
    """
    Start listening for UNIX datagrams on C{iface}, using this object as
    the protocol, and keep the resulting port in C{self._lport}.

    @param iface: The address passed to C{reactor.listenUNIXDatagram};
        defaults to C{None}, which is forwarded unchanged.

    @raise error.CarrierError: Wrapping whatever exception the listen
        attempt raised.

    @return: This object.
    """
    try:
        self._lport = reactor.listenUNIXDatagram(iface, self)
    except Exception:
        # Re-raise as the transport-level error, carrying the original
        # exception instance as its argument.
        raise error.CarrierError(sys.exc_info()[1])
    else:
        return self