python类sendfile()的实例源码

test_os.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while 1:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                     offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(hash(data), hash(expected_data))
test_os.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            file_data = b"abcdef"
            with open(TESTFN2, 'wb') as f:
                f.write(file_data)
            with open(TESTFN2, 'rb')as f:
                self.addCleanup(os.remove, TESTFN2)
                os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                            trailers=[b"1234"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcdef1234")
wsgi.py 文件源码 项目:infiblog 作者: RajuKoushik 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def can_sendfile(self):
        return self.cfg.sendfile is not False and sendfile is not None
wsgi.py 文件源码 项目:infiblog 作者: RajuKoushik 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_file(self, respiter):
        if not self.sendfile(respiter):
            for item in respiter:
                self.write(item)
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
wsgi.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def can_sendfile(self):
        return self.cfg.sendfile is not False and sendfile is not None
wsgi.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def write_file(self, respiter):
        if not self.sendfile(respiter):
            for item in respiter:
                self.write(item)
wsgi.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def can_sendfile(self):
        return self.cfg.sendfile is not False and sendfile is not None
wsgi.py 文件源码 项目:logo-gen 作者: jellene4eva 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def can_sendfile(self):
        return self.cfg.sendfile is not False and sendfile is not None
wsgi.py 文件源码 项目:logo-gen 作者: jellene4eva 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def write_file(self, respiter):
        if not self.sendfile(respiter):
            for item in respiter:
                self.write(item)
wsgi.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def can_sendfile(self):
        return self.cfg.sendfile is not False and sendfile is not None
wsgi.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write_file(self, respiter):
        if not self.sendfile(respiter):
            for item in respiter:
                self.write(item)
wsgi.py 文件源码 项目:compatify 作者: hatooku 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def can_sendfile(self):
        return self.cfg.sendfile is not False and sendfile is not None
wsgi.py 文件源码 项目:compatify 作者: hatooku 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write_file(self, respiter):
        if not self.sendfile(respiter):
            for item in respiter:
                self.write(item)
wsgi.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def can_sendfile(self):
        return self.cfg.sendfile is not False and sendfile is not None
wsgi.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def write_file(self, respiter):
        if not self.sendfile(respiter):
            for item in respiter:
                self.write(item)
handlers.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def initiate_sendfile(self):
        """A wrapper around sendfile."""
        try:
            sent = sendfile(self._fileno, self._filefd, self._offset,
                            self.ac_out_buffer_size)
        except OSError as err:
            if err.errno in _ERRNOS_RETRY or err.errno == errno.EBUSY:
                return
            elif err.errno in _ERRNOS_DISCONNECTED:
                self.handle_close()
            else:
                if self.tot_bytes_sent == 0:
                    logger.warning(
                        "sendfile() failed; falling back on using plain send")
                    raise _GiveUpOnSendfile
                else:
                    raise
        else:
            if sent == 0:
                # this signals the channel that the transfer is completed
                self.discard_buffers()
                self.handle_close()
            else:
                self._offset += sent
                self.tot_bytes_sent += sent

    # --- utility methods
test_functional.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_fallback(self):
        # Makes sure that if sendfile() fails and no bytes were
        # transmitted yet the server falls back on using plain
        # send()
        data = b'abcde12345' * 100000
        self.dummy_sendfile.write(data)
        self.dummy_sendfile.seek(0)
        self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
        with mock.patch('pyftpdlib.handlers.sendfile',
                        side_effect=OSError(errno.EINVAL)) as fun:
            try:
                self.client.retrbinary(
                    'retr ' + TESTFN, self.dummy_recvfile.write)
                assert fun.called
                self.dummy_recvfile.seek(0)
                datafile = self.dummy_recvfile.read()
                self.assertEqual(len(data), len(datafile))
                self.assertEqual(hash(data), hash(datafile))
            finally:
                # We do not use os.remove() because file could still be
                # locked by ftpd thread.  If DELE through FTP fails try
                # os.remove() as last resort.
                if os.path.exists(TESTFN):
                    try:
                        self.client.delete(TESTFN)
                    except (ftplib.Error, EOFError, socket.error):
                        safe_remove(TESTFN)


问题


面经


文章

微信
公众号

扫码关注公众号