python类BytesIO()的实例源码

copy_file_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_simple(self):
    """Feed two documents to the copy subscriber and check the 'xyz' folder.

    The subscriber is configured with ``COPY_EXT = ['xyz']``; afterwards the
    ``files/xyz`` directory under the data root must contain exactly one
    entry, the copy of ``mock.xyz``.
    """
    pipeline = test_helper.get_mock_pipeline([])

    # Start from a clean data root so earlier runs cannot affect the listing.
    root = os.path.join('local_data', 'unittests')
    if os.path.exists(root):
      shutil.rmtree(root)

    settings = {
        helper.DATA_ROOT: root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz'],
    }
    subscriber = copy_file.Subscriber(pipeline)
    subscriber.setup(settings)

    # One document with the configured extension, one without.
    for name, payload in (('mock.xyz', b'juba.'), ('ignore.doc', b'mock')):
      subscriber.consume(document.get_document(name), BytesIO(payload))

    copied = os.listdir(os.path.join(root, 'files', 'xyz'))
    self.assertEqual(['39bbf948-mock.xyz'], copied)
su3file.py 文件源码 项目:pyseeder 作者: PurpleI2P 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def reseed(self, netdb):
        """Zip ``.dat`` files found under *netdb* and store them as the content.

        Walks the *netdb* directory tree, collects every file whose name ends
        in ``.dat`` and, if more than 75 were found, keeps a random sample of
        75. The selected files are written (deflated, flat, by base name)
        into an in-memory zip archive whose bytes become ``self.CONTENT``.

        Raises:
            PyseederException: if no ``.dat`` file was found.
        """
        # TODO check modified time
        # may be not older than 10h
        candidates = [
            os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(netdb)
            for name in names
            if name.endswith(".dat")
        ]

        if not candidates:
            raise PyseederException("Can't get enough netDb entries")
        if len(candidates) > 75:
            candidates = random.sample(candidates, 75)

        archive = io.BytesIO()
        with ZipFile(archive, "w", compression=ZIP_DEFLATED) as bundle:
            for path in candidates:
                # Store entries without their directory part.
                bundle.write(path, arcname=os.path.basename(path))

        payload = archive.getvalue()
        self.FILE_TYPE = 0x00
        self.CONTENT_TYPE = 0x03
        self.CONTENT = payload
        self.CONTENT_LENGTH = len(payload)
test_python_driver.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _restart_data(self, format_: str='json') -> None:
        """Rebuild the test request and create fresh send/receive buffers.

        Reads ``data/helloworld.py`` (relative to ``CURDIR``) and wraps its
        text in a ``ParseAST`` request stored on ``self.data``. Only the
        ``'json'`` format is accepted.
        """
        assert format_ == 'json'

        source_path = join(CURDIR, 'data', 'helloworld.py')
        with open(source_path) as source:
            testcode = source.read()

        request_fields = {
            'filepath': 'test.py',
            'action': 'ParseAST',
            'content': testcode,
            'language': 'python',
        }
        self.data = Request(request_fields)

        # Text buffers for json, binary buffers for anything else.
        if format_ == 'json':
            bufferclass = io.StringIO
        else:
            bufferclass = io.BytesIO

        # This will mock the python_driver stdin
        self.sendbuffer = bufferclass()
        # This will mock the python_driver stdout
        self.recvbuffer = bufferclass()
astimprove.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, codestr: str, astdict: AstDict) -> None:
        """Tokenize *codestr* and set up the helpers built on the token stream.

        Args:
            codestr: the source code text; it is encoded as UTF-8 before
                being handed to ``tokenize``.
            astdict: stored unchanged on ``self._astdict``.
        """
        self._astdict = astdict

        # Tokenize and create the noop extractor and the position fixer
        source_bytes = BytesIO(codestr.encode('utf-8'))
        self._tokens: List[Token] = [
            Token(*raw) for raw in tokenize.tokenize(source_bytes.readline)
        ]
        lines_of_tokens = _create_tokenized_lines(codestr, self._tokens)
        self.noops_sync = NoopExtractor(codestr, lines_of_tokens)
        self.pos_sync = LocationFixer(codestr, lines_of_tokens)
        self.codestr = codestr

        # This will store a dict of nodes to end positions, it will be filled
        # on parse()
        self._node2endpos = None

        # Global and Nonlocal nodes share one visitor.
        self.visit_Global = self.visit_Nonlocal = self._promote_names
