python类BytesIO()的实例源码

test_driver.py 文件源码 项目:nova-lxd 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_get_console_output(self, execute, _open, _):
        ctx = context.get_admin_context()
        instance = fake_instance.fake_instance_obj(
            ctx, name='test', memory_mb=0)
        expected_calls = [
            mock.call(
                'chown', '1234:1234', '/var/log/lxd/{}/console.log'.format(
                    instance.name),
                run_as_root=True),
            mock.call(
                'chmod', '755', '/lxd/containers/{}'.format(
                    instance.name),
                run_as_root=True),
        ]
        _open.return_value.__enter__.return_value = six.BytesIO(b'output')

        lxd_driver = driver.LXDDriver(None)

        contents = lxd_driver.get_console_output(context, instance)

        self.assertEqual(b'output', contents)
        self.assertEqual(expected_calls, execute.call_args_list)
postgres.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def update_config(self, content, path):
        old_file = six.BytesIO()
        if files.exists(path, use_sudo=self.sudo):
            fab.get(remote_path=path, local_path=old_file, use_sudo=self.sudo)
        old_content = old_file.getvalue()
        need_update = content != old_content
        if need_update:
            fabricio.move_file(
                path_from=path,
                path_to=path + '.backup',
                sudo=self.sudo,
                ignore_errors=True,
            )
            fab.put(six.BytesIO(content), path, use_sudo=self.sudo, mode='0644')
            fabricio.log('{path} updated'.format(path=path))
        else:
            fabricio.log('{path} not changed'.format(path=path))
        return need_update
service.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _update(self, compose_file, new_settings, force=False):
        if not force:
            settings, digests = self.current_settings
            digests = digests and json.loads(b64decode(digests).decode())
            if settings == new_settings and digests is not None:
                new_digests = self._get_digests(digests)
                if digests == new_digests:
                    return False
        with fab.cd(self.temp_dir):
            fab.put(six.BytesIO(compose_file), self.actual_compose_file)
            fabricio.run('docker stack deploy {options} {name}'.format(
                options=utils.Options(self.options),
                name=self.name,
            ))
        self.stack_updated.set()
        return True
test_base.py 文件源码 项目:deb-python-pecan 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_streaming_response(self):

        class RootController(object):
            @expose(content_type='text/plain')
            def test(self, foo):
                if foo == 'stream':
                    # mimic large file
                    contents = six.BytesIO(b_('stream'))
                    response.content_type = 'application/octet-stream'
                    contents.seek(0, os.SEEK_END)
                    response.content_length = contents.tell()
                    contents.seek(0, os.SEEK_SET)
                    response.app_iter = contents
                    return response
                else:
                    return 'plain text'

        app = TestApp(Pecan(RootController()))
        r = app.get('/test/stream')
        assert r.content_type == 'application/octet-stream'
        assert r.body == b_('stream')

        r = app.get('/test/plain')
        assert r.content_type == 'text/plain'
        assert r.body == b_('plain text')
main_test.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_upload(client):
    # Upload a simple file
    file_content = b"This is some test content."

    r = client.post(
        '/upload',
        data={
            'file': (BytesIO(file_content), 'example.txt')
        }
    )

    assert r.status_code == 200

    # The app should return the public cloud storage URL for the uploaded
    # file. Download and verify it.
    cloud_storage_url = r.data.decode('utf-8')
    r = requests.get(cloud_storage_url)
    assert r.text.encode('utf-8') == file_content
