def fetch_data():
    """Download the MTGJSON archive and return its parsed JSON content.

    The primary URL is requested first; if that raises a connection error
    the fallback URL is requested instead. The downloaded zip archive must
    contain exactly one member, which is decoded as UTF-8 JSON.

    Raises:
        RuntimeError: if the archive does not contain exactly one file.
    """
    try:
        response = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        response = requests.get(FALLBACK_MTG_JSON_URL)

    with closing(response), zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        raw = archive.read(members[0])

    return json.loads(raw.decode('utf-8'))
python类BytesIO()的实例源码
def zip_dir(directory):
    """Zip a directory tree into a BytesIO object.

    Every file found while walking ``directory`` is stored under its path
    relative to ``directory``. The buffer the archive was written to is
    returned as-is; no seek is performed on it before returning.
    """
    buffer = io.BytesIO()
    prefix_len = len(directory)
    with ZipFile(buffer, "w") as archive:
        for folder, _subdirs, filenames in os.walk(directory):
            # Drop the leading ``directory`` part to get the in-archive folder.
            relative_folder = folder[prefix_len:]
            for filename in filenames:
                archive.write(
                    os.path.join(folder, filename),
                    os.path.join(relative_folder, filename),
                )
    return buffer
#
# Simple progress bar
#
def encode_jpeg(arr):
    """Encode a uint8 array as one JPEG image and return the encoded bytes.

    ``arr`` is either 3-D (treated as a single channel) or 4-D with the
    channel axis last. The axes are reversed and the result is flattened
    into a 2-D array before being handed to ``Image.fromarray``.

    Raises:
        AssertionError: if ``arr.dtype`` is not ``np.uint8``.
        ValueError: if the number of channels is neither 1 nor 3.
    """
    assert arr.dtype == np.uint8

    # simulate multi-channel array for single channel arrays
    if len(arr.shape) == 3:
        arr = np.expand_dims(arr, 3)  # add channels to end of x,y,z

    arr = arr.transpose((3, 2, 1, 0))  # channels, z, y, x
    num_channels = arr.shape[0]
    if num_channels == 1:
        mode = 'L'
    elif num_channels == 3:
        mode = 'RGB'
    else:
        # After the transpose the channel count lives on axis 0; axis 3 is
        # x, which is what the message used to report by mistake.
        raise ValueError("Number of image channels should be 1 or 3. Got: {}".format(num_channels))

    reshaped = arr.reshape(arr.shape[3] * arr.shape[2], arr.shape[1] * arr.shape[0])
    img = Image.fromarray(reshaped, mode=mode)

    f = io.BytesIO()
    img.save(f, "JPEG")
    return f.getvalue()
def fromqimage(im):
    """Convert a Qt image into a PIL image.

    The image is saved into an in-memory Qt buffer, the buffer's bytes are
    copied into a ``BytesIO`` and that stream is opened with ``Image.open``.
    """
    qt_buffer = QBuffer()
    qt_buffer.open(QIODevice.ReadWrite)

    # PNG preserves the alpha channel; otherwise PPM is more friendly
    # with Image.open.
    image_format = 'png' if im.hasAlphaChannel() else 'ppm'
    im.save(qt_buffer, image_format)

    stream = BytesIO()
    try:
        stream.write(qt_buffer.data())
    except TypeError:
        # workaround for Python 2
        stream.write(str(qt_buffer.data()))
    qt_buffer.close()

    stream.seek(0)
    return Image.open(stream)
def tell(self):
    """
    Allows reference to our object from within a Codec()

    Returns the current position of the underlying stream. A file-backed
    object flushes pending writes first; if no stream is open yet, it is
    opened read-only, and None is returned when that fails.
    """
    if self.filepath:
        # File-backed content: push out pending writes before reporting.
        if self.stream and self._dirty is True:
            self.stream.flush()
            self._dirty = False
    elif self.stream:
        # If there is no filepath, then we're probably dealing with a
        # stream in memory like a StringIO or BytesIO stream.
        return self.stream.tell()

    if not self.stream and not self.open(mode=NNTPFileMode.BINARY_RO):
        return None

    return self.stream.tell()
