def DecompressBrotli(string):
"""
DecompressBrotli(string) -> str
Returns the original form of the given string, which was compressed
using Google's Brotli compression method and passed without delimiters.
"""
number = 0
for character in string:
ordinal = OrdinalLookup.get(character, ord(character))
number = number * 255 + ordinal - (ordinal > gap)
compressed = []
while number > 1:
compressed = [number % 256] + compressed
number //= 256
return brotli.decompress(bytes(compressed)).decode("ascii")
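# --- Illustrative sketch (not from the original source): the function above
# packs the codepage string into one big integer and then unpacks it into
# base-256 bytes. This round trip shows that unpacking loop in isolation.
payload = bytes([12, 0, 200, 7])
number = 1  # leading sentinel so leading zero bytes survive the conversion
for b in payload:
    number = number * 256 + b
digits = []
while number > 1:
    digits = [number % 256] + digits
    number //= 256
assert bytes(digits) == payload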
def _consume_actions(buffer):
compressed_byte_count = _consume_int(buffer)
compressed_data = buffer[:compressed_byte_count]
del buffer[:compressed_byte_count]
decompressed_data = lzma.decompress(compressed_data)
out = []
offset = 0
for raw_action in decompressed_data.split(b','):
if not raw_action:
continue
raw_offset, x, y, raw_action_mask = raw_action.split(b'|')
action_mask = ActionBitMask.unpack(int(raw_action_mask))
offset += int(raw_offset)
out.append(Action(
datetime.timedelta(milliseconds=offset),
Position(float(x), float(y)),
action_mask['m1'],
action_mask['m2'],
action_mask['k1'],
action_mask['k2'],
))
return out
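# Hypothetical illustration of the decompressed action stream that
# _consume_actions walks: comma-separated "offset|x|y|mask" records, where the
# offsets are millisecond deltas. The sample values below are made up.
sample = b"16|120.5|340.0|1,16|121.0|339.5|3,"
offset = 0
for raw in sample.split(b','):
    if not raw:
        continue
    raw_offset, x, y, raw_mask = raw.split(b'|')
    offset += int(raw_offset)
    print(offset, float(x), float(y), int(raw_mask))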
def import_to_store(self, compressed_nar):
"""Given a compressed NAR, extract it and import it into the nix store.
:param compressed_nar: The bytes of a NAR, compressed.
:type compressed_nar: ``bytes``
"""
# Figure out how to extract the content.
if self.compression.lower() in ("xz", "xzip"):
data = lzma.decompress(compressed_nar)
elif self.compression.lower() in ("bz2", "bzip2"):
data = bz2.decompress(compressed_nar)
else:
data = gzip.decompress(compressed_nar)
# Once extracted, convert it into a nix export object and import.
export = self.nar_to_export(data)
imported_path = export.import_to_store()
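# Standalone sketch (an assumption, not part of the original class) of the
# compression dispatch that import_to_store performs above.
import bz2
import gzip
import lzma

def _decompress_nar(compressed_nar, compression):
    method = compression.lower()
    if method in ("xz", "xzip"):
        return lzma.decompress(compressed_nar)
    if method in ("bz2", "bzip2"):
        return bz2.decompress(compressed_nar)
    return gzip.decompress(compressed_nar)  # gzip is the fallback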
def DecompressLZMA(string):
"""
DecompressLZMA(string) -> str
Returns the original form of the given string, which was compressed
using the LZMA compression method and passed without delimiters.
"""
number = 0
for character in string:
ordinal = OrdinalLookup.get(character, ord(character))
number = number * 255 + ordinal - (ordinal > gap)
compressed = []
while number > 1:
compressed = [number % 256] + compressed
number //= 256
return lzma.decompress(bytes(compressed)).decode("ascii")
def decompress(data):
return lzma.decompress(data)
def _lzma(self):
'''LZMA processor'''
try:
archive = lzma.decompress(self.cur_attachment.file_obj.read())
new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
cur_file = File(archive, new_fn)
self.process_payload(cur_file)
except:
self.cur_attachment.make_dangerous()
return self.cur_attachment
def _bzip(self):
'''BZip2 processor'''
try:
archive = bz2.decompress(self.cur_attachment.file_obj.read())
new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
cur_file = File(archive, new_fn)
self.process_payload(cur_file)
except:
self.cur_attachment.make_dangerous()
return self.cur_attachment
def bz2_pack(source):
"""
Returns 'source' as a bzip2-compressed, self-extracting python script.
.. note::
This method uses more space than the zip_pack method but it has the
advantage that the resulting .py file can still be imported into a
python program.
"""
import bz2, base64
out = ""
# Preserve shebangs (don't care about encodings for this)
first_line = source.split('\n')[0]
if analyze.shebang.match(first_line):
if py3:
if first_line.rstrip().endswith('python'): # Make it python3
first_line = first_line.rstrip()
first_line += '3' #!/usr/bin/env python3
out = first_line + '\n'
compressed_source = bz2.compress(source.encode('utf-8'))
out += 'import bz2, base64\n'
out += "exec(bz2.decompress(base64.b64decode('"
out += base64.b64encode(compressed_source).decode('utf-8')
out += "')))\n"
return out
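# Minimal, self-contained illustration of the stub that bz2_pack emits: the
# packed script just base64-decodes, bz2-decompresses, and execs the source.
import base64
import bz2

source = "print('hello from a packed script')\n"
payload = base64.b64encode(bz2.compress(source.encode('utf-8'))).decode('utf-8')
stub = "import bz2, base64\nexec(bz2.decompress(base64.b64decode('%s')))\n" % payload
exec(compile(stub, '<packed>', 'exec'))  # prints: hello from a packed script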
def gz_pack(source):
"""
Returns 'source' as a gzip-compressed, self-extracting python script.
.. note::
This method uses more space than the zip_pack method but it has the
advantage that the resulting .py file can still be imported into a
python program.
"""
import zlib, base64
out = ""
# Preserve shebangs (don't care about encodings for this)
first_line = source.split('\n')[0]
if analyze.shebang.match(first_line):
if py3:
if first_line.rstrip().endswith('python'): # Make it python3
first_line = first_line.rstrip()
first_line += '3' #!/usr/bin/env python3
out = first_line + '\n'
compressed_source = zlib.compress(source.encode('utf-8'))
out += 'import zlib, base64\n'
out += "exec(zlib.decompress(base64.b64decode('"
out += base64.b64encode(compressed_source).decode('utf-8')
out += "')))\n"
return out
def lzma_pack(source):
"""
Returns 'source' as a lzma-compressed, self-extracting python script.
.. note::
This method uses more space than the zip_pack method but it has the
advantage that the resulting .py file can still be imported into a
python program.
"""
import lzma, base64
out = ""
# Preserve shebangs (don't care about encodings for this)
first_line = source.split('\n')[0]
if analyze.shebang.match(first_line):
if py3:
if first_line.rstrip().endswith('python'): # Make it python3
first_line = first_line.rstrip()
first_line += '3' #!/usr/bin/env python3
out = first_line + '\n'
compressed_source = lzma.compress(source.encode('utf-8'))
out += 'import lzma, base64\n'
out += "exec(lzma.decompress(base64.b64decode('"
out += base64.b64encode(compressed_source).decode('utf-8')
out += "')))\n"
return out
async def get(self, key: str, format: str = 'html') -> Optional[bytes]:
loop = asyncio.get_event_loop()
cache_get = self._cache.get
data = await loop.run_in_executor(None, cache_get, key + format)
if data is not None:
res = await loop.run_in_executor(None, lzma.decompress, data)
return res
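# Self-contained sketch of the pattern used above: offloading the blocking
# lzma.decompress call to the default executor from a coroutine. The cache and
# key handling of the original class are omitted.
import asyncio
import lzma

async def _demo():
    loop = asyncio.get_running_loop()
    blob = lzma.compress(b"<html>cached page</html>")
    data = await loop.run_in_executor(None, lzma.decompress, blob)
    assert data == b"<html>cached page</html>"

asyncio.run(_demo())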
def __decompress_string(cls, string, compressions=None):
# type: (Any, bytes, Union[None, Iterable[int]]) -> Tuple[bytes, bool]
"""Returns a tuple containing the decompressed :py:class:`bytes` and a
:py:class:`bool` as to whether decompression failed or not
Args:
string: The possibly-compressed message you wish to parse
compressions: A list of the standard compression methods this
message may be under (default: ``[]``)
Returns:
A decompressed version of the message
Raises:
ValueError: Unrecognized compression method fed in compressions
Warning:
Do not feed in the size header; doing so will throw errors
"""
compression_fail = False
# second is module scope compression
for method in intersect(compressions, compression):
try:
string = decompress(string, method)
compression_fail = False
break
except:
compression_fail = True
continue
return (string, compression_fail)
def parse_play_data(self, replay_data):
offset_end = self.offset+self.__replay_length
if self.game_mode != GameMode.Standard:
self.play_data = None
else:
datastring = lzma.decompress(replay_data[self.offset:offset_end], format=lzma.FORMAT_AUTO).decode('ascii')[:-1]
events = [eventstring.split('|') for eventstring in datastring.split(',')]
self.play_data = [ReplayEvent(int(event[0]), float(event[1]), float(event[2]), int(event[3])) for event in events]
self.offset = offset_end
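# Illustrative only: the decompressed osu! replay stream parsed above is a
# comma-separated list of "w|x|y|z" events (time delta in ms, cursor x, cursor
# y, key bitmask). The sample values below are made up.
sample = "1|256.0|192.0|0,16|260.5|190.0|5"
events = [eventstring.split('|') for eventstring in sample.split(',')]
parsed = [(int(w), float(x), float(y), int(z)) for w, x, y, z in events]
print(parsed)  # [(1, 256.0, 192.0, 0), (16, 260.5, 190.0, 5)]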
def test_incremental_compress():
basic_test_c(lzma.Compressor(), decompress)
def decompress(
data,
):
decompressed_object = lzma.decompress(data)
return decompressed_object
def parseReplay(filename):
osr = open(filename, 'rb').read()
offset = 0
data = {}
data['mode'], offset = parseNum(osr, offset, 1)
data['version'], offset = parseNum(osr, offset, 4)
data['beatmap_md5'], offset = parseString(osr, offset)
data['player'], offset = parseString(osr, offset)
data['player_lower'] = data['player'].lower()
data['replay_md5'], offset = parseString(osr, offset)
data['num_300'], offset = parseNum(osr, offset, 2)
data['num_100'], offset = parseNum(osr, offset, 2)
data['num_50'], offset = parseNum(osr, offset, 2)
data['num_geki'], offset = parseNum(osr, offset, 2)
data['num_katu'], offset = parseNum(osr, offset, 2)
data['num_miss'], offset = parseNum(osr, offset, 2)
data['score'], offset = parseNum(osr, offset, 4)
data['max_combo'], offset = parseNum(osr, offset, 2)
data['perfect_combo'], offset = parseNum(osr, offset, 1)
mods, offset = parseNum(osr, offset, 4)
data['mods'] = parseMods(mods)
data['mods_bitwise'] = mods
graphString, offset = parseString(osr, offset)
data['life_graph'] = parseLifeGraph(graphString)
data['time_stamp'], offset = parseDate(osr, offset)
data_len, offset = parseNum(osr, offset, 4)
replay_str = str(lzma.decompress(osr[offset:offset+data_len]), 'utf-8')
data['replay_data'] = parseReplayString(replay_str)
return data
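# Hedged sketch of a parseNum helper consistent with the calls above: .osr
# integers are little-endian and `length` bytes wide. The original helper is
# not shown in this snippet, so treat this as an assumption.
def parseNum(data, offset, length):
    value = int.from_bytes(data[offset:offset + length], 'little')
    return value, offset + length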
def _gz_decompress(data):
return zlib.decompress(data, 16+zlib.MAX_WBITS)
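# Sanity check for the wbits trick above: 16 + zlib.MAX_WBITS tells zlib to
# expect a gzip wrapper rather than a raw zlib stream.
import gzip
import zlib
assert _gz_decompress(gzip.compress(b"hello")) == b"hello"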
def decompress(data):
for item in decompressor_from_magic:
if data.startswith(item[0]):
return item[1](data)
raise FatalError("Unknown compression type!")
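# Hypothetical shape of the decompressor_from_magic table consumed above (the
# real table lives elsewhere in the original module); keys are magic prefixes.
import bz2
import lzma
decompressor_from_magic = [
    (b"\x1f\x8b", _gz_decompress),        # gzip
    (b"\xfd7zXZ\x00", lzma.decompress),   # xz
    (b"BZh", bz2.decompress),             # bzip2
]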
def gz_pack(source):
"""
Returns 'source' as a gzip-compressed, self-extracting python script.
.. note::
This method uses more space than the zip_pack method but it has the
advantage that the resulting .py file can still be imported into a
python program.
"""
import zlib
import base64
out = ""
# Preserve shebangs (don't care about encodings for this)
first_line = source.split('\n')[0]
if analyze.shebang.match(first_line):
if py3:
if first_line.rstrip().endswith('python'): # Make it python3
first_line = first_line.rstrip()
first_line += '3' # !/usr/bin/env python3
out = first_line + '\n'
compressed_source = zlib.compress(source.encode('utf-8'))
out += 'import zlib, base64\n'
out += "exec(zlib.decompress(base64.b64decode('"
out += base64.b64encode(compressed_source).decode('utf-8')
out += "')))\n"
return out
def lzma_pack(source):
"""
Returns 'source' as a lzma-compressed, self-extracting python script.
.. note::
This method uses more space than the zip_pack method but it has the
advantage that the resulting .py file can still be imported into a
python program.
"""
import lzma
import base64
out = ""
# Preserve shebangs (don't care about encodings for this)
first_line = source.split('\n')[0]
if analyze.shebang.match(first_line):
if py3:
if first_line.rstrip().endswith('python'): # Make it python3
first_line = first_line.rstrip()
first_line += '3' # !/usr/bin/env python3
out = first_line + '\n'
compressed_source = lzma.compress(source.encode('utf-8'))
out += 'import lzma, base64\n'
out += "exec(lzma.decompress(base64.b64decode('"
out += base64.b64encode(compressed_source).decode('utf-8')
out += "')))\n"
return out
def decompress(self, data):
try:
return lzma.decompress(data)
except lzma.LZMAError as e:
raise CompressorError(e)
def loadTask(filename):
try:
with open(filename, 'rb') as task:
try:
header = task.readline().decode()
if header.startswith('SOSTASK1.1'):
# ignore the tags
task.readline()
return pickle.load(task)
elif header.startswith('SOSTASK1.2'):
task.readline()
try:
return pickle.loads(lzma.decompress(task.read()))
except:
# at some point, the task files were compressed with zlib
import zlib
return pickle.loads(zlib.decompress(task.read()))
else:
raise ValueError('Try old format')
except:
# old format
task.seek(0)
param = pickle.load(task)
# old format does not have tags
param.tags = []
return param
except ImportError as e:
raise RuntimeError(
f'Failed to load task {os.path.basename(filename)}, which is likely caused by incompatible python modules between local and remote hosts: {e}')
def _gzip(self):
'''GZip processor'''
try:
archive = gzip.decompress(self.cur_attachment.file_obj.read())
new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
cur_file = File(archive, new_fn)
self.process_payload(cur_file)
except:
self.cur_attachment.make_dangerous()
return self.cur_attachment
def decompress(msg, method):
# type: (bytes, int) -> bytes
"""Shortcut method for decompression
Args:
msg: The message you wish to decompress; the required type is
defined by the requested method
method: The decompression method you wish to use. Supported
(assuming installed):
- :py:data:`~py2p.flags.gzip`
- :py:data:`~py2p.flags.zlib`
- :py:data:`~py2p.flags.bz2`
- :py:data:`~py2p.flags.lzma`
- :py:data:`~py2p.flags.snappy`
Returns:
Defined by the decompression method, but typically the bytes of the
decompressed message
Warning:
The types fed are dependent on which decompression method you use.
Best to assume most values are :py:class:`bytes` or
:py:class:`bytearray`
Raises:
ValueError: if there is an unknown compression method, or a
method-specific error
"""
if method in (flags.gzip, flags.zlib):
return zlib.decompress(msg, zlib.MAX_WBITS | 32)
elif method == flags.bz2:
return bz2.decompress(msg)
elif method == flags.lzma:
return lzma.decompress(msg)
elif method == flags.snappy:
return snappy.decompress(msg)
else: # pragma: no cover
raise ValueError('Unknown decompression method')
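# Side note on the shared gzip/zlib branch above: wbits = zlib.MAX_WBITS | 32
# makes zlib auto-detect either a zlib or a gzip wrapper, so both flags can be
# served by one call.
import gzip
import zlib
assert zlib.decompress(gzip.compress(b"hi"), zlib.MAX_WBITS | 32) == b"hi"
assert zlib.decompress(zlib.compress(b"hi"), zlib.MAX_WBITS | 32) == b"hi"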
# This should be in order of preference, with None being implied as last
def feed_string(
cls, # type: Any
string, # type: Union[bytes, bytearray, str]
sizeless=False, # type: bool
compressions=None # type: Union[None, Iterable[int]]
): # type: (...) -> InternalMessage
"""Constructs a :py:class:`~py2p.messages.InternalMessage` from a string
or :py:class:`bytes` object.
Args:
string: The string you wish to parse
sizeless: A :py:class:`bool` which describes whether this
string lacks its size header (default: it has one)
compressions: An iterable containing the standardized compression
methods this message might be under
(default: ``[]``)
Returns:
A :py:class:`~py2p.messages.InternalMessage` from the given string
Raises:
AttributeError: Fed a non-string, non-:py:class:`bytes` argument
AssertionError: Initial size header is incorrect
ValueError: Unrecognized compression method fed in compressions
IndexError: Packet headers are incorrect OR
unrecognized compression
"""
# First section checks size header
_string = cls.__sanitize_string(string, sizeless)
# Then we attempt to decompress
_string, compression_fail = cls.__decompress_string(
_string, compressions)
id_ = _string[0:32]
serialized = _string[32:]
checksum = sha256(serialized).digest()
assert id_ == checksum, "Checksum failed: {} != {}".format(
id_, checksum)
packets = unpackb(serialized)
msg = cls(
packets[0], packets[1], packets[3:], compression=compressions)
msg.time = packets[2]
msg.compression_fail = compression_fail
msg._InternalMessage__id = checksum
msg._InternalMessage__string = serialized
# msg.__string = _string
return msg
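# Wire-layout sketch for the body feed_string consumes (after the size header
# and any decompression): a 32-byte sha256 digest of the serialized packets
# followed by the serialized packets themselves. `serialized` below is only a
# placeholder standing in for the real msgpack blob.
from hashlib import sha256
serialized = b"<msgpack of [packet0, packet1, time, *payload-packets]>"
body = sha256(serialized).digest() + serialized
id_, rest = body[:32], body[32:]
assert id_ == sha256(rest).digest()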
def bz2_pack(source):
"""
Returns 'source' as a bzip2-compressed, self-extracting python script.
.. note::
This method uses more space than the zip_pack method but it has the
advantage that the resulting .py file can still be imported into a
python program.
"""
import bz2
import base64
out = ""
# Preserve shebangs (don't care about encodings for this)
first_line = source.split('\n')[0]
if analyze.shebang.match(first_line):
if py3:
if first_line.rstrip().endswith('python'): # Make it python3
first_line = first_line.rstrip()
first_line += '3' # !/usr/bin/env python3
out = first_line + '\n'
compressed_source = bz2.compress(source.encode('utf-8'))
out += 'import bz2, base64\n'
out += "exec(bz2.decompress(base64.b64decode('"
out += base64.b64encode(compressed_source).decode('utf-8')
out += "')))\n"
return out