python类BytesIO()的实例源码

serialization_utils.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def loads_with_persistent_ids(str, env):
    """
    Performs a pickle loads on the given string, substituting the given
    TradingEnvironment in to any tokenized representations of a
    TradingEnvironment or AssetFinder.

    Parameters
    ----------
    str : String
        The string representation of the object to be unpickled.
    env : TradingEnvironment
        The TradingEnvironment to be inserted to the unpickled object.

    Returns
    -------
    obj
       An unpickled object formed from the parameter 'str'.
    """
    file = BytesIO(str)
    unpickler = pickle.Unpickler(file)
    unpickler.persistent_load = partial(_persistent_load, env=env)
    return unpickler.load()
http.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 124 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_bad_protocol_version(self, *args):
        c = self.make_connection()
        c._requests = Mock()
        c.defunct = Mock()

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage, version=0x7f)
        options = self.make_options_body()
        message = self.make_msg(header, options)
        c._iobuf = BytesIO()
        c._iobuf.write(message)
        c.process_io_buffer()

        # make sure it errored correctly
        c.defunct.assert_called_once_with(ANY)
        args, kwargs = c.defunct.call_args
        self.assertIsInstance(args[0], ProtocolError)
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_negative_body_length(self, *args):
        c = self.make_connection()
        c._requests = Mock()
        c.defunct = Mock()

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage)
        message = header + int32_pack(-13)
        c._iobuf = BytesIO()
        c._iobuf.write(message)
        c.process_io_buffer()

        # make sure it errored correctly
        c.defunct.assert_called_once_with(ANY)
        args, kwargs = c.defunct.call_args
        self.assertIsInstance(args[0], ProtocolError)
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_unsupported_cql_version(self, *args):
        c = self.make_connection()
        c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])}
        c.defunct = Mock()
        c.cql_version = "3.0.3"

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage)

        options_buf = BytesIO()
        write_stringmultimap(options_buf, {
            'CQL_VERSION': ['7.8.9'],
            'COMPRESSION': []
        })
        options = options_buf.getvalue()

        c.process_msg(_Frame(version=4, flags=0, stream=0, opcode=SupportedMessage.opcode, body_offset=9, end_pos=9 + len(options)), options)

        # make sure it errored correctly
        c.defunct.assert_called_once_with(ANY)
        args, kwargs = c.defunct.call_args
        self.assertIsInstance(args[0], ProtocolError)
test_docker.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_docker_run_file_upload_to_item(session, girder_client, test_item):

    contents = 'Balaenoptera musculus'
    params = {
        'itemId': test_item['_id'],
        'contents': contents
    }
    r = session.post('integration_tests/docker/test_docker_run_file_upload_to_item',
                     params=params)
    assert r.status_code == 200, r.content

    with session.wait_for_success(r.json()['_id']) as job:
        assert [ts['status'] for ts in job['timestamps']] == \
            [JobStatus.RUNNING, JobStatus.SUCCESS]

    files = list(girder_client.listFile(test_item['_id']))

    assert len(files) == 1

    file_contents = six.BytesIO()
    girder_client.downloadFile(files[0]['_id'], file_contents)
    file_contents.seek(0)

    assert file_contents.read().strip() == contents
downloader.py 文件源码 项目:icrawler 作者: hellock 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def keep_file(self, response, min_size=None, max_size=None):
        """Decide whether to keep the image

        Compare image size with ``min_size`` and ``max_size`` to decide.

        Args:
            response (Response): response of requests.
            min_size (tuple or None): minimum size of required images.
            max_size (tuple or None): maximum size of required images.
        Returns:
            bool: whether to keep the image.
        """
        try:
            img = Image.open(BytesIO(response.content))
        except (IOError, OSError):
            return False
        if min_size and not self._size_gt(img.size, min_size):
            return False
        if max_size and not self._size_lt(img.size, max_size):
            return False
        return True