def openStream(self, source):
"""Produces a file object from source.
source can be either a file object, local filename or a string.
"""
# Already a file object
if hasattr(source, 'read'):
stream = source
else:
stream = BytesIO(source)
try:
stream.seek(stream.tell())
except: # pylint:disable=bare-except
stream = BufferedStream(stream)
return stream
def rollover(self):
    """Move the buffered content into a real temporary file.

    Does nothing if the rollover already happened. Otherwise a
    ``TemporaryFile`` is created from the stored constructor arguments,
    the current content is copied into it and the previous position is
    restored on the new file.
    """
    if self._rolled:
        return

    memory_buffer = self._file
    disk_file = TemporaryFile(**self._TemporaryFileArgs)
    self._file = disk_file
    # The constructor arguments are only needed once.
    del self._TemporaryFileArgs

    disk_file.write(memory_buffer.getvalue())
    disk_file.seek(memory_buffer.tell(), 0)

    self._rolled = True
# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# BytesIO/StringIO instance to a real file. So we list
# all the methods directly.
# Context management protocol
def test_load_config(self):
    """
    Test loading of the config attribute
    """
    # The YAML fixture is spelled out with explicit newlines and spaces so
    # its nesting (which the expected dict depends on) cannot be lost when
    # the surrounding code is re-indented.
    config_yaml = (
        b"\n"
        b"foogroup:\n"
        b"    unverified:\n"
        b"        - '@foouser'"
    )
    expected_config = {"foogroup": {"unverified": ["@foouser"]}}

    usrmgr = self.__get_dummy_object()
    with patch("os.path.exists", return_value=True),\
            patch("ownbot.usermanager.open") as open_mock:
        open_mock.return_value = io.BytesIO(config_yaml)
        # The assertions stay inside the patches so that reading
        # ``usrmgr.config`` happens while ``open`` is replaced.
        self.assertEqual(usrmgr.config, expected_config)
        self.assertTrue(open_mock.called)
def zip_dir(directory):
    """Return a BytesIO holding a zip archive of a directory tree.

    Archive member names are the file paths with the leading ``directory``
    string removed.
    """
    out = io.BytesIO()
    skip = len(directory)
    with ZipFile(out, "w") as zf:
        # Flatten the walk into (folder, file name) pairs.
        pairs = (
            (root, name)
            for root, _dirs, names in os.walk(directory)
            for name in names
        )
        for root, name in pairs:
            zf.write(os.path.join(root, name), os.path.join(root[skip:], name))
    return out
#
# Simple progress bar
#
def close(self, *args, **kwargs):
    """
    Engine closed, copy file to DB

    After the parent class has closed the connection, the local database
    file (``settings_dict['NAME']``) is uploaded to the S3 object
    ``settings_dict['REMOTE_NAME']`` in bucket ``settings_dict['BUCKET']``.
    Failures while reading or uploading the file are logged and swallowed.
    """
    super(DatabaseWrapper, self).close(*args, **kwargs)

    signature_version = self.settings_dict.get("SIGNATURE_VERSION", "s3v4")
    s3 = boto3.resource(
        's3',
        config=botocore.client.Config(signature_version=signature_version),
    )

    try:
        with open(self.settings_dict['NAME'], 'rb') as f:
            body = BytesIO(f.read())

        s3_object = s3.Object(self.settings_dict['BUCKET'], self.settings_dict['REMOTE_NAME'])
        # put() takes keyword arguments only, so the stray positional 'rb'
        # that used to be passed here has been dropped.
        s3_object.put(Body=body)
    except Exception:
        # Best effort: closing the engine must not crash on upload problems,
        # but the failure is recorded with its traceback.
        logging.exception("Could not save to remote DB")
    else:
        # Only claim success when the upload really went through.
        logging.debug("Saved to remote DB!")