NNTPContent.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __len__(self):
        """
        Returns the length of the content

        With no ``filepath`` set, the length is measured on ``self.stream``
        by seeking to its end; the stream position is restored afterwards.
        With neither a filepath nor a stream the length is 0. When a
        filepath is set, a dirty stream is flushed first and the size of the
        file on disk is returned.
        """
        if not self.filepath:
            # If there is no filepath, then we're probably dealing with a
            # stream in memory like a StringIO or BytesIO stream.
            if self.stream:
                # Remember where we are so the position can be restored
                ptr = self.stream.tell()
                # Advance to the end of the file and get our length.  The
                # offset is a plain 0 (the `0L` long literal is a syntax
                # error on Python 3) and the length is read with tell()
                # because seek() does not return the new position on every
                # stream type (Python 2 file objects return None).
                self.stream.seek(0, SEEK_END)
                length = self.stream.tell()
                if length != ptr:
                    # Return our pointer
                    self.stream.seek(ptr, SEEK_SET)
            else:
                # No Stream or Filepath; nothing has been initialized
                # yet at all so just return 0
                length = 0
        else:
            if self.stream and self._dirty is True:
                # Push pending writes to disk so getsize() sees them
                self.stream.flush()
                self._dirty = False

            # Get the size
            length = getsize(self.filepath)

        return length
resources.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_stream(self, resource):
        """Return a new in-memory binary stream over the bytes of *resource*.

        The bytes are obtained from ``self.get_bytes(resource)``.
        """
        payload = self.get_bytes(resource)
        return io.BytesIO(payload)
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_resource_stream(self, manager, resource_name):
        """Return a new in-memory binary stream over the named resource.

        The content comes from
        ``self.get_resource_string(manager, resource_name)``.
        """
        content = self.get_resource_string(manager, resource_name)
        return io.BytesIO(content)
serialize.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def prepare_response(self, request, cached):
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.

        Returns ``None`` when the cached entry cannot be used for *request*:
        either its ``vary`` mapping contains ``'*'`` or one of the recorded
        header values differs from the request's.
        """
        vary = cached.get("vary", {})

        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        if "*" in vary:
            return

        # Ensure that the Vary headers for the cached response match our
        # request
        for header, value in vary.items():
            if request.headers.get(header, None) != value:
                return

        response_fields = cached["response"]
        # The body is passed separately below, so take it out of the kwargs.
        body_raw = response_fields.pop("body")

        headers = CaseInsensitiveDict(data=response_fields['headers'])
        if headers.get('transfer-encoding', '') == 'chunked':
            headers.pop('transfer-encoding')
        response_fields['headers'] = headers

        try:
            body = io.BytesIO(body_raw)
        except TypeError:
            # This can happen if cachecontrol serialized to v1 format (pickle)
            # using Python 2. A Python 2 str(byte string) will be unpickled as
            # a Python 3 str (unicode string), which will cause the above to
            # fail with:
            #
            #     TypeError: 'str' does not support the buffer interface
            body = io.BytesIO(body_raw.encode('utf8'))

        return HTTPResponse(body=body, preload_content=False, **response_fields)
filewrapper.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def __init__(self, fp, callback):
        """Keep *fp* and *callback* and create an empty in-memory byte buffer."""
        self.__fp = fp
        self.__callback = callback
        self.__buf = BytesIO()
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def captured_output(stream_name):
    """Temporarily swap the ``sys`` stream named *stream_name*.

    The current stream is replaced by ``StreamWrapper.from_stream(...)`` built
    from it, the replacement is yielded, and the original stream is put back
    when the generator is resumed or closed.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    saved = getattr(sys, stream_name)
    replacement = StreamWrapper.from_stream(saved)
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        # Always restore the original stream, even if the caller raised.
        setattr(sys, stream_name, saved)
