python类BytesIO()的实例源码

import_cards.py 文件源码 项目:django-magic-cards 作者: pbaranay 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(archive.infolist()[0])
    decoded_data = data.decode('utf-8')
    sets_data = json.loads(decoded_data)
    return sets_data
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def zip_dir(directory):
    """zip a directory tree into a BytesIO object"""
    result = io.BytesIO()
    dlen = len(directory)
    with ZipFile(result, "w") as zf:
        for root, dirs, files in os.walk(directory):
            for name in files:
                full = os.path.join(root, name)
                rel = root[dlen:]
                dest = os.path.join(rel, name)
                zf.write(full, dest)
    return result

#
# Simple progress bar
#
chunks.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def encode_jpeg(arr):
    assert arr.dtype == np.uint8

    # simulate multi-channel array for single channel arrays
    if len(arr.shape) == 3:
        arr = np.expand_dims(arr, 3) # add channels to end of x,y,z

    arr = arr.transpose((3,2,1,0)) # channels, z, y, x
    reshaped = arr.reshape(arr.shape[3] * arr.shape[2], arr.shape[1] * arr.shape[0])
    if arr.shape[0] == 1:
        img = Image.fromarray(reshaped, mode='L')
    elif arr.shape[0] == 3:
        img = Image.fromarray(reshaped, mode='RGB')
    else:
        raise ValueError("Number of image channels should be 1 or 3. Got: {}".format(arr.shape[3]))

    f = io.BytesIO()
    img.save(f, "JPEG")
    return f.getvalue()
ImageQt.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def fromqimage(im):
    buffer = QBuffer()
    buffer.open(QIODevice.ReadWrite)
    # preserve alha channel with png
    # otherwise ppm is more friendly with Image.open
    if im.hasAlphaChannel():
        im.save(buffer, 'png')
    else:
        im.save(buffer, 'ppm')

    b = BytesIO()
    try:
        b.write(buffer.data())
    except TypeError:
        # workaround for Python 2
        b.write(str(buffer.data()))
    buffer.close()
    b.seek(0)

    return Image.open(b)
NNTPContent.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def tell(self):
        """
        Allows reference to our object from within a Codec()

        """
        if not self.filepath:
            # If there is no filepath, then we're probably dealing with a
            # stream in memory like a StringIO or BytesIO stream.
            if self.stream:
                # Advance to the end of the file
                return self.stream.tell()

        else:
            if self.stream and self._dirty is True:
                self.stream.flush()
                self._dirty = False

        if not self.stream:
            if not self.open(mode=NNTPFileMode.BINARY_RO):
                return None

        return self.stream.tell()
_inputstream.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = BytesIO(source)

        try:
            stream.seek(stream.tell())
        except:  # pylint:disable=bare-except
            stream = BufferedStream(stream)

        return stream
tempfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def rollover(self):
        if self._rolled: return
        file = self._file
        newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
        del self._TemporaryFileArgs

        newfile.write(file.getvalue())
        newfile.seek(file.tell(), 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # BytesIO/StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
test_usermanager.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_load_config(self):
        """
            Test loading of the config attribute
        """
        usrmgr = self.__get_dummy_object()
        with patch("os.path.exists", return_value=True),\
                patch("ownbot.usermanager.open") as open_mock:

            open_mock.return_value = io.BytesIO(b"""
foogroup:
  unverified:
    - '@foouser'""")

            expected_config = {"foogroup": {"unverified": ["@foouser"]}}

            self.assertEqual(usrmgr.config, expected_config)
            self.assertTrue(open_mock.called)
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def zip_dir(directory):
    """zip a directory tree into a BytesIO object"""
    result = io.BytesIO()
    dlen = len(directory)
    with ZipFile(result, "w") as zf:
        for root, dirs, files in os.walk(directory):
            for name in files:
                full = os.path.join(root, name)
                rel = root[dlen:]
                dest = os.path.join(rel, name)
                zf.write(full, dest)
    return result

#
# Simple progress bar
#
base.py 文件源码 项目:zappa-django-utils 作者: Miserlou 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def close(self, *args, **kwargs):
        """
        Engine closed, copy file to DB
        """
        super(DatabaseWrapper, self).close(*args, **kwargs)

        signature_version = self.settings_dict.get("SIGNATURE_VERSION", "s3v4")
        s3 = boto3.resource('s3',
                config=botocore.client.Config(signature_version=signature_version))

        try:
            with open(self.settings_dict['NAME'], 'rb') as f:
                fb = f.read()
                bytesIO = BytesIO()
                bytesIO.write(fb)
                bytesIO.seek(0)

                s3_object = s3.Object(self.settings_dict['BUCKET'], self.settings_dict['REMOTE_NAME'])
                result = s3_object.put('rb', Body=bytesIO)

        except Exception as e:
            print(e)

        logging.debug("Saved to remote DB!")
steamgifts.py 文件源码 项目:AutoSteamGifts 作者: joaopsys 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getWebPage(url, headers, cookies, postData=None):
    try:
        if (postData):
            params = urllib.parse.urlencode(postData)
            params = params.encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
        request.add_header('Cookie', cookies)
        if (postData):
            response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request)
        else:
            response = urllib.request.urlopen(request)
        if response.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(response.read())
            f = gzip.GzipFile(fileobj=buf)
            r = f.read()
        else:
            r = response.read()

        return r
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None

## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
visualize.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.
    """
    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed.  Can't display graph.")

    if format == 'svg':
        display_cls = display.SVG
    elif format in ("jpeg", "png"):
        display_cls = partial(display.Image, format=format, embed=True)

    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _body(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
usbhid.py 文件源码 项目:rivalcfg 作者: flozz 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def open_device(vendor_id, product_id, interface_number):
    """Opens and returns the HID device (file-like object). Raise IOError if
    the device or interface is not available.

    Arguments:
    vendor_id -- the mouse vendor id (e.g. 0x1038)
    product_id -- the mouse product id (e.g. 0x1710)
    interface_number -- the interface number (e.g. 0x00)
    """
    # Dry run
    if debug.DEBUG and debug.DRY and is_device_plugged(vendor_id, product_id):
        device = BytesIO()  # Moke the device
        device.send_feature_report = device.write
        return device

    # Real device
    for interface in hid.enumerate(vendor_id, product_id):
        if interface["interface_number"] != interface_number:
            continue
        device = hid.device()
        device.open_path(interface["path"])
        return device

    raise IOError("Unable to find the requested device: %04X:%04X:%02X" % (
        vendor_id, product_id, interface_number))
comic.py 文件源码 项目:ComicSpider 作者: QuantumLiu 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def print_chapters(self,show=False):
        '''
        ??????
        Display infos of chapters.
        '''
        headers={'use-agent':"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",'Referer':'http://manhua.dmzj.com/tags/s.shtml'}
        text='There are {n} chapters in comic {c}:\n{chs}'.format(n=self.chapter_num,c=self.comic_title,chs='\n'.join([info[0] for info in self.chapter_urls]))
        print(text)
        if show:
            try:
                res=requests.get(self.cover,headers=headers)
                if b'403' in res.content:
                    raise ValueError('Got cover img failed')
                out=BytesIO(res.content)
                out.seek(0)
                Image.open(out).show()
            except (ConnectionError,ValueError):
                traceback.print_exc()
        return text
models.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def file(self):
        """
        Returns a file pointer to this binary

        :example:

        >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
        >>> binary_obj = process_obj.binary
        >>> print(binary_obj.file.read(2))
        MZ
        """
        # TODO: I don't like reaching through to the session...
        with closing(self._cb.session.get("/api/v1/binary/{0:s}".format(self.md5sum), stream=True)) as r:
            z = StringIO(r.content)
            zf = ZipFile(z)
            fp = zf.open('filedata')
            return fp
socks.py 文件源码 项目:LFISuite 作者: D35m0nd142 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sendto(self, bytes, *args, **kwargs):
        if self.type != socket.SOCK_DGRAM:
            return super(socksocket, self).sendto(bytes, *args, **kwargs)
        if not self._proxyconn:
            self.bind(("", 0))

        address = args[-1]
        flags = args[:-1]

        header = BytesIO()
        RSV = b"\x00\x00"
        header.write(RSV)
        STANDALONE = b"\x00"
        header.write(STANDALONE)
        self._write_SOCKS5_address(address, header)

        sent = super(socksocket, self).send(header.getvalue() + bytes, *flags, **kwargs)
        return sent - header.tell()
socks.py 文件源码 项目:LFISuite 作者: D35m0nd142 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def recvfrom(self, bufsize, flags=0):
        if self.type != socket.SOCK_DGRAM:
            return super(socksocket, self).recvfrom(bufsize, flags)
        if not self._proxyconn:
            self.bind(("", 0))

        buf = BytesIO(super(socksocket, self).recv(bufsize + 1024, flags))
        buf.seek(2, SEEK_CUR)
        frag = buf.read(1)
        if ord(frag):
            raise NotImplementedError("Received UDP packet fragment")
        fromhost, fromport = self._read_SOCKS5_address(buf)

        if self.proxy_peername:
            peerhost, peerport = self.proxy_peername
            if fromhost != peerhost or peerport not in (0, fromport):
                raise socket.error(EAGAIN, "Packet filtered")

        return (buf.read(bufsize), (fromhost, fromport))
test_svg.py 文件源码 项目:segno 作者: heuer 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_write_svg():
    # Test with default options
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg')
    xml_str = out.getvalue()
    assert xml_str.startswith(b'<?xml')
    root = _parse_xml(out)
    assert 'viewBox' not in root.attrib
    assert 'height' in root.attrib
    assert 'width' in root.attrib
    css_class = root.attrib.get('class')
    assert css_class
    assert 'segno' == css_class
    path_el = _get_path(root)
    assert path_el is not None
    path_class = path_el.get('class')
    assert 'qrline' == path_class
    stroke = path_el.get('stroke')
    assert stroke == '#000'
    title_el = _get_title(root)
    assert title_el is None
    desc_el = _get_desc(root)
    assert desc_el is None
test_svg.py 文件源码 项目:segno 作者: heuer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_write_svg_black():
    # Test with default options
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color='bLacK')
    xml_str = out.getvalue()
    assert xml_str.startswith(b'<?xml')
    root = _parse_xml(out)
    assert 'viewBox' not in root.attrib
    assert 'height' in root.attrib
    assert 'width' in root.attrib
    css_class = root.attrib.get('class')
    assert css_class
    assert 'segno' == css_class
    path_el = _get_path(root)
    assert path_el is not None
    path_class = path_el.get('class')
    assert 'qrline' == path_class
    stroke = path_el.get('stroke')
    assert stroke == '#000'
    title_el = _get_title(root)
    assert title_el is None
    desc_el = _get_desc(root)
    assert desc_el is None
test_svg.py 文件源码 项目:segno 作者: heuer 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_write_svg_black3():
    # Test with default options
    qr = segno.make_qr('test')
    out = io.BytesIO()
    qr.save(out, kind='svg', color=(0, 0, 0))
    xml_str = out.getvalue()
    assert xml_str.startswith(b'<?xml')
    root = _parse_xml(out)
    assert 'viewBox' not in root.attrib
    assert 'height' in root.attrib
    assert 'width' in root.attrib
    css_class = root.attrib.get('class')
    assert css_class
    assert 'segno' == css_class
    path_el = _get_path(root)
    assert path_el is not None
    path_class = path_el.get('class')
    assert 'qrline' == path_class
    stroke = path_el.get('stroke')
    assert stroke == '#000'
    title_el = _get_title(root)
    assert title_el is None
    desc_el = _get_desc(root)
    assert desc_el is None
test_pam.py 文件源码 项目:segno 作者: heuer 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def pam_bw_as_matrix(buff, border):
    """\
    Returns the QR code as list of [0, 1] lists.

    :param io.BytesIO buff: Buffer to read the matrix from.
    """
    res = []
    data, size = _image_data(buff)
    for i, offset in enumerate(range(0, len(data), size)):
        if i < border:
            continue
        if i >= size - border:
            break
        row_data = bytearray(data[offset + border:offset + size - border])
        # Invert bytes since PAM uses 0x0 = black, 0x1 = white
        res.append([b ^ 0x1 for b in row_data])
    return res
test_pdf.py 文件源码 项目:segno 作者: heuer 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def pdf_as_matrix(buff, border):
    """\
    Reads the path in the PDF and returns it as list of 0, 1 lists.

    :param io.BytesIO buff: Buffer to read the matrix from.
    """
    pdf = buff.getvalue()
    h, w = re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf,
                     flags=re.MULTILINE).groups()
    if h != w:
        raise ValueError('Expected equal height/width, got height="{}" width="{}"'.format(h, w))
    size = int(w) - 2 * border

    graphic = _find_graphic(buff)
    res = [[0] * size for i in range(size)]
    for x1, y1, x2, y2 in re.findall(r'\s*(\-?\d+)\s+(\-?\d+)\s+m\s+'
                                        r'(\-?\d+)\s+(\-?\d+)\s+l', graphic):
        x1, y1, x2, y2 = [int(i) for i in (x1, y1, x2, y2)]
        y = abs(y1)
        res[y][x1:x2] = [1] * (x2 - x1)
    return res
test_wire.py 文件源码 项目:py-abci 作者: davebryson 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_flow():
    from io import BytesIO
    # info + flush
    inbound = b'\x01\x02\x1a\x00\x01\x02\x12\x00'
    data = BytesIO(inbound)

    req_type,_  = read_message(data, types.Request)
    assert 'info' == req_type.WhichOneof("value")

    req_type2, _  = read_message(data, types.Request)
    assert 'flush' == req_type2.WhichOneof("value")

    assert data.read() == b''
    data.close()

    data2 = BytesIO(b'')
    req_type, fail  = read_message(data2, types.Request)
    assert fail == 0
    assert req_type == None

    data3 = BytesIO(b'\x01')
    req_type, fail  = read_message(data3, types.Request)
    assert fail == 0
convert.py 文件源码 项目:jx-sqlite 作者: mozilla 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES
    """
    if hasattr(bytes, "read"):
        buff = TemporaryFile()
        archive = gzip.GzipFile(fileobj=buff, mode='w')
        for b in bytes:
            archive.write(b)
        archive.close()
        buff.seek(0)
        from pyLibrary.env.big_data import FileString, safe_size
        return FileString(buff)

    buff = BytesIO()
    archive = gzip.GzipFile(fileobj=buff, mode='w')
    archive.write(bytes)
    archive.close()
    return buff.getvalue()