def getWebPage(url, headers, cookies, postData=None):
    """Fetch ``url`` and return the response body as bytes.

    When ``postData`` is truthy it is URL-encoded and sent as the request
    body through an opener with cookie handling; otherwise the URL is
    fetched with ``urlopen``. ``cookies`` is sent as the ``Cookie`` header.
    A response marked ``Content-Encoding: gzip`` is decompressed.

    Returns:
        The body bytes, or ``None`` if anything went wrong (the error is
        printed).
    """
    try:
        if postData:
            params = urllib.parse.urlencode(postData).encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
        request.add_header('Cookie', cookies)

        if postData:
            opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor)
            response = opener.open(request)
        else:
            response = urllib.request.urlopen(request)

        # Close the response even when reading or decompressing fails;
        # previously it was never closed.
        with response:
            body = response.read()
            if response.info().get('Content-Encoding') == 'gzip':
                body = gzip.GzipFile(fileobj=BytesIO(body)).read()
        return body
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None
## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
def display_graph(g, format='svg', include_asset_exists=False):
    """
    Display a TermGraph interactively from within IPython.

    ``format`` must be one of 'svg', 'jpeg' or 'png'. The graph is rendered
    into memory and wrapped in the matching ``IPython.display`` object.

    Raises ValueError for an unsupported format and NoIPython when IPython
    cannot be imported.
    """
    if format not in ('svg', 'jpeg', 'png'):
        # Without this guard an unknown format left ``display_cls`` unbound
        # and only failed after the graph had already been rendered.
        raise ValueError(
            "Unsupported format {!r}; expected 'svg', 'jpeg' or 'png'.".format(format)
        )

    try:
        import IPython.display as display
    except ImportError:
        raise NoIPython("IPython is not installed. Can't display graph.")

    if format == 'svg':
        display_cls = display.SVG
    else:
        display_cls = partial(display.Image, format=format, embed=True)

    out = BytesIO()
    _render(g, out, format, include_asset_exists=include_asset_exists)
    return display_cls(data=out.getvalue())
def _body(self):
    """Read the request body from ``environ['wsgi.input']`` into a buffer.

    The body is collected in a ``BytesIO``; once more than ``MEMFILE_MAX``
    bytes have been read the content is moved to a ``TemporaryFile``. The
    resulting buffer replaces ``environ['wsgi.input']``, is rewound and
    returned. When there is no ``wsgi.input`` at all, an empty ``BytesIO``
    is stored and returned.
    """
    try:
        read_func = self.environ['wsgi.input'].read
    except KeyError:
        empty = BytesIO()
        self.environ['wsgi.input'] = empty
        return self.environ['wsgi.input']

    if self.chunked:
        part_source = self._iter_chunked
    else:
        part_source = self._iter_body

    buffer = BytesIO()
    total = 0
    spilled = False
    for chunk in part_source(read_func, self.MEMFILE_MAX):
        buffer.write(chunk)
        total += len(chunk)
        if spilled or total <= self.MEMFILE_MAX:
            continue
        # Too large for memory: continue in a temporary file.
        on_disk = TemporaryFile(mode='w+b')
        on_disk.write(buffer.getvalue())
        buffer = on_disk
        spilled = True

    self.environ['wsgi.input'] = buffer
    buffer.seek(0)
    return buffer
