def test_get_console_output(self, execute, _open, _):
    """Console log is chown/chmod'd for access and its contents returned.

    `execute` and `_open` are mock.patch-injected; the chown/chmod calls
    made via `execute` are verified against `expected_calls`.
    """
    ctx = context.get_admin_context()
    instance = fake_instance.fake_instance_obj(
        ctx, name='test', memory_mb=0)
    expected_calls = [
        mock.call(
            'chown', '1234:1234', '/var/log/lxd/{}/console.log'.format(
                instance.name),
            run_as_root=True),
        mock.call(
            'chmod', '755', '/lxd/containers/{}'.format(
                instance.name),
            run_as_root=True),
    ]
    _open.return_value.__enter__.return_value = six.BytesIO(b'output')

    lxd_driver = driver.LXDDriver(None)
    # BUG FIX: pass the admin context created above, not the `context`
    # module object itself.
    contents = lxd_driver.get_console_output(ctx, instance)

    self.assertEqual(b'output', contents)
    self.assertEqual(expected_calls, execute.call_args_list)
# Examples of using the Python BytesIO() class
def update_config(self, content, path):
    """Upload ``content`` to the remote ``path`` if it differs.

    The existing remote file (when present) is downloaded and compared
    against ``content``; on a mismatch it is preserved as ``<path>.backup``
    and the new content is written with mode 0644.

    Returns True when the remote file was changed, False otherwise.
    """
    current = b''
    if files.exists(path, use_sudo=self.sudo):
        buf = six.BytesIO()
        fab.get(remote_path=path, local_path=buf, use_sudo=self.sudo)
        current = buf.getvalue()

    if content == current:
        fabricio.log('{path} not changed'.format(path=path))
        return False

    # Keep the previous version around before overwriting it.
    fabricio.move_file(
        path_from=path,
        path_to=path + '.backup',
        sudo=self.sudo,
        ignore_errors=True,
    )
    fab.put(six.BytesIO(content), path, use_sudo=self.sudo, mode='0644')
    fabricio.log('{path} updated'.format(path=path))
    return True
def _update(self, compose_file, new_settings, force=False):
    """Deploy the stack from ``compose_file`` unless nothing changed.

    Without ``force``, the deploy is skipped (returns False) when the
    recorded settings equal ``new_settings`` and the stored image digests
    are still current.  Returns True after a deploy was performed.
    """
    if not force:
        settings, digests = self.current_settings
        if digests:
            digests = json.loads(b64decode(digests).decode())
        if settings == new_settings and digests is not None:
            # Settings match; deploy only if any image digest moved.
            if digests == self._get_digests(digests):
                return False

    with fab.cd(self.temp_dir):
        fab.put(six.BytesIO(compose_file), self.actual_compose_file)
        fabricio.run('docker stack deploy {options} {name}'.format(
            options=utils.Options(self.options),
            name=self.name,
        ))
    self.stack_updated.set()
    return True
def test_streaming_response(self):
    """A controller can stream a file-like body via ``response.app_iter``."""
    class RootController(object):
        @expose(content_type='text/plain')
        def test(self, foo):
            if foo != 'stream':
                return 'plain text'
            # Mimic serving a large file from an in-memory buffer.
            body = six.BytesIO(b_('stream'))
            response.content_type = 'application/octet-stream'
            body.seek(0, os.SEEK_END)
            response.content_length = body.tell()
            body.seek(0, os.SEEK_SET)
            response.app_iter = body
            return response

    app = TestApp(Pecan(RootController()))

    resp = app.get('/test/stream')
    assert resp.content_type == 'application/octet-stream'
    assert resp.body == b_('stream')

    resp = app.get('/test/plain')
    assert resp.content_type == 'text/plain'
    assert resp.body == b_('plain text')
def test_upload(client):
    """Upload a file and verify it round-trips through cloud storage."""
    payload = b"This is some test content."

    resp = client.post(
        '/upload',
        data={'file': (BytesIO(payload), 'example.txt')},
    )
    assert resp.status_code == 200

    # The endpoint responds with the public cloud-storage URL of the
    # uploaded file; fetch it and compare the bytes.
    cloud_storage_url = resp.data.decode('utf-8')
    downloaded = requests.get(cloud_storage_url)
    assert downloaded.text.encode('utf-8') == payload