heat_tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_get_template_files(self):
        tmpl = '''
    # comment

    heat_template_version: 2013-05-23
    resources:
      server1:
        type: OS::Nova::Server
        properties:
            flavor: m1.medium
            image: cirros
            user_data_format: RAW
            user_data:
              get_file: http://test.example/example
    '''
        expected_files = {u'http://test.example/example': b'echo "test"'}
        url = 'http://test.example/example'
        data = b'echo "test"'
        self.mox.StubOutWithMock(six.moves.urllib.request, 'urlopen')
        six.moves.urllib.request.urlopen(url).AndReturn(
            six.BytesIO(data))
        self.mox.ReplayAll()
        files = api.heat.get_template_files(template_data=tmpl)[0]
        self.assertEqual(files, expected_files)
protocol.py 文件源码 项目:txamqp 作者: txamqp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_content(queue):
    frame = yield queue.get()
    header = frame.payload
    children = []
    for i in range(header.weight):
        content = yield read_content(queue)
        children.append(content)
    size = header.size
    read = 0
    buf = six.StringIO()
    while read < size:
        body = yield queue.get()
        content = body.payload.content

        # if this is the first instance of real binary content, convert the string buffer to BytesIO
        # Not a nice fix but it preserves the original behaviour
        if six.PY3 and isinstance(content, bytes) and isinstance(buf, six.StringIO):
            buf = six.BytesIO()

        buf.write(content)
        read += len(content)
    defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
http.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
statusserver.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 74 收藏 0 点赞 0 评论 0
def get_overview(self):
        current_time = reactor.seconds()
        if (self.last_overview is None or
                self.last_map_name != self.protocol.map_info.name or
                current_time - self.last_overview > OVERVIEW_UPDATE_INTERVAL):
            overview = self.protocol.map.get_overview(rgba=True)
            image = Image.frombytes('RGBA', (512, 512), overview)
            data = BytesIO()
            image.save(data, 'png')
            self.overview = data.getvalue()
            self.last_overview = current_time
            self.last_map_name = self.protocol.map_info.name
        return self.overview
dataset.py 文件源码 项目:sceneReco 作者: bear63 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __getitem__(self, index):
        assert index <= len(self), 'index range error'
        index += 1
        with self.env.begin(write=False) as txn:
            img_key = 'image-%09d' % index
            imgbuf = txn.get(img_key)

            buf = six.BytesIO()
            buf.write(imgbuf)
            buf.seek(0)
            try:
                img = Image.open(buf).convert('L')
            except IOError:
                print('Corrupted image for %d' % index)
                return self[index + 1]

            if self.transform is not None:
                img = self.transform(img)

            label_key = 'label-%09d' % index
            label = str(txn.get(label_key))
            if self.target_transform is not None:
                label = self.target_transform(label)

        return (img, label)
test_npz.py 文件源码 项目:cupy 作者: cupy 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def check_savez(self, savez, dtype):
        a1 = testing.shaped_arange((2, 3, 4), dtype=dtype)
        a2 = testing.shaped_arange((3, 4, 5), dtype=dtype)

        sio = six.BytesIO()
        savez(sio, a1, a2)
        s = sio.getvalue()
        sio.close()

        sio = six.BytesIO(s)
        with cupy.load(sio) as d:
            b1 = d['arr_0']
            b2 = d['arr_1']
        sio.close()

        testing.assert_array_equal(a1, b1)
        testing.assert_array_equal(a2, b2)