def open_device(vendor_id, product_id, interface_number):
    """Opens and returns the HID device (file-like object). Raise IOError if
    the device or interface is not available.

    Arguments:
    vendor_id -- the mouse vendor id (e.g. 0x1038)
    product_id -- the mouse product id (e.g. 0x1710)
    interface_number -- the interface number (e.g. 0x00)
    """
    # Dry run: hand back an in-memory stand-in instead of real hardware.
    dry_run = debug.DEBUG and debug.DRY and is_device_plugged(vendor_id, product_id)
    if dry_run:
        fake_device = BytesIO()  # Mock the device
        fake_device.send_feature_report = fake_device.write
        return fake_device

    # Real device: open the first interface with the requested number.
    for info in hid.enumerate(vendor_id, product_id):
        if info["interface_number"] == interface_number:
            real_device = hid.device()
            real_device.open_path(info["path"])
            return real_device

    raise IOError("Unable to find the requested device: %04X:%04X:%02X" % (
        vendor_id, product_id, interface_number))
def print_chapters(self, show=False):
    '''
    Display infos of chapters.

    Prints and returns a summary made of the chapter count, the comic
    title and the first field of every entry in ``self.chapter_urls``.
    When ``show`` is true the cover at ``self.cover`` is also downloaded
    and displayed with ``Image.show()``; a failed download is reported as
    a printed traceback and does not change the returned text.
    '''
    chapter_lines = '\n'.join([info[0] for info in self.chapter_urls])
    text = 'There are {n} chapters in comic {c}:\n{chs}'.format(
        n=self.chapter_num, c=self.comic_title, chs=chapter_lines)
    print(text)

    if show:
        # The header has to be spelled "User-Agent"; the former key
        # 'use-agent' was sent as an unrelated header.
        headers = {
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36",
            'Referer': 'http://manhua.dmzj.com/tags/s.shtml',
        }
        try:
            res = requests.get(self.cover, headers=headers)
            # NOTE(review): the body sniffing is kept for compatibility, but
            # b'403' may also occur inside valid image bytes - confirm how
            # the server reports a refusal and drop it if the status code
            # alone is enough.
            if res.status_code == 403 or b'403' in res.content:
                raise ValueError('Got cover img failed')
            Image.open(BytesIO(res.content)).show()
        except (ConnectionError, ValueError):
            traceback.print_exc()
    return text
