python类Factory()的实例源码

erroneous.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_socketsLeftOpen(self):
        # Start listening on TCP port 0 and discard the returned port object,
        # so nothing in this test ever stops the listener.
        leakyFactory = protocol.Factory()
        leakyFactory.protocol = protocol.Protocol
        reactor.listenTCP(0, leakyFactory)
loopback.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def loopbackTCP(server, client, port=0, noisy=True):
    """Run a session between server and client protocol instances over TCP.

    The server protocol is wrapped so that its closing can be observed, a
    listener is opened on 127.0.0.1 at ``port``, and the client is connected
    to whatever port the listener actually bound.

    Returns the client factory's deferred, extended so that it subsequently
    waits on the server wrapper's deferred and then stops the listening port.
    """
    from twisted.internet import reactor

    # Server side: every incoming connection is handed the same wrapper.
    wrappingFactory = policies.WrappingFactory(protocol.Factory())
    wrappedServer = _FireOnClose(wrappingFactory, server)
    wrappingFactory.noisy = noisy
    wrappingFactory.buildProtocol = lambda addr: wrappedServer
    listeningPort = reactor.listenTCP(
        port, wrappingFactory, interface='127.0.0.1')

    # Client side: connect to the port number the listener really got.
    clientFactory = LoopbackClientFactory(client)
    clientFactory.noisy = noisy
    reactor.connectTCP(
        '127.0.0.1', listeningPort.getHost().port, clientFactory)

    def waitForServer(ignored):
        return wrappedServer.deferred

    def stopServer(ignored):
        return listeningPort.stopListening()

    finished = clientFactory.deferred
    finished.addCallback(waitForServer)
    finished.addCallback(stopServer)
    return finished
pop3testserver.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """Process command-line arguments, then serve POP3TestServer over TCP.

    Each argument after the program name is passed to ``processArg``; when
    there are none, a notice is emitted through ``printMessage``. The reactor
    then listens on ``PORT`` and runs until stopped.
    """
    cliArgs = sys.argv[1:]
    if cliArgs:
        for option in cliArgs:
            processArg(option)
    else:
        printMessage("POP3 with no messages")

    serverFactory = Factory()
    serverFactory.protocol = POP3TestServer
    reactor.listenTCP(PORT, serverFactory)
    reactor.run()
plugin.py 文件源码 项目:enigma2 作者: OpenLD 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def autostart(reason, **kwargs):
    """Open the hotplug UNIX socket when ``reason`` is 0.

    Any existing file at /tmp/hotplug.socket is removed first, then a Factory
    producing ``Hotplug`` protocols is bound to that path. Every other
    ``reason`` value is ignored.
    NOTE(review): presumably reason 0 means "plugin start" -- confirm against
    the plugin framework.
    """
    if reason != 0:
        return

    from twisted.internet import reactor

    socketPath = "/tmp/hotplug.socket"
    try:
        os.remove(socketPath)
    except OSError:
        # Best effort: a failed removal (e.g. no file there) is ignored.
        pass

    hotplugFactory = Factory()
    hotplugFactory.protocol = Hotplug
    reactor.listenUNIX(socketPath, hotplugFactory)
test_iscp.py 文件源码 项目:onkyo_serial 作者: blaedd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        # Build an ISCP protocol through a plain Factory, attach it to an
        # in-memory StringTransport, and clear the transport so each test
        # starts without anything written during connection setup.
        iscpFactory = protocol.Factory()
        iscpFactory.protocol = iscp.ISCP
        self.tr = proto_helpers.StringTransport()
        self.proto = iscpFactory.buildProtocol('/dev/ttyUSB0')
        self.proto.makeConnection(self.tr)
        self.tr.clear()
iscp.py 文件源码 项目:onkyo_serial 作者: blaedd 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, eiscp_port=60128):
        """Initialise the base Factory and record the eISCP discovery port.

        Args:
            eiscp_port (int): UDP port to listen for discovery requests on.
        """
        # Explicit base-class initialiser call (old-style, no super()).
        protocol.Factory.__init__(self)
        self.eiscp_port = eiscp_port