http.py 文件源码 项目:alfredToday 作者: jeeftor 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
test_actions.py 文件源码 项目:edx-enterprise 作者: edx 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Test suite set up method.
        """
        super(TestCSVExportAction, self).setUp()
        self.output_stream = BytesIO()
        response_instance_mock = mock.MagicMock(wraps=self.output_stream)
        self.response_mock = self._make_patch(
            "enterprise.admin.actions.HttpResponse",
            return_value=response_instance_mock
        )

        self.model_admin_mock = mock.Mock()
        self.model_admin_mock.model._meta.fields = [
            self._make_field("code"), self._make_field("name"), self._make_field("description"),
        ]
http.py 文件源码 项目:Webradio_v2 作者: Acer54 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
http.py 文件源码 项目:GAMADV-X 作者: taers232c 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
dataset.py 文件源码 项目:crnn 作者: wulivicte 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __getitem__(self, index):
        assert index <= len(self), 'index range error'
        index += 1
        with self.env.begin(write=False) as txn:
            img_key = 'image-%09d' % index
            imgbuf = txn.get(img_key)

            buf = six.BytesIO()
            buf.write(imgbuf)
            buf.seek(0)
            try:
                img = Image.open(buf).convert('L')
            except IOError:
                print('Corrupted image for %d' % index)
                return self[index + 1]

            if self.transform is not None:
                img = self.transform(img)

            label_key = 'label-%09d' % index
            label = str(txn.get(label_key))

            if self.target_transform is not None:
                label = self.target_transform(label)

        return (img, label)
serialization_utils.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def dumps_with_persistent_ids(obj, protocol=None):
    """
    Performs a pickle dumps on the given object, substituting all references to
    a TradingEnvironment or AssetFinder with tokenized representations.

    All arguments are passed to pickle.Pickler and are described therein.
    """
    file = BytesIO()
    pickler = pickle.Pickler(file, protocol)
    pickler.persistent_id = _persistent_id
    pickler.dump(obj)
    return file.getvalue()
storage.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _compress(content):
        stringio = BytesIO()
        gzip_obj = gzip.GzipFile(mode='wb', fileobj=stringio)
        gzip_obj.write(content)
        gzip_obj.close()
        return stringio.getvalue()
storage.py 文件源码 项目:cloud-volume 作者: seung-lab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _uncompress(content):
        stringio = BytesIO(content)
        with gzip.GzipFile(mode='rb', fileobj=stringio) as gfile:
            return gfile.read()
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def make_options_body(self):
        options_buf = BytesIO()
        write_stringmultimap(options_buf, {
            'CQL_VERSION': ['3.0.1'],
            'COMPRESSION': []
        })
        return options_buf.getvalue()
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def make_error_body(self, code, msg):
        buf = BytesIO()
        write_int(buf, code)
        write_string(buf, msg)
        return buf.getvalue()
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_requested_compression_not_available(self, *args):
        c = self.make_connection()
        c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])}
        c.defunct = Mock()
        # request lz4 compression
        c.compression = "lz4"

        locally_supported_compressions.pop('lz4', None)
        locally_supported_compressions.pop('snappy', None)
        locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
        locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage)

        # the server only supports snappy
        options_buf = BytesIO()
        write_stringmultimap(options_buf, {
            'CQL_VERSION': ['3.0.3'],
            'COMPRESSION': ['snappy']
        })
        options = options_buf.getvalue()

        c.process_msg(_Frame(version=4, flags=0, stream=0, opcode=SupportedMessage.opcode, body_offset=9, end_pos=9 + len(options)), options)

        # make sure it errored correctly
        c.defunct.assert_called_once_with(ANY)
        args, kwargs = c.defunct.call_args
        self.assertIsInstance(args[0], ProtocolError)
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_use_requested_compression(self, *args):
        c = self.make_connection()
        c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])}
        c.defunct = Mock()
        # request snappy compression
        c.compression = "snappy"

        locally_supported_compressions.pop('lz4', None)
        locally_supported_compressions.pop('snappy', None)
        locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
        locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage)

        # the server only supports snappy
        options_buf = BytesIO()
        write_stringmultimap(options_buf, {
            'CQL_VERSION': ['3.0.3'],
            'COMPRESSION': ['snappy', 'lz4']
        })
        options = options_buf.getvalue()

        c.process_msg(_Frame(version=4, flags=0, stream=0, opcode=SupportedMessage.opcode, body_offset=9, end_pos=9 + len(options)), options)

        self.assertEqual(c.decompressor, locally_supported_compressions['snappy'][1])
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_disable_compression(self, *args):
        c = self.make_connection()
        c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message)}
        c.defunct = Mock()
        # disable compression
        c.compression = False

        locally_supported_compressions.pop('lz4', None)
        locally_supported_compressions.pop('snappy', None)
        locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
        locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage)

        # the server only supports snappy
        options_buf = BytesIO()
        write_stringmultimap(options_buf, {
            'CQL_VERSION': ['3.0.3'],
            'COMPRESSION': ['snappy', 'lz4']
        })
        options = options_buf.getvalue()

        message = self.make_msg(header, options)
        c.process_msg(message, len(message) - 8)

        self.assertEqual(c.decompressor, None)
test_libevreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def make_options_body(self):
        options_buf = BytesIO()
        write_stringmultimap(options_buf, {
            'CQL_VERSION': ['3.0.1'],
            'COMPRESSION': []
        })
        return options_buf.getvalue()
test_libevreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def make_error_body(self, code, msg):
        buf = BytesIO()
        write_int(buf, code)
        write_string(buf, msg)
        return buf.getvalue()


问题


面经


文章

微信
公众号

扫码关注公众号