def __init__(self, body, mimetype='application/octet-stream',
             chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

    DEPRECATED: use MediaIoBaseUpload with an io.TextIOBase or StringIO
    stream instead.

    Args:
      body: string, bytes of the body content.
      mimetype: string, mime-type of the file; defaults to
        'application/octet-stream'.
      chunksize: int, chunk size in bytes; only consulted when
        resumable=True.
      resumable: bool, True for a resumable upload, False for a single
        request.
    """
    # Wrap the raw bytes in a file-like object and defer to the
    # stream-based base-class implementation.
    super(MediaInMemoryUpload, self).__init__(
        BytesIO(body), mimetype, chunksize=chunksize, resumable=resumable)
def setUp(self):
    """Prepare the mocks shared by the CSV export-action tests."""
    super(TestCSVExportAction, self).setUp()

    # Everything written to the patched HttpResponse lands in this
    # in-memory buffer so tests can inspect the produced CSV.
    self.output_stream = BytesIO()
    response_instance_mock = mock.MagicMock(wraps=self.output_stream)
    self.response_mock = self._make_patch(
        "enterprise.admin.actions.HttpResponse",
        return_value=response_instance_mock,
    )

    self.model_admin_mock = mock.Mock()
    self.model_admin_mock.model._meta.fields = [
        self._make_field(field_name)
        for field_name in ("code", "name", "description")
    ]
def __init__(self, body, mimetype='application/octet-stream',
             chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

    DEPRECATED: prefer MediaIoBaseUpload with an io.TextIOBase or
    StringIO stream.

    Args:
      body: string, bytes of the body content.
      mimetype: string, mime-type of the file (default
        'application/octet-stream').
      chunksize: int, upload chunk size in bytes; used only when
        resumable=True.
      resumable: bool, whether to upload in resumable chunks rather than
        one request.
    """
    # Hand the bytes to the stream-based base class as a file object.
    stream = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(
        stream, mimetype, chunksize=chunksize, resumable=resumable)
def __init__(self, body, mimetype='application/octet-stream',
             chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

    DEPRECATED: this class survives only for backward compatibility; use
    MediaIoBaseUpload with an io.TextIOBase or StringIO stream.

    Args:
      body: string, bytes of the body content.
      mimetype: string, mime-type of the file; defaults to
        'application/octet-stream'.
      chunksize: int, chunk size in bytes, honoured only for resumable
        uploads.
      resumable: bool, True to upload in chunks, False for a single
        request.
    """
    # Adapt the in-memory bytes to the file-like interface the
    # stream-based constructor expects.
    super(MediaInMemoryUpload, self).__init__(
        BytesIO(body), mimetype,
        chunksize=chunksize, resumable=resumable)
def __getitem__(self, index):
    """Return the ``(image, label)`` pair stored at ``index``.

    Records in the LMDB environment use 1-based keys
    (``image-%09d`` / ``label-%09d``), so the 0-based dataset index is
    shifted by one.  A corrupted image falls back to the next record.
    """
    assert index <= len(self), 'index range error'
    index += 1  # LMDB keys are 1-based
    with self.env.begin(write=False) as txn:
        # BUG FIX: lmdb requires bytes keys on Python 3; txn.get() with a
        # str key would return None there.  encode() is a cheap no-op
        # round-trip on Python 2.
        img_key = ('image-%09d' % index).encode('ascii')
        imgbuf = txn.get(img_key)

        buf = six.BytesIO()
        buf.write(imgbuf)
        buf.seek(0)
        try:
            img = Image.open(buf).convert('L')
        except IOError:
            print('Corrupted image for %d' % index)
            # BUG FIX: ``index`` is already 1-based here, so the next
            # 0-based dataset index is ``index`` itself; the original
            # ``self[index + 1]`` silently skipped one record.
            return self[index]
        if self.transform is not None:
            img = self.transform(img)

        label_key = ('label-%09d' % index).encode('ascii')
        label = txn.get(label_key)
        # BUG FIX: str(bytes) on Python 3 yields the literal "b'...'"
        # repr; decode the raw bytes instead.
        if isinstance(label, bytes):
            label = label.decode('utf-8')
        else:
            label = str(label)
        if self.target_transform is not None:
            label = self.target_transform(label)
    return (img, label)
def dumps_with_persistent_ids(obj, protocol=None):
    """
    Pickle ``obj`` to bytes, substituting references to a
    TradingEnvironment or AssetFinder with tokenized representations via
    ``_persistent_id``.

    Arguments mirror ``pickle.Pickler``.
    """
    sink = BytesIO()
    pickler = pickle.Pickler(sink, protocol)
    # Route every pickled object through the tokenizing hook.
    pickler.persistent_id = _persistent_id
    pickler.dump(obj)
    return sink.getvalue()
def _compress(content):
stringio = BytesIO()
gzip_obj = gzip.GzipFile(mode='wb', fileobj=stringio)
gzip_obj.write(content)
gzip_obj.close()
return stringio.getvalue()
def _uncompress(content):
stringio = BytesIO(content)
with gzip.GzipFile(mode='rb', fileobj=stringio) as gfile:
return gfile.read()
def make_options_body(self):
    """Serialize an OPTIONS message body advertising CQL 3.0.1 and no
    compression."""
    body = BytesIO()
    write_stringmultimap(body, {
        'CQL_VERSION': ['3.0.1'],
        'COMPRESSION': [],
    })
    return body.getvalue()
def make_error_body(self, code, msg):
    """Serialize an ERROR message body: an int error code followed by a
    string message."""
    body = BytesIO()
    write_int(body, code)
    write_string(body, msg)
    return body.getvalue()
def test_requested_compression_not_available(self, *args):
    """Requesting a compression the server does not offer must defunct the
    connection with a ProtocolError."""
    conn = self.make_connection()
    conn._requests = {
        0: (conn._handle_options_response, ProtocolHandler.decode_message, []),
    }
    conn.defunct = Mock()

    # Ask for lz4 while making both algorithms locally importable.
    conn.compression = "lz4"
    for algo in ('lz4', 'snappy'):
        locally_supported_compressions.pop(algo, None)
    locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
    locally_supported_compressions['snappy'] = (
        'snappycompress', 'snappydecompress')

    # Feed in a SupportedMessage where the server offers snappy only.
    header = self.make_header_prefix(SupportedMessage)
    buf = BytesIO()
    write_stringmultimap(buf, {
        'CQL_VERSION': ['3.0.3'],
        'COMPRESSION': ['snappy'],
    })
    options = buf.getvalue()
    conn.process_msg(
        _Frame(version=4, flags=0, stream=0,
               opcode=SupportedMessage.opcode,
               body_offset=9, end_pos=9 + len(options)),
        options)

    # The mismatch must have errored the connection.
    conn.defunct.assert_called_once_with(ANY)
    call_args, _ = conn.defunct.call_args
    self.assertIsInstance(call_args[0], ProtocolError)
def test_use_requested_compression(self, *args):
    """When the server supports the requested algorithm, the matching
    decompressor is installed on the connection."""
    conn = self.make_connection()
    conn._requests = {
        0: (conn._handle_options_response, ProtocolHandler.decode_message, []),
    }
    conn.defunct = Mock()

    # Ask for snappy with both algorithms locally available.
    conn.compression = "snappy"
    for algo in ('lz4', 'snappy'):
        locally_supported_compressions.pop(algo, None)
    locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
    locally_supported_compressions['snappy'] = (
        'snappycompress', 'snappydecompress')

    # Feed in a SupportedMessage where the server offers snappy and lz4.
    header = self.make_header_prefix(SupportedMessage)
    buf = BytesIO()
    write_stringmultimap(buf, {
        'CQL_VERSION': ['3.0.3'],
        'COMPRESSION': ['snappy', 'lz4'],
    })
    options = buf.getvalue()
    conn.process_msg(
        _Frame(version=4, flags=0, stream=0,
               opcode=SupportedMessage.opcode,
               body_offset=9, end_pos=9 + len(options)),
        options)

    self.assertEqual(
        conn.decompressor, locally_supported_compressions['snappy'][1])
def test_disable_compression(self, *args):
    # With compression explicitly disabled, no decompressor must be
    # installed even though the server advertises snappy and lz4.
    c = self.make_connection()
    # NOTE(review): the sibling tests above store a 3-tuple
    # (callback, decoder, []) in _requests, while this one stores a
    # 2-tuple, and it drives process_msg via make_msg instead of _Frame --
    # presumably a snippet from a different driver version; confirm which
    # shape the implementation under test expects.
    c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message)}
    c.defunct = Mock()
    # disable compression
    c.compression = False
    # Make both algorithms locally importable so only the `compression`
    # flag decides the outcome.
    locally_supported_compressions.pop('lz4', None)
    locally_supported_compressions.pop('snappy', None)
    locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
    locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')
    # read in a SupportedMessage response
    header = self.make_header_prefix(SupportedMessage)
    # the server only supports snappy
    options_buf = BytesIO()
    write_stringmultimap(options_buf, {
        'CQL_VERSION': ['3.0.3'],
        'COMPRESSION': ['snappy', 'lz4']
    })
    options = options_buf.getvalue()
    message = self.make_msg(header, options)
    c.process_msg(message, len(message) - 8)
    # Compression was disabled, so no decompressor may be set.
    self.assertEqual(c.decompressor, None)
# Source file: test_libevreactor.py
# Project: deb-python-cassandra-driver (author: openstack)
# (aggregator metadata: 28 views, 0 stars, 0 likes, 0 comments)
def make_options_body(self):
    """Build the serialized body of an OPTIONS message (CQL 3.0.1, no
    compression advertised)."""
    stream = BytesIO()
    options = {
        'CQL_VERSION': ['3.0.1'],
        'COMPRESSION': [],
    }
    write_stringmultimap(stream, options)
    return stream.getvalue()
# Source file: test_libevreactor.py
# Project: deb-python-cassandra-driver (author: openstack)
# (aggregator metadata: 29 views, 0 stars, 0 likes, 0 comments)
def make_error_body(self, code, msg):
    """Build the serialized body of an ERROR message: the int ``code``
    then the string ``msg``."""
    stream = BytesIO()
    write_int(stream, code)
    write_string(stream, msg)
    return stream.getvalue()