OleFileIO.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def openstream(self, filename):
        """
        Open a stream as a read-only file object (BytesIO).
        Note: filename is case-insensitive.

        :param filename: path of stream in storage tree (except root entry), either:

            - a string using Unix path syntax, for example:
              'storage_1/storage_1.2/stream'
            - or a list of storage filenames, path to the desired stream/storage.
              Example: ['storage_1', 'storage_1.2', 'stream']

        :returns: file object (read-only)
        :exception IOError: if filename not found, or if this is not a stream.
        """
        sid = self._find(filename)
        entry = self.direntries[sid]
        if entry.entry_type != STGTY_STREAM:
            raise IOError("this file is not a stream")
        return self._open(entry.isectStart, entry.size)
ImageTk.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, image=None, **kw):

        # Tk compatibility: file or data
        if image is None:
            if "file" in kw:
                image = Image.open(kw["file"])
                del kw["file"]
            elif "data" in kw:
                from io import BytesIO
                image = Image.open(BytesIO(kw["data"]))
                del kw["data"]

        self.__mode = image.mode
        self.__size = image.size

        if _pilbitmap_check():
            # fast way (requires the pilbitmap booster patch)
            image.load()
            kw["data"] = "PIL:%d" % image.im.id
            self.__im = image  # must keep a reference
        else:
            # slow but safe way
            kw["data"] = image.tobitmap()
        self.__photo = tkinter.BitmapImage(**kw)