plugin.py 文件源码 项目:enigma2 作者: Openeight 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def autostart(reason, **kwargs):
    """Bind the hotplug listener to its UNIX socket when ``reason`` is 0.

    The socket file /tmp/hotplug.socket is deleted beforehand if possible, and
    a Factory whose protocol is ``Hotplug`` is then attached to it. Nothing
    happens for any other ``reason``.
    """
    if reason != 0:
        return

    from twisted.internet import reactor

    hotplugSocket = "/tmp/hotplug.socket"
    try:
        os.remove(hotplugSocket)
    except OSError:
        # Removal is best effort; errors such as a missing file are ignored.
        pass

    listenerFactory = Factory()
    listenerFactory.protocol = Hotplug
    reactor.listenUNIX(hotplugSocket, listenerFactory)
test_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testDumber(self):
        # Listen and connect on the same temporary UNIX socket path, wait for
        # both factories' deferreds, then hand the port and the three
        # transports to _addPorts.
        filename = self.mktemp()
        serverFactory = Factory(self, filename)
        listeningPort = reactor.listenUNIX(filename, serverFactory)
        clientFactory = TestClientFactory(self, filename)
        connector = reactor.connectUNIX(filename, clientFactory)

        def registerPorts(ignored):
            return self._addPorts(listeningPort, connector.transport,
                                  clientFactory.protocol.transport,
                                  serverFactory.protocol.transport)

        bothConnected = defer.gatherResults(
            [serverFactory.deferred, clientFactory.deferred])
        bothConnected.addCallback(registerPorts)
        return bothConnected
test_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testMode(self):
        # Listen on a temporary UNIX socket with an explicit mode and check
        # that the socket file was created with exactly those permission bits.
        filename = self.mktemp()
        f = Factory(self, filename)
        # 0o600 is the octal spelling accepted by both Python 2.6+ and
        # Python 3; the bare-leading-zero form is a syntax error on Python 3.
        l = reactor.listenUNIX(filename, f, mode=0o600)
        self.assertEqual(stat.S_IMODE(os.stat(filename).st_mode), 0o600)
        # Connect a client, then pass the port and client transport to
        # _addPorts.
        tcf = TestClientFactory(self, filename)
        c = reactor.connectUNIX(filename, tcf)
        self._addPorts(l, c.transport)
test_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testSocketLocking(self):
        # With wantPID=True, a second listen on the same path must fail while
        # the first port is open, and succeed once that port has stopped.
        filename = self.mktemp()
        factory = Factory(self, filename)
        firstPort = reactor.listenUNIX(filename, factory, wantPID=True)

        self.assertRaises(
            error.CannotListenError,
            reactor.listenUNIX, filename, factory, wantPID=True)

        def relisten(ign):
            secondPort = reactor.listenUNIX(filename, factory, wantPID=True)
            return secondPort.stopListening()

        return firstPort.stopListening().addCallback(relisten)
test_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testUncleanServerSocketLocking(self):
        def relistenAfterChild(ign):
            # If this next call succeeds, our lock handling is correct.
            factory = Factory(self, self.filename)
            port = reactor.listenUNIX(self.filename, factory, wantPID=True)
            return port.stopListening()

        return self._uncleanSocketTest(relistenAfterChild)
test_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testRepr(self):
        # str() of the port must contain the socket path while listening and
        # must no longer contain it after the port has stopped.
        filename = self.mktemp()
        factory = Factory(self, filename)
        port = reactor.listenUNIX(filename, factory)
        self.failIf(filename not in str(port))

        def checkReprAfterStop(ign):
            self.failIf(filename in str(port))

        stopping = defer.maybeDeferred(port.stopListening)
        return stopping.addCallback(checkReprAfterStop)
