def test_dumber(self):
"""
L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
started with L{IReactorUNIX.listenUNIX}.
"""
filename = self.mktemp()
serverFactory = MyServerFactory()
serverConnMade = defer.Deferred()
serverFactory.protocolConnectionMade = serverConnMade
unixPort = reactor.listenUNIX(filename, serverFactory)
self.addCleanup(unixPort.stopListening)
clientFactory = MyClientFactory()
clientConnMade = defer.Deferred()
clientFactory.protocolConnectionMade = clientConnMade
reactor.connectUNIX(filename, clientFactory)
d = defer.gatherResults([serverConnMade, clientConnMade])
def allConnected(args):
serverProtocol, clientProtocol = args
# Incidental assertion which may or may not be redundant with some
# other test. This probably deserves its own test method.
self.assertEqual(clientFactory.peerAddresses,
[address.UNIXAddress(filename)])
clientProtocol.transport.loseConnection()
serverProtocol.transport.loseConnection()
d.addCallback(allConnected)
return d
评论列表
文章目录