def file(self):
    """
    Returns a file pointer to this binary

    The binary is downloaded as a zip archive from the
    ``/api/v1/binary/<md5sum>`` endpoint and the archive member named
    ``filedata`` is opened for reading.

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # The archive is binary data: BytesIO works on Python 2 and 3, whereas
    # a text StringIO rejects bytes on Python 3.
    from io import BytesIO

    # TODO: I don't like reaching through to the session...
    url = "/api/v1/binary/{0:s}".format(self.md5sum)
    with closing(self._cb.session.get(url, stream=True)) as r:
        # ``r.content`` is fully read here, so the archive stays usable
        # after the response has been closed.
        archive = ZipFile(BytesIO(r.content))
    return archive.open('filedata')
def sendto(self, bytes, *args, **kwargs):
    """Send a datagram, prefixing it with a SOCKS5 UDP header.

    Sockets that are not datagram sockets defer to the parent ``sendto``.
    Otherwise the last positional argument is taken as the destination
    address and any preceding ones are passed on as flags. The return
    value is the number of bytes sent minus the length of the header.
    """
    if self.type != socket.SOCK_DGRAM:
        return super(socksocket, self).sendto(bytes, *args, **kwargs)
    if not self._proxyconn:
        self.bind(("", 0))

    destination, flags = args[-1], args[:-1]

    header = BytesIO()
    header.write(b"\x00\x00")  # RSV
    header.write(b"\x00")  # STANDALONE
    self._write_SOCKS5_address(destination, header)

    packet = header.getvalue() + bytes
    sent = super(socksocket, self).send(packet, *flags, **kwargs)
    # Report only the payload bytes, not the header that was prepended.
    return sent - header.tell()
def recvfrom(self, bufsize, flags=0):
    """Receive a datagram and strip its SOCKS5 UDP header.

    Sockets that are not datagram sockets defer to the parent ``recvfrom``.
    Otherwise up to ``bufsize + 1024`` bytes are received, the first two
    bytes are skipped, and a non-zero third byte (fragment marker) is
    rejected. Returns ``(data, (host, port))`` with at most ``bufsize``
    bytes of data.

    Raises NotImplementedError for fragmented packets and ``socket.error``
    (EAGAIN) when the sender does not match ``proxy_peername``.
    """
    if self.type != socket.SOCK_DGRAM:
        return super(socksocket, self).recvfrom(bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # Leave room for the header in front of the payload.
    packet = super(socksocket, self).recv(bufsize + 1024, flags)
    buf = BytesIO(packet)
    buf.seek(2, SEEK_CUR)
    if ord(buf.read(1)):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        # A peer port of 0 accepts any source port.
        port_ok = peerport in (0, fromport)
        if fromhost != peerhost or not port_ok:
            raise socket.error(EAGAIN, "Packet filtered")

    payload = buf.read(bufsize)
    return (payload, (fromhost, fromport))
def test_write_svg():
    """SVG written with default options.

    Expects an XML declaration, width/height but no viewBox, the 'segno'
    class on the root, a 'qrline' path stroked with '#000', and neither a
    title nor a description element.
    """
    buff = io.BytesIO()
    segno.make_qr('test').save(buff, kind='svg')
    assert buff.getvalue().startswith(b'<?xml')

    root = _parse_xml(buff)
    attrs = root.attrib
    assert 'viewBox' not in attrs
    assert 'height' in attrs
    assert 'width' in attrs
    css_class = attrs.get('class')
    assert css_class
    assert 'segno' == css_class

    path_el = _get_path(root)
    assert path_el is not None
    assert 'qrline' == path_el.get('class')
    assert path_el.get('stroke') == '#000'

    assert _get_title(root) is None
    assert _get_desc(root) is None
def test_write_svg_black():
    """SVG written with the colour given as the mixed-case name 'bLacK'.

    The output must match the default one, in particular the path stroke
    must be '#000'.
    """
    qr = segno.make_qr('test')
    buff = io.BytesIO()
    qr.save(buff, kind='svg', color='bLacK')
    svg_bytes = buff.getvalue()
    assert svg_bytes.startswith(b'<?xml')

    root = _parse_xml(buff)
    assert 'viewBox' not in root.attrib
    for required in ('height', 'width'):
        assert required in root.attrib
    root_class = root.attrib.get('class')
    assert root_class
    assert 'segno' == root_class

    path = _get_path(root)
    assert path is not None
    assert 'qrline' == path.get('class')
    assert path.get('stroke') == '#000'

    title = _get_title(root)
    assert title is None
    desc = _get_desc(root)
    assert desc is None
def test_write_svg_black3():
    """SVG written with the colour given as the tuple (0, 0, 0).

    The output must match the default one, in particular the path stroke
    must be '#000'.
    """
    out = io.BytesIO()
    segno.make_qr('test').save(out, kind='svg', color=(0, 0, 0))
    assert out.getvalue().startswith(b'<?xml')

    root = _parse_xml(out)
    root_attrs = root.attrib
    assert 'viewBox' not in root_attrs
    assert 'height' in root_attrs
    assert 'width' in root_attrs
    cls = root_attrs.get('class')
    assert cls
    assert 'segno' == cls

    # The single path element carries the module data.
    path_el = _get_path(root)
    assert path_el is not None
    path_cls, stroke = path_el.get('class'), path_el.get('stroke')
    assert 'qrline' == path_cls
    assert stroke == '#000'

    assert _get_title(root) is None
    assert _get_desc(root) is None
def pam_bw_as_matrix(buff, border):
    """\
    Returns the QR code as list of [0, 1] lists.

    ``border`` rows are skipped at the top and bottom and ``border``
    values are cut from both ends of every remaining row.

    :param io.BytesIO buff: Buffer to read the matrix from.
    :param int border: Width of the border to strip.
    """
    pixels, width = _image_data(buff)
    last_row = width - border
    matrix = []
    for row_no, start in enumerate(range(0, len(pixels), width)):
        if row_no < border:
            continue
        if row_no >= last_row:
            break
        row = bytearray(pixels[start + border:start + width - border])
        # Invert bytes since PAM uses 0x0 = black, 0x1 = white
        matrix.append([value ^ 0x1 for value in row])
    return matrix
def pdf_as_matrix(buff, border):
    """\
    Reads the path in the PDF and returns it as list of 0, 1 lists.

    The matrix size is taken from the ``/MediaBox`` entry minus the border
    on both sides; every "move to / line to" pair found in the graphic
    marks the cells between its two x coordinates on row ``abs(y)``.

    :param io.BytesIO buff: Buffer to read the matrix from.
    :param int border: Border width subtracted twice from the box size.
    :raises ValueError: if the two MediaBox dimensions differ.
    """
    pdf = buff.getvalue()
    media_box = re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf,
                          flags=re.MULTILINE)
    h, w = media_box.groups()
    if h != w:
        raise ValueError('Expected equal height/width, got height="{}" width="{}"'.format(h, w))
    size = int(w) - 2 * border

    graphic = _find_graphic(buff)
    matrix = [[0] * size for _ in range(size)]
    line_pattern = (r'\s*(\-?\d+)\s+(\-?\d+)\s+m\s+'
                    r'(\-?\d+)\s+(\-?\d+)\s+l')
    for coords in re.findall(line_pattern, graphic):
        x1, y1, x2, _y2 = [int(value) for value in coords]
        matrix[abs(y1)][x1:x2] = [1] * (x2 - x1)
    return matrix
def test_flow():
    """read_message on a stream of two requests, an empty stream and a
    truncated stream.

    The first buffer must decode to an 'info' request followed by a
    'flush' request and then be exhausted. For the empty and the one-byte
    buffers the second value returned by read_message must be 0.
    """
    from io import BytesIO
    # info + flush
    inbound = b'\x01\x02\x1a\x00\x01\x02\x12\x00'
    data = BytesIO(inbound)

    req_type, _ = read_message(data, types.Request)
    assert 'info' == req_type.WhichOneof("value")

    req_type2, _ = read_message(data, types.Request)
    assert 'flush' == req_type2.WhichOneof("value")

    assert data.read() == b''
    data.close()

    # Nothing to read at all
    data2 = BytesIO(b'')
    req_type, fail = read_message(data2, types.Request)
    assert fail == 0
    # Identity check: None is a singleton and must not go through __eq__.
    assert req_type is None

    # A single byte only
    data3 = BytesIO(b'\x01')
    req_type, fail = read_message(data3, types.Request)
    assert fail == 0
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES

    A plain value is gzip-compressed in memory and returned as bytes. An
    object with a ``read`` attribute is iterated and compressed into a
    temporary file, which is returned wrapped in a ``FileString``.
    """
    streaming = hasattr(bytes, "read")
    buff = TemporaryFile() if streaming else BytesIO()
    archive = gzip.GzipFile(fileobj=buff, mode='w')

    if not streaming:
        archive.write(bytes)
        archive.close()
        return buff.getvalue()

    for piece in bytes:
        archive.write(piece)
    archive.close()
    buff.seek(0)
    from pyLibrary.env.big_data import FileString, safe_size
    return FileString(buff)
