def decompress(data, *args, **kwargs):
return lzma.decompress(data, *args, **kwargs)
python类decompress()的实例源码
def decompress(data, *args, **kwargs):
return lzo.decompress(data, *args, **kwargs)
def decompress(data, *args, **kwargs):
from cStringIO import StringIO
data = list(data)
dict_size = 256
dictionary = dict((i, chr(i)) for i in xrange(dict_size))
result = StringIO()
w = chr(data.pop(0))
result.write(w)
for k in data:
if k in dictionary:
entry = dictionary[k]
elif k == dict_size:
entry = w + w[0]
else:
raise ValueError('Bad compressed k: %s' % k)
result.write(entry)
# Add w+entry[0] to the dictionary
dictionary[dict_size] = str(w + entry[0])
dict_size += 1
w = entry
return result.getvalue()
def decompress(data, *args, **kwargs):
return zlib.decompress(data, *args, **kwargs)
def decompress(data, *args, **kwargs):
return zstd.decompress(data, *args, **kwargs)
#########################
### IMAGE COMPRESSION ###
#########################
def decompress(data, *args, **kwargs):
return _png.decompress(data)
#########################
### VIDEO COMPRESSION ###
#########################
def decompress(data, *args, **kwargs):
return x264.decompress(data)
def get_local(self):
return bz2.decompress(base64.decodestring(self.local))
def get_local(self):
return bz2.decompress(base64.decodestring(self.local))
def __decompress_string(cls, string, compressions=None):
# type: (Any, bytes, Union[None, Iterable[int]]) -> Tuple[bytes, bool]
"""Returns a tuple containing the decompressed :py:class:`bytes` and a
:py:class:`bool` as to whether decompression failed or not
Args:
string: The possibly-compressed message you wish to parse
compressions: A list of the standard compression methods this
message may be under (default: ``[]``)
Returns:
A decompressed version of the message
Raises:
ValueError: Unrecognized compression method fed in compressions
Warning:
Do not feed it with the size header, it will throw errors
"""
compression_fail = False
# second is module scope compression
for method in intersect(compressions, compression):
try:
string = decompress(string, method)
compression_fail = False
break
except:
compression_fail = True
continue
return (string, compression_fail)
def get_local(self):
return bz2.decompress(base64.decodestring(self.local))
def test_incremental_compress():
basic_test_c(bz2.Compressor(), decompress)
def getFile(cls, getfile, unpack=True):
if cls.getProxy():
proxy = req.ProxyHandler({'http': cls.getProxy(), 'https': cls.getProxy()})
auth = req.HTTPBasicAuthHandler()
opener = req.build_opener(proxy, auth, req.HTTPHandler)
req.install_opener(opener)
try:
response = req.urlopen(getfile)
except:
msg = "[!] Could not fetch file %s"%getfile
if cls.exitWhenNoSource(): sys.exit(msg)
else: print(msg)
data = None
data = response.read()
# TODO: if data == text/plain; charset=utf-8, read and decode
if unpack:
if 'gzip' in response.info().get('Content-Type'):
data = gzip.GzipFile(fileobj = BytesIO(data))
elif 'bzip2' in response.info().get('Content-Type'):
data = BytesIO(bz2.decompress(data))
elif 'zip' in response.info().get('Content-Type'):
fzip = zipfile.ZipFile(BytesIO(data), 'r')
if len(fzip.namelist())>0:
data=BytesIO(fzip.read(fzip.namelist()[0]))
# In case the webserver is being generic
elif 'application/octet-stream' in response.info().get('Content-Type'):
if data[:4] == b'PK\x03\x04': # Zip
fzip = zipfile.ZipFile(BytesIO(data), 'r')
if len(fzip.namelist())>0:
data=BytesIO(fzip.read(fzip.namelist()[0]))
return (data, response)
def download_bzip2(path, url):
if os.path.exists(path): return
print("Downloading {0} from {1}...".format(path, url))
response = urlopen(url)
with open(path, 'wb') as f:
f.write(bz2.decompress(response.read()))
def get_local(self):
return bz2.decompress(base64.decodestring(self.local))
def decompress(
data,
):
decompressed_object = bz2.decompress(data)
return decompressed_object
def _contents(self):
import bz2
return bz2.decompress(self._contents)
def gather():
url_regex = '(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
base_url = "http://data.phishtank.com/data/online-valid.csv.bz2"
attack_type = "undefined"
res = get_url(base_url)
results = bz2.decompress(res.content)
for line in results.split("\n")[1:]:
if line == "":
continue
line = line.split(",")
site_url = line[1]
m = re.search(url_regex, site_url)
host = m.group('host')
ip_address = get_ip(host)
if ip_address == "undefined":
who_is, country = "undefined", "undefined"
else:
who_is, country = get_who_is_and_country(ip_address)
doc = {
'IP': ip_address,
'SourceInfo': base_url,
'Type': attack_type,
'Country': country,
'Domain': host,
'URL': host,
'WhoIsInfo': who_is,
}
pprint(doc)
def get_local(self):
return bz2.decompress(base64.decodestring(self.local))
def get_local(self):
return bz2.decompress(base64.decodestring(self.local))