test_wheelfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_verifying_zipfile():
    """Exercise VerifyingZipFile hash checking in default and strict mode."""
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    # Build a three-member archive in memory.
    buffer = StringIO()
    writer = zipfile.ZipFile(buffer, 'w')
    for member, payload in (("one", b"first file"),
                            ("two", b"second file"),
                            ("three", b"third file")):
        writer.writestr(member, payload)
    writer.close()

    vzf = wheel.install.VerifyingZipFile(buffer, 'r')

    def expect_bad_wheel(member):
        # Reading *member* must raise BadWheelFile; anything else is a failure.
        try:
            vzf.open(member).read()
        except wheel.install.BadWheelFile:
            pass
        else:
            raise Exception("expected exception 'BadWheelFile()'")

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    expect_bad_wheel("three")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    expect_bad_wheel("two")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_resource_stream(self, manager, resource_name):
        """Wrap the named resource's content in a new ``io.BytesIO`` stream.

        The content is whatever
        ``self.get_resource_string(manager, resource_name)`` returns.
        """
        raw = self.get_resource_string(manager, resource_name)
        return io.BytesIO(raw)
quopri_codec.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def quopri_encode(input, errors='strict'):
    """Quoted-printable encode *input* (tabs and spaces quoted too).

    Returns a ``(encoded_bytes, consumed_length)`` tuple, where the length is
    ``len(input)``. Only ``errors='strict'`` is accepted.
    """
    assert errors == 'strict'
    source = BytesIO(input)
    encoded = BytesIO()
    quopri.encode(source, encoded, quotetabs=True)
    consumed = len(input)
    return (encoded.getvalue(), consumed)
quopri_codec.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def quopri_decode(input, errors='strict'):
    """Decode quoted-printable *input*.

    Returns a ``(decoded_bytes, consumed_length)`` tuple, where the length is
    ``len(input)``. Only ``errors='strict'`` is accepted.
    """
    assert errors == 'strict'
    source = BytesIO(input)
    decoded = BytesIO()
    quopri.decode(source, decoded)
    consumed = len(input)
    return (decoded.getvalue(), consumed)
uu_codec.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
    """Uuencode *input* and return ``(encoded_bytes, len(input))``.

    The output starts with a ``begin <mode> <filename>`` header line (only
    the low nine permission bits of *mode* are written), followed by the data
    encoded 45 input bytes per line, and ends with a `` `` line and ``end``.
    Only ``errors='strict'`` is accepted.

    Carriage returns and newlines in *filename* are written as the two
    character sequences ``\\r`` and ``\\n`` so the name cannot break out of
    the header line.
    """
    assert errors == 'strict'
    infile = BytesIO(input)
    outfile = BytesIO()
    read = infile.read
    write = outfile.write

    # The header must stay a single line: a raw CR/LF in the file name would
    # end it early and let the name inject arbitrary lines into the output.
    filename = filename.replace('\n', '\\n')
    filename = filename.replace('\r', '\\r')

    # Encode
    write(('begin %o %s\n' % (mode & 0o777, filename)).encode('ascii'))
    chunk = read(45)
    while chunk:
        write(binascii.b2a_uu(chunk))
        chunk = read(45)
    write(b' \nend\n')

    return (outfile.getvalue(), len(input))