def openstream(self, filename):
    """
    Open a stream as a read-only file object (BytesIO).
    Note: filename is case-insensitive.

    :param filename: path of stream in storage tree (except root entry), either:

        - a string using Unix path syntax, for example:
          'storage_1/storage_1.2/stream'
        - or a list of storage filenames, path to the desired stream/storage.
          Example: ['storage_1', 'storage_1.2', 'stream']

    :returns: file object (read-only)
    :exception IOError: if filename not found, or if this is not a stream.
    """
    # Resolve the name to its directory entry, then make sure that entry
    # really is a stream before opening it.
    entry = self.direntries[self._find(filename)]
    if entry.entry_type == STGTY_STREAM:
        return self._open(entry.isectStart, entry.size)
    raise IOError("this file is not a stream")
def __init__(self, image=None, **kw):
    """Create a Tk bitmap image from a PIL image.

    ``image`` may be omitted when a ``file`` or ``data`` keyword is given
    instead (Tk compatibility); the image is then opened from that file or
    from those bytes. All remaining keywords are passed on to
    ``tkinter.BitmapImage``.

    Raises ValueError when neither ``image`` nor ``file``/``data`` is given.
    """
    # Tk compatibility: file or data
    if image is None:
        if "file" in kw:
            image = Image.open(kw.pop("file"))
        elif "data" in kw:
            from io import BytesIO
            image = Image.open(BytesIO(kw.pop("data")))
        else:
            # Fail with a clear message instead of an AttributeError on
            # ``None.mode`` below.
            raise ValueError("Image is required")

    self.__mode = image.mode
    self.__size = image.size

    if _pilbitmap_check():
        # fast way (requires the pilbitmap booster patch)
        image.load()
        kw["data"] = "PIL:%d" % image.im.id
        self.__im = image  # must keep a reference
    else:
        # slow but safe way
        kw["data"] = image.tobitmap()
    self.__photo = tkinter.BitmapImage(**kw)