MerkleHashTree.py 文件源码 项目:PyPPSPP 作者: justas- 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_data_hash(self, data_bytes):
        """Calculate Merkle's root hash of the given data bytes"""

        # Calculate tree parameters
        data_len = len(data_bytes)
        tree_populated_width = math.ceil(data_len / self._chunk_len)
        tree_height = math.ceil(math.log2(tree_populated_width))
        tree_width = int(math.pow(2, tree_height))

        tree_bottom_layer = ['\x00'] * tree_width
        with io.BytesIO(data_bytes) as b_data:
            self._initial_hasher(
                b_data,
                tree_populated_width,
                tree_bottom_layer
            )

        # Get Merkle's root hash
        mrh = self._calculate_root_hash(tree_bottom_layer)
        return mrh
magic_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 55 收藏 0 点赞 0 评论 0
def test_simple(self):
    pipe = test_helper.get_mock_pipeline([])

    mock_mod = test_helper.MockSubscriber()
    mock_mod = test_helper.MockSubscriber()

    pipe.register_magic(b'\xFF\xEE\xDD', ('mock', mock_mod.consume))
    pipe.register_magic(b'\x00\x00\x00', ('mock', mock_mod.consume))

    _magic = magic.Subscriber(pipe)
    _magic.setup(None)

    doc = document.get_document('mock')

    content = b'\xFF\xEE\xDDMOCKMOCKMOCK'

    _magic.consume(doc, BytesIO(content))

    self.assertEquals(True, doc.magic_hit)
    self.assertEquals(1, len(mock_mod.produced))

    expected = content
    actual = mock_mod.produced[0][1].read()

    self.assertEquals(expected, actual)
file_meta_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_simple(self):
    _file_meta = file_meta.Subscriber(test_helper.get_mock_pipeline([]))
    response = json.dumps({'Content-Type': 'image/jpeg'}).encode('utf-8')
    _file_meta.setup({
        'data_root': 'local_data',
        'code_root': '.',
        'worker_id': 1,
        'host': 'mock',
        helper.INJECTOR: test_helper.MockInjector(response)
    })

    doc = document.get_document('mock.txt')

    _file_meta.consume(doc, BytesIO(b'mock'))

    expected = 'picture'
    actual = doc.doctype

    self.assertEqual(expected, actual)


问题


面经


文章

微信
公众号

扫码关注公众号