test_pharmacophores.py 文件源码 项目:kripodb 作者: 3D-e-Chem 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def example1_sdfile():
    body = b'''
 OpenBabel07051617103D

 14  0  0  0  0  0  0  0  0  0999 V2000
   22.5699   -6.3076   36.8593 O   0  6  0  0  0
   23.7871   -3.9004   36.3395 P   0  7  0  0  0
   23.7871   -3.9004   36.3395 P   0  7  0  0  0
   23.1923   -6.6223   36.0325 P   0  7  0  0  0
   23.1923   -6.6223   36.0325 P   0  7  0  0  0
   18.7201   -9.8937   40.4312 As  0  7  0  0  0
   18.3503   -9.5392   39.3836 Ne  0  0  0  0  0
   20.4171  -10.6185   40.3362 Ne  0  0  0  0  0
   19.7922   -6.5243   39.4320 He  0  0  0  0  0
   17.9406   -2.4043   34.9401 O   0  6  0  0  0
   14.6641   -7.4275   36.2138 As  0  7  0  0  0
   15.4420   -8.2931   36.1398 Ne  0  0  0  0  0
   14.4007   -6.8416   35.2404 Ne  0  0  0  0  0
   22.3608   -5.1679   39.2345 Rn  0  0  0  0  0
M  CHG  8   1  -2   2  -3   3  -3   4  -3   5  -3   6  -3  10  -2  11  -3
M  END
$$$$
'''
    return BytesIO(body)
test_serde_weights.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_serialize_and_deserialize_multi_np():
    x = np.random.random((1, ))
    y = np.random.random((2, 3))
    z = np.random.random((1, 5, 2))
    values = {'x': x, 'y': y, 'z': z}

    # write out values in x
    f = BytesIO()
    serde_weights.write_np_values(values, f)

    # reset file so it appears to be freshly opened for deserialize_weights
    f.seek(0)
    de_values = serde_weights.read_np_values(f)

    assert values.keys() == de_values.keys()
    for k, v in values.items():
        assert (de_values[k] == v).all()
dicomparser.py 文件源码 项目:dicompyler-core 作者: dicompyler 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, dataset):

        if isinstance(dataset, Dataset):
            self.ds = dataset
        elif isinstance(dataset, (string_types, BytesIO)):
            try:
                self.ds = \
                    read_file(dataset, defer_size=100, force=True)
            except:
                # Raise the error for the calling method to handle
                raise
            else:
                # Sometimes DICOM files may not have headers,
                # but they should always have a SOPClassUID
                # to declare what type of file it is.
                # If the file doesn't have a SOPClassUID,
                # then it probably isn't DICOM.
                if not "SOPClassUID" in self.ds:
                    raise AttributeError
        else:
            raise AttributeError

######################## SOP Class and Instance Methods #######################
serialization_utils.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 83 收藏 0 点赞 0 评论 0
def loads_with_persistent_ids(str, env):
    """
    Performs a pickle loads on the given string, substituting the given
    TradingEnvironment in to any tokenized representations of a
    TradingEnvironment or AssetFinder.

    Parameters
    ----------
    str : String
        The string representation of the object to be unpickled.
    env : TradingEnvironment
        The TradingEnvironment to be inserted to the unpickled object.

    Returns
    -------
    obj
       An unpickled object formed from the parameter 'str'.
    """
    file = BytesIO(str)
    unpickler = pickle.Unpickler(file)
    unpickler.persistent_load = partial(_persistent_load, env=env)
    return unpickler.load()
test_computeengine.py 文件源码 项目:loman 作者: janusassetallocation 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_serialization_skip_flag():
    comp = Computation()
    comp.add_node("a")
    comp.add_node("b", lambda a: a + 1, serialize=False)
    comp.add_node("c", lambda b: b + 1)

    comp.insert("a", 1)
    comp.compute_all()
    f = six.BytesIO()
    comp.write_dill(f)

    assert comp.state("a") == States.UPTODATE
    assert comp.state("b") == States.UPTODATE
    assert comp.state("c") == States.UPTODATE
    assert comp.value("a") == 1
    assert comp.value("b") == 2
    assert comp.value("c") == 3

    f.seek(0)
    comp2 = Computation.read_dill(f)
    assert comp2.state("a") == States.UPTODATE
    assert comp2.state("b") == States.UNINITIALIZED
    assert comp2.state("c") == States.UPTODATE
    assert comp2.value("a") == 1
    assert comp2.value("c") == 3