def get_data_hash(self, data_bytes):
    """Calculate Merkle's root hash of the given data bytes"""
    # Tree geometry: one populated leaf per chunk of ``_chunk_len`` bytes,
    # with the bottom layer padded up to the next power of two.
    populated_leaves = math.ceil(len(data_bytes) / self._chunk_len)
    height = math.ceil(math.log2(populated_leaves))
    bottom_layer = ['\x00'] * int(math.pow(2, height))

    # Fill the bottom layer from the data stream.
    with io.BytesIO(data_bytes) as stream:
        self._initial_hasher(stream, populated_leaves, bottom_layer)

    # Get Merkle's root hash
    return self._calculate_root_hash(bottom_layer)
def test_simple(self):
    """Content starting with a registered magic prefix is routed on.

    Two prefixes are registered for the same mock consumer; the content
    starts with the first one, so the document must be flagged as a hit
    and the consumer must receive the content unchanged, exactly once.
    """
    pipe = test_helper.get_mock_pipeline([])
    mock_mod = test_helper.MockSubscriber()
    pipe.register_magic(b'\xFF\xEE\xDD', ('mock', mock_mod.consume))
    pipe.register_magic(b'\x00\x00\x00', ('mock', mock_mod.consume))

    _magic = magic.Subscriber(pipe)
    _magic.setup(None)

    doc = document.get_document('mock')
    content = b'\xFF\xEE\xDDMOCKMOCKMOCK'
    _magic.consume(doc, BytesIO(content))

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(True, doc.magic_hit)
    self.assertEqual(1, len(mock_mod.produced))

    expected = content
    actual = mock_mod.produced[0][1].read()
    self.assertEqual(expected, actual)
def test_simple(self):
    """With the injector answering Content-Type image/jpeg, the consumed
    document must end up with doctype 'picture'."""
    subscriber = file_meta.Subscriber(test_helper.get_mock_pipeline([]))
    canned_response = json.dumps({'Content-Type': 'image/jpeg'}).encode('utf-8')
    config = {
        'data_root': 'local_data',
        'code_root': '.',
        'worker_id': 1,
        'host': 'mock',
        helper.INJECTOR: test_helper.MockInjector(canned_response),
    }
    subscriber.setup(config)

    doc = document.get_document('mock.txt')
    subscriber.consume(doc, BytesIO(b'mock'))

    self.assertEqual('picture', doc.doctype)