def test_simple(self):
    mock_pipeline = test_helper.get_mock_pipeline([])
    data_root = os.path.join('local_data', 'unittests')
    if os.path.exists(data_root):
        shutil.rmtree(data_root)
    _copy = copy_file.Subscriber(mock_pipeline)
    _copy.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    })
    _copy.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    _copy.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))
    expected = ['39bbf948-mock.xyz']
    actual = os.listdir(os.path.join(data_root, 'files', 'xyz'))
    self.assertEqual(expected, actual)
Python BytesIO() example source code
def reseed(self, netdb):
    """Compress netdb entries and set content"""
    zip_file = io.BytesIO()
    dat_files = []

    for root, dirs, files in os.walk(netdb):
        for f in files:
            if f.endswith(".dat"):
                # TODO check modified time
                # may be not older than 10h
                dat_files.append(os.path.join(root, f))

    if len(dat_files) == 0:
        raise PyseederException("Can't get enough netDb entries")
    elif len(dat_files) > 75:
        dat_files = random.sample(dat_files, 75)

    with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf:
        for f in dat_files:
            zf.write(f, arcname=os.path.split(f)[1])

    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = zip_file.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
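The core pattern above, writing a ZIP archive straight into memory, also works standalone. A minimal sketch, with a placeholder entry name and payload standing in for the real netDb files:

import io
from zipfile import ZipFile, ZIP_DEFLATED

buf = io.BytesIO()
with ZipFile(buf, "w", compression=ZIP_DEFLATED) as zf:
    # "entry.dat" and the payload are placeholders
    zf.writestr("entry.dat", b"payload bytes")
archive = buf.getvalue()  # the finished ZIP archive as bytes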
def _restart_data(self, format_: str = 'json') -> None:
    assert format_ == 'json'
    with open(join(CURDIR, 'data', 'helloworld.py')) as f:
        testcode = f.read()

    self.data = Request({
        'filepath': 'test.py',
        'action': 'ParseAST',
        'content': testcode,
        'language': 'python',
    })

    bufferclass = io.StringIO if format_ == 'json' else io.BytesIO

    # This will mock the python_driver stdin
    self.sendbuffer = bufferclass()
    # This will mock the python_driver stdout
    self.recvbuffer = bufferclass()
def __init__(self, codestr: str, astdict: AstDict) -> None:
    self._astdict = astdict
    # Tokenize and create the noop extractor and the position fixer
    self._tokens: List[Token] = [
        Token(*i) for i in tokenize.tokenize(BytesIO(codestr.encode('utf-8')).readline)
    ]
    token_lines = _create_tokenized_lines(codestr, self._tokens)
    self.noops_sync = NoopExtractor(codestr, token_lines)
    self.pos_sync = LocationFixer(codestr, token_lines)
    self.codestr = codestr

    # This will store a dict of nodes to end positions; it will be filled
    # on parse()
    self._node2endpos = None

    self.visit_Global = self.visit_Nonlocal = self._promote_names
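The tokenize call in that constructor follows the standard-library contract: tokenize.tokenize() wants a readline callable that yields bytes, which is exactly what BytesIO provides. A self-contained sketch:

import io
import tokenize

source = "x = 1\n"
# The first token yielded is always the ENCODING token
for tok in tokenize.tokenize(io.BytesIO(source.encode('utf-8')).readline):
    print(tok.type, tok.string)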
def __len__(self):
    """
    Returns the length of the content.
    """
    if not self.filepath:
        # If there is no filepath, then we're probably dealing with a
        # stream in memory like a StringIO or BytesIO stream.
        if self.stream:
            # Remember the current position
            ptr = self.stream.tell()
            # Advance to the end of the stream; seek() returns the new
            # offset, which is our length
            length = self.stream.seek(0, SEEK_END)
            if length != ptr:
                # Restore the original position
                self.stream.seek(ptr, SEEK_SET)
        else:
            # No stream or filepath; nothing has been initialized
            # yet at all, so just return 0
            length = 0
    else:
        if self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False
        # Get the size from the filesystem
        length = getsize(self.filepath)
    return length
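The tell/seek dance used above is the usual way to measure an in-memory stream without losing the caller's position. A standalone sketch:

import io
from os import SEEK_END, SEEK_SET

stream = io.BytesIO(b'some buffered content')
ptr = stream.tell()                # remember the current position
length = stream.seek(0, SEEK_END)  # seek() returns the new absolute offset
stream.seek(ptr, SEEK_SET)         # restore the caller's position
assert length == 21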
def get_stream(self, resource):
    return io.BytesIO(self.get_bytes(resource))
def get_resource_stream(self, manager, resource_name):
    return io.BytesIO(self.get_resource_string(manager, resource_name))
def prepare_response(self, request, cached):
    """Verify our vary headers match and construct a real urllib3
    HTTPResponse object.
    """
    # Special case the '*' Vary value as it means we cannot actually
    # determine if the cached response is suitable for this request.
    if "*" in cached.get("vary", {}):
        return

    # Ensure that the Vary headers for the cached response match our
    # request
    for header, value in cached.get("vary", {}).items():
        if request.headers.get(header, None) != value:
            return

    body_raw = cached["response"].pop("body")

    headers = CaseInsensitiveDict(data=cached['response']['headers'])
    if headers.get('transfer-encoding', '') == 'chunked':
        headers.pop('transfer-encoding')

    cached['response']['headers'] = headers

    try:
        body = io.BytesIO(body_raw)
    except TypeError:
        # This can happen if cachecontrol serialized to v1 format (pickle)
        # using Python 2. A Python 2 str (byte string) will be unpickled as
        # a Python 3 str (unicode string), which will cause the above to
        # fail with:
        #
        #     TypeError: 'str' does not support the buffer interface
        body = io.BytesIO(body_raw.encode('utf8'))

    return HTTPResponse(
        body=body,
        preload_content=False,
        **cached["response"]
    )
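Outside of cachecontrol, the key move is simply wrapping the cached bytes in a file-like object so urllib3 can stream them. A minimal sketch with placeholder content:

import io
from urllib3 import HTTPResponse

body = io.BytesIO(b'{"cached": true}')  # placeholder cached body
resp = HTTPResponse(body=body, status=200, preload_content=False)
assert resp.read() == b'{"cached": true}'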
def __init__(self, fp, callback):
    self.__buf = BytesIO()
    self.__fp = fp
    self.__callback = callback
@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)
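A hypothetical usage of the helper above, assuming the @contextlib.contextmanager decorator shown and assuming StreamWrapper exposes the usual StringIO interface:

with captured_output('stdout') as stream:
    print('hello')
# stream now holds everything written to sys.stdout inside the block;
# getvalue() is assumed to behave as on io.StringIO
assert stream.getvalue() == 'hello\n'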
def test_verifying_zipfile():
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    # ZipFile needs a binary buffer, so build the archive in a BytesIO
    sio = BytesIO()
    zf = zipfile.ZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.writestr("three", b"third file")
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    try:
        vzf.open("three").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    try:
        vzf.open("two").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def quopri_encode(input, errors='strict'):
    assert errors == 'strict'
    f = BytesIO(input)
    g = BytesIO()
    quopri.encode(f, g, quotetabs=True)
    return (g.getvalue(), len(input))
def quopri_decode(input, errors='strict'):
    assert errors == 'strict'
    f = BytesIO(input)
    g = BytesIO()
    quopri.decode(f, g)
    return (g.getvalue(), len(input))
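A quick round trip through the two helpers above; both take and return bytes, and with quotetabs=True the tab survives the trip encoded as =09:

raw = b'caf\xc3\xa9\tbar'
encoded, consumed = quopri_encode(raw)
decoded, _ = quopri_decode(encoded)
assert decoded == raw
assert consumed == len(raw)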
def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
    assert errors == 'strict'
    infile = BytesIO(input)
    outfile = BytesIO()
    read = infile.read
    write = outfile.write

    # Encode
    write(('begin %o %s\n' % (mode & 0o777, filename)).encode('ascii'))
    chunk = read(45)
    while chunk:
        write(binascii.b2a_uu(chunk))
        chunk = read(45)
    write(b' \nend\n')

    return (outfile.getvalue(), len(input))
def uu_decode(input, errors='strict'):
    assert errors == 'strict'
    infile = BytesIO(input)
    outfile = BytesIO()
    readline = infile.readline
    write = outfile.write

    # Find start of encoded data
    while 1:
        s = readline()
        if not s:
            raise ValueError('Missing "begin" line in input data')
        if s[:5] == b'begin':
            break

    # Decode
    while True:
        s = readline()
        if not s or s == b'end\n':
            break
        try:
            data = binascii.a2b_uu(s)
        except binascii.Error as v:
            # Workaround for broken uuencoders by /Fredrik Lundh
            nbytes = (((s[0] - 32) & 63) * 4 + 5) // 3
            data = binascii.a2b_uu(s[:nbytes])
            # sys.stderr.write("Warning: %s\n" % str(v))
        write(data)
    if not s:
        raise ValueError('Truncated input data')

    return (outfile.getvalue(), len(input))
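And a round trip through the uu pair above; the encoder emits the begin/end framing that the decoder then strips:

encoded, _ = uu_encode(b'hello world')
decoded, _ = uu_decode(encoded)
assert decoded == b'hello world'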
def test_load_config_empty_file(self):
    """
    Test loading of the config attr if the config file is empty.
    """
    usrmgr = self.__get_dummy_object()
    with patch("os.path.exists", return_value=True),\
            patch("ownbot.usermanager.open") as open_mock:
        open_mock.return_value = io.BytesIO(b"")
        self.assertEqual(usrmgr.config, {})
        self.assertTrue(open_mock.called)
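The same trick in isolation: patching the built-in open to hand back an in-memory buffer (the path and contents here are placeholders):

import io
from unittest.mock import patch

with patch('builtins.open') as open_mock:
    open_mock.return_value = io.BytesIO(b'raw bytes')
    fh = open('ignored-path', 'rb')  # the mock returns our buffer
    assert fh.read() == b'raw bytes'
assert open_mock.called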
def _create_map_image(self):
    images = self._get_images()

    self._map_image = Image.new('RGBA', self._map_size)

    for img in images:
        if not isinstance(img, Image.Image):
            img = Image.open(BytesIO(img.content)).convert('RGBA')
        self._map_image = Image.alpha_composite(self._map_image, img)
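Feeding raw response bytes to Pillow through BytesIO, as the loop above does, can be sketched standalone; the in-memory PNG below stands in for img.content:

import io
from PIL import Image

src = Image.new('RGBA', (8, 8), (255, 0, 0, 255))
buf = io.BytesIO()
src.save(buf, 'PNG')  # PNG bytes standing in for an HTTP payload

img = Image.open(io.BytesIO(buf.getvalue())).convert('RGBA')
assert img.size == (8, 8)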
def _create_pdf_libreoffice(self):
    output_image = BytesIO()
    self._map_image.save(output_image, 'PNG')

    render = Renderer(media_path='.')
    # TODO: use the configuration to select the template
    # TODO: use the configuration to select the name of the key in the template
    result = render.render('template.odt', my_map=output_image)

    with NamedTemporaryFile(
        mode='wb+',
        prefix='geo-pyprint_',
        delete=True
    ) as generated_odt:
        generated_odt.write(result)
        generated_odt.flush()

        output_name = generated_odt.name + '.pdf'
        cmd = [
            'unoconv',
            '-f',
            'pdf',
            '-o',
            output_name,
            generated_odt.name
        ]
        subprocess.call(cmd, timeout=None)

        return output_name
def load_verify_locations(self, cafile=None, capath=None, cadata=None):
    if cafile is not None:
        cafile = cafile.encode('utf-8')
    if capath is not None:
        capath = capath.encode('utf-8')
    self._ctx.load_verify_locations(cafile, capath)
    if cadata is not None:
        self._ctx.load_verify_locations(BytesIO(cadata))