python类FileSender()的实例源码

ftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send(self, consumer):
        assert not self._send, "Can only call IReadFile.send *once* per instance"
        self._send = True
        d = basic.FileSender().beginFileTransfer(self.fObj, consumer)
        d.addBoth(self._close)
        return d
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_interface(self):
        """
        L{basic.FileSender} implements the L{IPullProducer} interface.
        """
        sender = basic.FileSender()
        self.assertTrue(verifyObject(IProducer, sender))
test_basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_producerRegistered(self):
        """
        When L{basic.FileSender.beginFileTransfer} is called, it registers
        itself with provided consumer, as a non-streaming producer.
        """
        source = BytesIO(b"Test content")
        consumer = proto_helpers.StringTransport()
        sender = basic.FileSender()
        sender.beginFileTransfer(source, consumer)
        self.assertEqual(consumer.producer, sender)
        self.assertFalse(consumer.streaming)
imap4.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __cbContinueAppend(self, lines, message):
        s = basic.FileSender()
        return s.beginFileTransfer(message, self.transport, None
            ).addCallback(self.__cbFinishAppend)
maildir.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def startUp(self):
        """
        Start transferring the message to the mailbox.
        """
        self.createTempFile()
        if self.fh != -1:
            self.filesender = basic.FileSender()
            self.filesender.beginFileTransfer(self.msg, self)
smtp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def smtpState_data(self, code, resp):
        s = basic.FileSender()
        d = s.beginFileTransfer(
            self.getMailData(), self.transport, self.transformChunk)
        def ebTransfer(err):
            self.sendError(err.value)
        d.addCallbacks(self.finishedFileTransfer, ebTransfer)
        self._expected = SUCCESS
        self._okresponse = self.smtpState_msgSent
pop3.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _sendMessageContent(self, i, fpWrapper, successResponse):
        """
        Send the contents of a message.

        @type i: L{bytes}
        @param i: A 1-based message index.

        @type fpWrapper: callable that takes a file-like object and returns
            a file-like object
        @param fpWrapper:

        @type successResponse: callable that takes L{int} and returns
            L{bytes}
        @param successResponse:

        @rtype: L{Deferred}
        @return: A deferred which triggers after the message has been sent.
        """
        d = self._getMessageFile(i)
        def cbMessageFile(info):
            if info is None:
                # Some error occurred - a failure response has been sent
                # already, just give up.
                return

            self._highest = max(self._highest, int(i))
            resp, fp = info
            fp = fpWrapper(fp)
            self.successResponse(successResponse(resp))
            s = basic.FileSender()
            d = s.beginFileTransfer(fp, self.transport, self.transformChunk)

            def cbFileTransfer(lastsent):
                if lastsent != '\n':
                    line = '\r\n.'
                else:
                    line = '.'
                self.sendLine(line)

            def ebFileTransfer(err):
                self.transport.loseConnection()
                log.msg("Unexpected error in _sendMessageContent:")
                log.err(err)

            d.addCallback(cbFileTransfer)
            d.addErrback(ebFileTransfer)
            return d
        return self._longOperation(d.addCallback(cbMessageFile))


问题


面经


文章

微信
公众号

扫码关注公众号