Python 方法 connectUNIXDatagram() 的实例源码

test_unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testExchange(self):
        """
        Exchange datagrams between a UNIX datagram server port and a
        connected client port, then check what each protocol recorded.

        The client writes "hi"; the test then waits on the server's
        C{deferredGotWhat} and the client's C{deferredGotBack} before
        asserting on C{sp.gotwhat}, C{sp.gotfrom} and C{cp.gotback}.

        @return: a Deferred that fires once the assertions have run and both
            ports have been stopped and their socket paths unlinked.
        """
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            cp.transport.write("hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def _cbTestExchange(ignored):
            self.assertEqual("hi", sp.gotwhat)
            self.assertEqual(clientaddr, sp.gotfrom)
            self.assertEqual("hi back", cp.gotback)

        def cleanup(result):
            # Attached with addBoth so the ports are shut down and the socket
            # files removed even when the exchange or an assertion fails.
            d1 = defer.maybeDeferred(s.stopListening)
            d1.addCallback(lambda x : os.unlink(serveraddr))
            d2 = defer.maybeDeferred(c.stopListening)
            d2.addCallback(lambda x : os.unlink(clientaddr))
            done = defer.gatherResults([d1, d2])
            # Hand back the incoming result; if it is a Failure, returning it
            # keeps the test failing after cleanup has finished.
            done.addCallback(lambda x : result)
            return done

        d.addCallback(write)
        d.addCallback(_cbTestExchange)
        d.addBoth(cleanup)
        return d
unix.py 文件源码 项目:pysnmp 作者: etingof 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def openClientMode(self, iface=''):
        """
        Connect this object as a UNIX datagram client via the reactor.

        The value returned by C{reactor.connectUNIXDatagram(iface, self)} is
        stored on C{self._lport}.

        @param iface: address passed as the first argument to
            C{reactor.connectUNIXDatagram}; defaults to the empty string.
        @return: C{self}.
        @raise error.CarrierError: wrapping any C{Exception} raised while
            connecting.
        """
        try:
            self._lport = reactor.connectUNIXDatagram(iface, self)
        except Exception:
            # Re-raise the active exception wrapped in the carrier error type.
            cause = sys.exc_info()[1]
            raise error.CarrierError(cause)
        return self
test_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testExchange(self):
        """
        Exchange datagrams between a UNIX datagram server port and a
        connected client port, then check what each protocol recorded.

        The client writes "hi"; the test then waits on the server's
        C{deferredGotWhat} and the client's C{deferredGotBack} before
        asserting on C{sp.gotwhat}, C{sp.gotfrom} and C{cp.gotback}.

        @return: a Deferred that fires once the assertions have run and both
            ports have been stopped and their socket paths unlinked.
        """
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            cp.transport.write("hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def _cbTestExchange(ignored):
            self.assertEqual("hi", sp.gotwhat)
            self.assertEqual(clientaddr, sp.gotfrom)
            self.assertEqual("hi back", cp.gotback)

        def cleanup(result):
            # Attached with addBoth so the ports are shut down and the socket
            # files removed even when the exchange or an assertion fails.
            d1 = defer.maybeDeferred(s.stopListening)
            d1.addCallback(lambda x : os.unlink(serveraddr))
            d2 = defer.maybeDeferred(c.stopListening)
            d2.addCallback(lambda x : os.unlink(clientaddr))
            done = defer.gatherResults([d1, d2])
            # Hand back the incoming result; if it is a Failure, returning it
            # keeps the test failing after cleanup has finished.
            done.addCallback(lambda x : result)
            return done

        d.addCallback(write)
        d.addCallback(_cbTestExchange)
        d.addBoth(cleanup)
        return d
unix.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def openClientMode(self, iface=''):
        """
        Connect this object as a UNIX datagram client via the reactor.

        The value returned by C{reactor.connectUNIXDatagram(iface, self)} is
        stored on C{self._lport}.

        @param iface: address passed as the first argument to
            C{reactor.connectUNIXDatagram}; defaults to the empty string.
        @return: C{self}.
        @raise error.CarrierError: wrapping any C{Exception} raised while
            connecting.
        """
        try:
            self._lport = reactor.connectUNIXDatagram(iface, self)
        except Exception:
            # Re-raise the active exception wrapped in the carrier error type.
            cause = sys.exc_info()[1]
            raise error.CarrierError(cause)
        return self
test_unix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_exchange(self):
        """
        A datagram written by the client is recorded by the server, and the
        client in turn records the datagram it gets back.
        """
        client_path = self.mktemp()
        server_path = self.mktemp()
        server_proto = ServerProto()
        client_proto = ClientProto()

        # Each port is registered for shutdown as soon as it is created.
        server_port = reactor.listenUNIXDatagram(server_path, server_proto)
        self.addCleanup(server_port.stopListening)
        client_port = reactor.connectUNIXDatagram(
            server_path, client_proto, bindAddress=client_path)
        self.addCleanup(client_port.stopListening)

        def send_greeting(_):
            # Write from the client, then wait until both sides have
            # reported receiving a datagram.
            client_proto.transport.write(b"hi")
            return defer.gatherResults([
                server_proto.deferredGotWhat,
                client_proto.deferredGotBack,
            ])

        def verify(_):
            self.assertEqual(b"hi", server_proto.gotwhat)
            self.assertEqual(client_path, server_proto.gotfrom)
            self.assertEqual(b"hi back", client_proto.gotback)

        both_started = defer.gatherResults([
            server_proto.deferredStarted,
            client_proto.deferredStarted,
        ])
        both_started.addCallback(send_greeting)
        both_started.addCallback(verify)
        return both_started
unix.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def openClientMode(self, iface=''):
        """
        Connect this object as a UNIX datagram client via the reactor.

        The value returned by C{reactor.connectUNIXDatagram(iface, self)} is
        stored on C{self._lport}.

        @param iface: address passed as the first argument to
            C{reactor.connectUNIXDatagram}; defaults to the empty string.
        @return: C{self}.
        @raise error.CarrierError: wrapping any C{Exception} raised while
            connecting.
        """
        try:
            self._lport = reactor.connectUNIXDatagram(iface, self)
        except Exception:
            # Re-raise the active exception wrapped in the carrier error type.
            cause = sys.exc_info()[1]
            raise error.CarrierError(cause)
        return self


问题


面经


文章

微信
公众号

扫码关注公众号