def testExchange(self):
    """
    Exchange one datagram in each direction over UNIX datagram sockets.

    A server port is bound to one temporary path and a client port, bound
    to a second temporary path, is connected to it.  Once both protocols
    report that they have started, the client writes C{b"hi"}; the test
    then checks what the server recorded (payload and sender address) and
    what the client recorded as the reply.

    @return: a Deferred that fires once the checks have run and both ports
        have been shut down; it fails if the exchange, the checks, or the
        shutdown failed.
    """
    clientaddr = self.mktemp()
    serveraddr = self.mktemp()
    sp = ServerProto()
    cp = ClientProto()
    s = reactor.listenUNIXDatagram(serveraddr, sp)
    c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
    d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])

    def write(ignored):
        # Datagram payloads are bytes; a bytes literal behaves the same as
        # the old str literal on Python 2 and is required on Python 3.
        cp.transport.write(b"hi")
        return defer.gatherResults([sp.deferredGotWhat,
                                    cp.deferredGotBack])

    def _cbTestExchange(ignored):
        self.assertEqual(b"hi", sp.gotwhat)
        self.assertEqual(clientaddr, sp.gotfrom)
        self.assertEqual(b"hi back", cp.gotback)

    def cleanup(result):
        # Stop each port and then remove the socket file that this very
        # port was bound to.
        d1 = defer.maybeDeferred(s.stopListening)
        d1.addCallback(lambda x: os.unlink(serveraddr))
        d2 = defer.maybeDeferred(c.stopListening)
        d2.addCallback(lambda x: os.unlink(clientaddr))
        done = defer.gatherResults([d1, d2])
        # Hand the incoming result (or Failure) back to the chain so that a
        # failed exchange or assertion is still reported after cleanup.
        done.addCallback(lambda ignored: result)
        return done

    d.addCallback(write)
    d.addCallback(_cbTestExchange)
    # addBoth, not addCallback: the ports must be closed even when the
    # exchange or the assertions above fail.
    d.addBoth(cleanup)
    return d
# Python: example source code using connectUNIXDatagram()
def openClientMode(self, iface=''):
    """
    Open this object in client mode over a UNIX datagram socket.

    Passes C{iface} and this object to C{reactor.connectUNIXDatagram} and
    stores the returned port in C{self._lport}.

    @param iface: value forwarded as the first argument of
        C{reactor.connectUNIXDatagram} (defaults to the empty string).
    @return: this object, so that calls can be chained.
    @raise error.CarrierError: wrapping any C{Exception} raised while
        connecting or while storing the port.
    """
    try:
        port = reactor.connectUNIXDatagram(iface, self)
        self._lport = port
    except Exception:
        # Re-raise under the carrier error type, keeping the original
        # exception instance as its argument.
        cause = sys.exc_info()[1]
        raise error.CarrierError(cause)
    return self
def testExchange(self):
    """
    Exchange one datagram in each direction over UNIX datagram sockets.

    A server port is bound to one temporary path and a client port, bound
    to a second temporary path, is connected to it.  Once both protocols
    report that they have started, the client writes C{b"hi"}; the test
    then checks what the server recorded (payload and sender address) and
    what the client recorded as the reply.

    @return: a Deferred that fires once the checks have run and both ports
        have been shut down; it fails if the exchange, the checks, or the
        shutdown failed.
    """
    clientaddr = self.mktemp()
    serveraddr = self.mktemp()
    sp = ServerProto()
    cp = ClientProto()
    s = reactor.listenUNIXDatagram(serveraddr, sp)
    c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
    d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])

    def write(ignored):
        # Datagram payloads are bytes; a bytes literal behaves the same as
        # the old str literal on Python 2 and is required on Python 3.
        cp.transport.write(b"hi")
        return defer.gatherResults([sp.deferredGotWhat,
                                    cp.deferredGotBack])

    def _cbTestExchange(ignored):
        self.assertEqual(b"hi", sp.gotwhat)
        self.assertEqual(clientaddr, sp.gotfrom)
        self.assertEqual(b"hi back", cp.gotback)

    def cleanup(result):
        # Stop each port and then remove the socket file that this very
        # port was bound to.
        d1 = defer.maybeDeferred(s.stopListening)
        d1.addCallback(lambda x: os.unlink(serveraddr))
        d2 = defer.maybeDeferred(c.stopListening)
        d2.addCallback(lambda x: os.unlink(clientaddr))
        done = defer.gatherResults([d1, d2])
        # Hand the incoming result (or Failure) back to the chain so that a
        # failed exchange or assertion is still reported after cleanup.
        done.addCallback(lambda ignored: result)
        return done

    d.addCallback(write)
    d.addCallback(_cbTestExchange)
    # addBoth, not addCallback: the ports must be closed even when the
    # exchange or the assertions above fail.
    d.addBoth(cleanup)
    return d
def openClientMode(self, iface=''):
    """
    Open this object in client mode over a UNIX datagram socket.

    Passes C{iface} and this object to C{reactor.connectUNIXDatagram} and
    stores the returned port in C{self._lport}.

    @param iface: value forwarded as the first argument of
        C{reactor.connectUNIXDatagram} (defaults to the empty string).
    @return: this object, so that calls can be chained.
    @raise error.CarrierError: wrapping any C{Exception} raised while
        connecting or while storing the port.
    """
    try:
        port = reactor.connectUNIXDatagram(iface, self)
        self._lport = port
    except Exception:
        # Re-raise under the carrier error type, keeping the original
        # exception instance as its argument.
        cause = sys.exc_info()[1]
        raise error.CarrierError(cause)
    return self
def test_exchange(self):
    """
    A datagram written by the client is recorded by the server, together
    with the client's bound address, and a reply is recorded by the client.
    """
    client_path = self.mktemp()
    server_path = self.mktemp()
    server_proto = ServerProto()
    client_proto = ClientProto()

    # The server port is created first; the client port then connects to
    # the server's path.
    server_port = reactor.listenUNIXDatagram(server_path, server_proto)
    self.addCleanup(server_port.stopListening)
    client_port = reactor.connectUNIXDatagram(
        server_path, client_proto, bindAddress=client_path)
    self.addCleanup(client_port.stopListening)

    def send_greeting(_):
        # Write the datagram, then wait until both sides report receipt.
        client_proto.transport.write(b"hi")
        pending = [server_proto.deferredGotWhat,
                   client_proto.deferredGotBack]
        return defer.gatherResults(pending)

    def check_exchange(_):
        self.assertEqual(b"hi", server_proto.gotwhat)
        self.assertEqual(client_path, server_proto.gotfrom)
        self.assertEqual(b"hi back", client_proto.gotback)

    # Nothing is written until both protocols have signalled startup.
    started = defer.gatherResults(
        [server_proto.deferredStarted, client_proto.deferredStarted])
    started.addCallback(send_greeting)
    started.addCallback(check_exchange)
    return started
def openClientMode(self, iface=''):
    """
    Open this object in client mode over a UNIX datagram socket.

    Passes C{iface} and this object to C{reactor.connectUNIXDatagram} and
    stores the returned port in C{self._lport}.

    @param iface: value forwarded as the first argument of
        C{reactor.connectUNIXDatagram} (defaults to the empty string).
    @return: this object, so that calls can be chained.
    @raise error.CarrierError: wrapping any C{Exception} raised while
        connecting or while storing the port.
    """
    try:
        port = reactor.connectUNIXDatagram(iface, self)
        self._lport = port
    except Exception:
        # Re-raise under the carrier error type, keeping the original
        # exception instance as its argument.
        cause = sys.exc_info()[1]
        raise error.CarrierError(cause)
    return self