http.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
conftest.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def objects(bucket):
    """File system location."""
    # Create older versions first
    for key, content in [
            ('LICENSE', b'old license'),
            ('README.rst', b'old readme')]:
        ObjectVersion.create(
            bucket, key, stream=BytesIO(content), size=len(content)
        )

    # Create new versions
    objs = []
    for key, content in [
            ('LICENSE', b'license file'),
            ('README.rst', b'readme file')]:
        objs.append(
            ObjectVersion.create(
                bucket, key, stream=BytesIO(content),
                size=len(content)
            )
        )

    yield objs
test_http.py 文件源码 项目:python-moganclient 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_kwargs_with_files(self, mock_dumps):
        fake = fakes.FakeHTTPResponse(
            200, 'OK', {'Content-Type': 'application/json'}, '{}')
        mock_dumps.return_value = "{'files': test}}"
        data = six.BytesIO(b'test')
        kwargs = {'endpoint_override': 'http://no.where/',
                  'data': {'files': data}}
        client = http.SessionClient(mock.ANY)

        self.request.return_value = (fake, {})

        resp, body = client.request('', 'GET', **kwargs)

        self.assertEqual({'endpoint_override': 'http://no.where/',
                          'json': {'files': data},
                          'user_agent': 'python-moganclient',
                          'raise_exc': False}, self.request.call_args[1])
        self.assertEqual(200, resp.status_code)
        self.assertEqual({}, body)
        self.assertEqual({}, utils.get_response_body(resp))
test_npz.py 文件源码 项目:chainer-deconv 作者: germanRos 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_savez(self, savez, dtype):
        a1 = testing.shaped_arange((2, 3, 4), dtype=dtype)
        a2 = testing.shaped_arange((3, 4, 5), dtype=dtype)

        sio = six.BytesIO()
        savez(sio, a1, a2)
        s = sio.getvalue()
        sio.close()

        sio = six.BytesIO(s)
        with cupy.load(sio) as d:
            b1 = d['arr_0']
            b2 = d['arr_1']
        sio.close()

        testing.assert_array_equal(a1, b1)
        testing.assert_array_equal(a2, b2)
objects.py 文件源码 项目:gwyfile 作者: tuxu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def serialize(self):
        """Return the binary representation."""
        io = BytesIO()
        for k in self.keys():
            try:
                typecode = self.typecodes[k]
            except KeyError:
                typecode = None
            io.write(serialize_component(k, self[k], typecode))
        buf = io.getvalue()
        return b''.join([
            self.name.encode('utf-8'),
            b'\0',
            struct.pack('<I', len(buf)),
            buf
        ])
http.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
test_models.py 文件源码 项目:gthnk 作者: iandennismiller 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_crud(self):
        "Go through CRUD operations to exercise the models."
        # test create
        day = Day.create(date=datetime.date.today())
        self.assertIsNotNone(day, "Day is created")

        entry = Entry.create(day=day, content="empty", timestamp=datetime.datetime.now())
        self.assertIsNotNone(entry, "Entry is created")

        # attach an image to a day
        with open("gthnk/tests/data/gthnk-big.jpg", "rb") as f:
            buf = six.BytesIO(f.read())
            page = day.attach(buf.getvalue())
            self.assertIsNotNone(page, "Page is created")

        # test read
        day_find = Day.find(date=datetime.date.today())
        self.assertIsNotNone(day_find, "Day can be retrieved")
        self.assertEqual(day_find.entries[0].content, "empty", "contains correct value")

        entry_find = Entry.find(content="empty")
        self.assertIsNotNone(entry_find, "Entry can be retrieved")
        self.assertEqual(entry_find.content, "empty", "contains correct value")

    # special testing for Page class
