def testPeerBind(self):
    """Assert the remote endpoint (getPeer) on the receiving end matches
    the local endpoint (bind) on the connecting end, for unix sockets.

    A factory is set listening on one temporary path while a raw client
    socket is bound to a second temporary path and then connected to the
    first.  The factory is given both paths; presumably it performs the
    actual address comparison and fires its ``deferred`` when finished
    (its definition is outside this block -- confirm there).

    Returns the factory's ``deferred`` with a cleanup callback attached
    via ``addBoth``.
    """
    filename = self.mktemp()
    peername = self.mktemp()
    factory = Factory(self, filename, peername=peername)
    port = reactor.listenUNIX(filename, factory)

    def release():
        # Single cleanup path shared by the normal callback and the
        # setup-failure handler below: register the listening port with
        # the test case and drop the client socket.
        self._addPorts(port)
        self._sock.close()
        del self._sock

    self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        self._sock.bind(peername)
        self._sock.connect(filename)
    except Exception:
        # bind()/connect() failed before the cleanup callback could be
        # attached; without this the port and the socket would leak.
        release()
        raise

    def done(x):
        release()
        # Pass the result (or failure) through unchanged.
        return x

    d = factory.deferred
    d.addBoth(done)
    return d
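# (Web-page residue, not part of the test: "Comment list".)
# (Web-page residue, not part of the test: "Article table of contents".)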