erroneous.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_socketsLeftOpen(self):
        # The listening port returned by listenTCP is thrown away, so this
        # test never stops the listener it starts on TCP port 0.
        openFactory = protocol.Factory()
        openFactory.protocol = protocol.Protocol
        reactor.listenTCP(0, openFactory)
loopback.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def loopbackTCP(server, client, port=0, noisy=True):
    """Run a session between server and client protocol instances over TCP.

    A listener bound to 127.0.0.1 on ``port`` serves the wrapped ``server``
    protocol, and ``client`` is connected to the port the listener reports.

    Returns the client factory's deferred, with callbacks added that wait on
    the server wrapper's deferred and then stop the listening port.
    """
    from twisted.internet import reactor

    # The wrapper is returned for every connection the factory accepts.
    serverFactory = policies.WrappingFactory(protocol.Factory())
    serverSide = _FireOnClose(serverFactory, server)
    serverFactory.noisy = noisy
    serverFactory.buildProtocol = lambda addr: serverSide
    boundPort = reactor.listenTCP(
        port, serverFactory, interface='127.0.0.1')

    clientSide = LoopbackClientFactory(client)
    clientSide.noisy = noisy
    reactor.connectTCP('127.0.0.1', boundPort.getHost().port, clientSide)

    def awaitServerClose(ignored):
        return serverSide.deferred

    def shutDownListener(ignored):
        return boundPort.stopListening()

    done = clientSide.deferred
    done.addCallback(awaitServerClose)
    done.addCallback(shutDownListener)
    return done
pop3testserver.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """Handle command-line options and run the POP3 test server.

    With no arguments a notice is reported via ``printMessage``; otherwise
    every argument is passed, in order, to ``processArg``. A Factory producing
    ``POP3TestServer`` protocols is then bound to ``PORT`` and the reactor is
    started.
    """
    options = sys.argv[1:]
    if not options:
        printMessage("POP3 with no messages")
    for option in options:
        processArg(option)

    pop3Factory = Factory()
    pop3Factory.protocol = POP3TestServer
    reactor.listenTCP(PORT, pop3Factory)
    reactor.run()
rsa_sigs.py 文件源码 项目:modernsectopics 作者: cnsatuva 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def main():
    """Configure logging and serve the Notary protocol on TCP port 1977.

    Log records at DEBUG level and above are written to ``rsa.log``. The call
    returns only when the reactor stops.
    """
    logging.basicConfig(filename='rsa.log', level=logging.DEBUG)

    # Create the server
    notaryFactory = Factory()
    notaryFactory.protocol = Notary
    reactor.listenTCP(1977, notaryFactory)
    reactor.run()
plugin.py 文件源码 项目:enigma2 作者: BlackHole 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def autostart(reason, **kwargs):
    """Start the hotplug handler on its UNIX socket when ``reason`` is 0.

    Prints a start-up notice, removes any existing /tmp/hotplug.socket file,
    and binds a Factory producing ``Hotplug`` protocols to that path. Any
    other ``reason`` value is ignored.

    Args:
        reason: Only the value 0 triggers the start-up.
        **kwargs: Accepted and ignored.
    """
    if reason == 0:
        # Parenthesised single-argument form: prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("[Hotplug] starting hotplug handler")
        from twisted.internet import reactor
        import os
        try:
            os.remove("/tmp/hotplug.socket")
        except OSError:
            # Best effort: a failed removal (e.g. no file there) is ignored.
            pass
        factory = Factory()
        factory.protocol = Hotplug
        reactor.listenUNIX("/tmp/hotplug.socket", factory)
factory.py 文件源码 项目:congredi 作者: toxik-io 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def startedConnecting(self, connector):  # test
        """Log at INFO level that a connection attempt has begun.

        ``connector`` is accepted but not used here.
        """
        logger.info('Factory - Connecting')


问题


面经


文章

微信
公众号

扫码关注公众号