catmaid.py 文件源码 项目:ingest-client 作者: jhuapl-boss 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def process(self, file_path, x_index, y_index, z_index, t_index=0):
        """
        Method to load the configuration file and select the correct validator and backend

        Args:
            file_path(str): An absolute file path for the specified tile
            x_index(int): The tile index in the X dimension
            y_index(int): The tile index in the Y dimension
            z_index(int): The tile index in the Z dimension
            t_index(int): The time index

        Returns:
            (io.BufferedReader): A file handle for the specified tile

        """
        # Save img to png and return handle
        tile_data = Image.open(file_path)

        output = six.BytesIO()
        tile_data.save(output, format=self.parameters["filetype"].upper())

        # Send handle back
        return output
catmaid.py 文件源码 项目:ingest-client 作者: jhuapl-boss 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process(self, file_path, x_index, y_index, z_index, t_index=0):
        """
        Method to load the configuration file and select the correct validator and backend

        Args:
            file_path(str): An absolute file path for the specified tile
            x_index(int): The tile index in the X dimension
            y_index(int): The tile index in the Y dimension
            z_index(int): The tile index in the Z dimension
            t_index(int): The time index

        Returns:
            (io.BufferedReader): A file handle for the specified tile

        """
        # Save img to png and return handle
        tile_data = Image.open(file_path)

        output = six.BytesIO()
        tile_data.save(output, format=self.parameters["filetype"].upper())

        # Send handle back
        return output
testing.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 63 收藏 0 点赞 0 评论 0
def new_test_image():
    """
    Creates an automatically generated test image.
    In your testing `tearDown` method make sure to delete the test
    image with the helper function `delete_test_image`.
    The recommended way of using this helper function is as follows:
        object_1.image_property.save(*new_test_image())
    :return: Image name and image content file.
    """
    warnings.warn(DeprecationWarning(
        "new_test_image() is deprecated in favour of the get_sample_image() "
        "context manager."), stacklevel=2)
    image_name = 'test-{}.png'.format(uuid.uuid4())
    image = Image.new('RGBA', size=(50, 50), color=(256, 0, 0))
    ImageDraw.Draw(image)
    byte_io = BytesIO()
    image.save(byte_io, 'png')
    byte_io.seek(0)
    return image_name, ContentFile(byte_io.read(), image_name)
table_remote.py 文件源码 项目:agate-remote 作者: wireservice 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def from_url(cls, url, callback=agate.Table.from_csv, binary=False, **kwargs):
    """
    Download a remote file and pass it to a :class:`.Table` parser.

    :param url:
        URL to a file to load.
    :param callback:
        The method to invoke to create the table. Typically either
        :meth:`agate.Table.from_csv` or :meth:`agate.Table.from_json`, but
        it could also be a method provided by an extension.
    :param binary:
        If :code:`False` the downloaded data will be processed as a string,
        otherwise it will be treated as binary data. (e.g. for Excel files)
    """
    r = requests.get(url)

    if binary:
        content = six.BytesIO(r.content)
    else:
        if six.PY2:
            content = six.StringIO(r.content.decode('utf-8'))
        else:
            content = six.StringIO(r.text)

    return callback(content, **kwargs)
ofxparse.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, fh):
        """
        fh should be a seekable file-like byte stream object
        """
        self.headers = odict.OrderedDict()
        self.fh = fh

        if not is_iterable(self.fh):
            return
        if not hasattr(self.fh, "seek"):
            return  # fh is not a file object, we're doomed.

        # If the file handler is text stream, convert to bytes one:
        first = self.fh.read(1)
        self.fh.seek(0)
        if type(first) != bytes:
            self.fh = six.BytesIO(six.b(self.fh.read()))

        with save_pos(self.fh):
            self.read_headers()
            self.handle_encoding()
            self.replace_NONE_headers()


问题


面经


文章

微信
公众号

扫码关注公众号