uu_codec.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def uu_decode(input, errors='strict'):
    """Decode uuencoded *input* and return ``(decoded_bytes, len(input))``.

    Lines before the first one starting with ``begin`` are skipped; decoding
    stops at the ``end`` line. Only ``errors='strict'`` is accepted.

    Raises:
        ValueError: if no ``begin`` line is found, or the input ends before
            an ``end`` line is seen.
    """
    assert errors == 'strict'
    source = BytesIO(input)
    decoded = BytesIO()
    next_line = source.readline

    # Find start of encoded data
    line = next_line()
    while line[:5] != b'begin':
        if not line:
            raise ValueError('Missing "begin" line in input data')
        line = next_line()

    # Decode
    while True:
        line = next_line()
        if line in (b'', b'end\n'):
            break
        try:
            chunk = binascii.a2b_uu(line)
        except binascii.Error:
            # Workaround for broken uuencoders by /Fredrik Lundh
            # Recompute the line length from its length byte and retry on
            # the truncated line.
            nbytes = (((line[0] - 32) & 63) * 4 + 5) // 3
            chunk = binascii.a2b_uu(line[:nbytes])
        decoded.write(chunk)

    # An empty read means EOF arrived before the "end" line.
    if not line:
        raise ValueError('Truncated input data')

    return (decoded.getvalue(), len(input))
test_usermanager.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_load_config_empty_file(self):
        """
            Check that the config attr is an empty dict when the config
            file exists but is empty
        """
        usrmgr = self.__get_dummy_object()

        # Pretend the file exists and make open() hand back an empty stream.
        exists_patch = patch("os.path.exists", return_value=True)
        open_patch = patch("ownbot.usermanager.open")
        with exists_patch, open_patch as open_mock:
            open_mock.return_value = io.BytesIO(b"")

            self.assertEqual(usrmgr.config, {})
            self.assertTrue(open_mock.called)
mapprint.py 文件源码 项目:geo-pyprint 作者: ioda-net 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _create_map_image(self):
        """Composite every image from ``_get_images()`` into ``self._map_image``.

        Starts from a new RGBA image of size ``self._map_size``. Items that
        are not already ``Image.Image`` instances are opened from their
        ``content`` attribute and converted to RGBA before compositing.
        """
        layers = self._get_images()

        self._map_image = Image.new('RGBA', self._map_size)

        for layer in layers:
            if isinstance(layer, Image.Image):
                rgba_layer = layer
            else:
                rgba_layer = Image.open(BytesIO(layer.content)).convert('RGBA')

            self._map_image = Image.alpha_composite(self._map_image, rgba_layer)
mapprint.py 文件源码 项目:geo-pyprint 作者: ioda-net 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _create_pdf_libreoffice(self):
        """Render the map into ``template.odt`` and convert it to PDF.

        The map image is saved as PNG into memory and passed to the template
        renderer as ``my_map``. The rendered document is written to a
        temporary file, which is handed to the ``unoconv`` command with
        ``<temporary name>.pdf`` as the output path.

        Returns:
            The output path given to ``unoconv``. The command's exit status
            is not checked.
        """
        png_buffer = BytesIO()
        self._map_image.save(png_buffer, 'PNG')

        # TODO: use the configuration to select the template
        # TODO: use the configuration to select the name of the key in the template
        renderer = Renderer(media_path='.')
        rendered_odt = renderer.render('template.odt', my_map=png_buffer)

        with NamedTemporaryFile(mode='wb+', prefix='geo-pyprint_', delete=True) as odt_file:
            odt_file.write(rendered_odt)
            # Make sure the bytes are on disk before the external tool reads them.
            odt_file.flush()

            pdf_path = odt_file.name + '.pdf'
            subprocess.call(
                ['unoconv', '-f', 'pdf', '-o', pdf_path, odt_file.name],
                timeout=None,
            )

            return pdf_path
pyopenssl.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_verify_locations(self, cafile=None, capath=None, cadata=None):
        """Forward CA locations to the wrapped context.

        *cafile* and *capath* are UTF-8 encoded when given and always passed
        on (``None`` stays ``None``). When *cadata* is given it is passed on
        in a second call, wrapped in a ``BytesIO``.
        """
        encoded_file = None if cafile is None else cafile.encode('utf-8')
        encoded_path = None if capath is None else capath.encode('utf-8')
        self._ctx.load_verify_locations(encoded_file, encoded_path)

        if cadata is None:
            return
        self._ctx.load_verify_locations(BytesIO(cadata))


问题


面经


文章

微信
公众号

扫码关注公众号