def _decompressContent(response, new_content):
content = new_content
try:
encoding = response.get('content-encoding', None)
if encoding in ['gzip', 'deflate']:
if encoding == 'gzip':
content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read()
if encoding == 'deflate':
content = zlib.decompress(content, -zlib.MAX_WBITS)
response['content-length'] = str(len(content))
# Record the historical presence of the encoding in a way the won't interfere.
response['-content-encoding'] = response['content-encoding']
del response['content-encoding']
except IOError:
content = ""
raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
return content
# Example source code using Python's gzip.GzipFile class
def extract_images(filename):
    """Extract the images into a 4D uint8 numpy array [index, y, x, depth].

    Reads a gzipped MNIST image file: a big-endian uint32 magic number
    (2051), the image count, row count and column count, followed by the
    raw uint8 pixel data.
    """
    print('Extracting', filename)
    with tf.gfile.Open(filename, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        magic = _read32(bytestream)
        if magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (magic, filename))
        num_images = _read32(bytestream)
        rows = _read32(bytestream)
        cols = _read32(bytestream)
        raw = bytestream.read(rows * cols * num_images)
        pixels = numpy.frombuffer(raw, dtype=numpy.uint8)
        return pixels.reshape(num_images, rows, cols, 1)
def get_title2id(self, dump_date):
    """Map page titles (namespace 0) to page ids from an enwiki page.sql dump.

    Parses INSERT statements of the MediaWiki `page` table and returns a
    dict {unescaped title -> int page id}.

    NOTE(review): reads a hard-coded local path. The original assigned the
    '.sql.gz' path and immediately overwrote it with the plain '.sql'
    path — the dead assignment is removed here; behavior is unchanged.
    """
    print('get_title2id...')
    title2id = {}
    # Matches (page_id, 0, 'title', ... tuples inside each INSERT line.
    regex = re.compile(r"\((\d+),0,'(.+?)','")
    fname = '/home/ddimitrov/data/enwiki20150304_plus_clickstream/enwiki-' + dump_date + '-page.sql'
    # with gzip.GzipFile(fname, 'rb') as infile:
    with open(fname) as f:
        content = f.readlines()
    for line in content:
        # Python 2: lines are byte strings; decode before matching.
        line = line.decode('utf-8')
        if not line.startswith('INSERT'):
            continue
        for pid, title in regex.findall(line):
            title2id[DataHandler.unescape_mysql(title)] = int(pid)
    return title2id
def get_rpid2pid(self, dump_date):
    """Map redirect page ids to the page id of their redirect target.

    Parses INSERT statements of the MediaWiki `redirect` table (gzipped
    SQL dump at a hard-coded local path), resolving each target title
    through get_title2id(). Unresolvable titles are printed and skipped.
    Note: keys are pid strings as captured by the regex, values are ints.
    """
    print('get_rpid2pid...')
    title2id = self.get_title2id(dump_date)
    rpid2pid = {}
    # Matches (source_page_id, 0, 'target_title', ... tuples.
    regex = re.compile(r"\((\d+),0,'(.+?)','")
    fname = '/home/ddimitrov/data/enwiki20150304_plus_clickstream/enwiki-' + dump_date + '-redirect.sql.gz'
    with gzip.GzipFile(fname, 'rb') as infile:
        for line in infile:
            # GzipFile yields bytes; decode before string matching.
            line = line.decode('utf-8')
            if not line.startswith('INSERT'):
                continue
            # Some dump rows contain NULL fields; neutralize them so the
            # regex still matches.
            line = line.replace('NULL', "''")
            for pid, title in regex.findall(line):
                try:
                    rpid2pid[pid] = title2id[DataHandler.unescape_mysql(title)]
                except KeyError:
                    # Redirect target title missing from the title map.
                    print(pid, title)
                # pdb.set_trace()
    return rpid2pid
def read_lett_iter(f, decode=True):
    """Iterate over a .lett corpus file, yielding one Page per line.

    Transparently wraps gzip-compressed input (detected via the .gz
    suffix of f.name). Each tab-separated line carries language, mime
    type, encoding, url and base64-encoded html/text payloads; when
    `decode` is true the payloads are decoded as utf-8 strings.
    """
    f.seek(0)
    stream = gzip.GzipFile(fileobj=f, mode='r') if f.name.endswith('.gz') else f
    for raw_line in stream:
        lang, mime, enc, url, html_b64, text_b64 = raw_line[:-1].split("\t")
        html = base64.b64decode(html_b64)
        text = base64.b64decode(text_b64)
        if decode:
            html = html.decode("utf-8")
            text = text.decode("utf-8")
        yield Page(url, html, text, mime, enc, lang)
def test_gzip_loadtxt():
    # np.loadtxt must transparently read gzip-compressed files. Windows
    # cannot reopen a NamedTemporaryFile while it is still open, so the
    # gzipped payload is assembled in memory first and then copied into
    # a securely created temporary path for loadtxt to consume.
    buf = BytesIO()
    gz = gzip.GzipFile(fileobj=buf, mode='w')
    gz.write(b'1 2 3\n')
    gz.close()
    buf.seek(0)
    with temppath(suffix='.gz') as name:
        with open(name, 'wb') as out:
            out.write(buf.read())
        res = np.loadtxt(name)
    buf.close()
    assert_array_equal(res, [1, 2, 3])
def _decompressContent(response, new_content):
content = new_content
try:
encoding = response.get('content-encoding', None)
if encoding in ['gzip', 'deflate']:
if encoding == 'gzip':
content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
if encoding == 'deflate':
content = zlib.decompress(content)
response['content-length'] = str(len(content))
# Record the historical presence of the encoding in a way the won't interfere.
response['-content-encoding'] = response['content-encoding']
del response['content-encoding']
except IOError:
content = ""
raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
return content
def _read_datafile(self, path, expected_dims):
"""Helper function to read a file in IDX format."""
base_magic_num = 2048
with gzip.GzipFile(path) as f:
magic_num = struct.unpack('>I', f.read(4))[0]
expected_magic_num = base_magic_num + expected_dims
if magic_num != expected_magic_num:
raise ValueError('Incorrect MNIST magic number (expected '
'{}, got {})'
.format(expected_magic_num, magic_num))
dims = struct.unpack('>' + 'I' * expected_dims,
f.read(4 * expected_dims))
buf = f.read(reduce(operator.mul, dims))
data = np.frombuffer(buf, dtype=np.uint8)
data = data.reshape(*dims)
return data
def to_file(self, dir_path='.', fname='amsetrun', force_write=True):
    # Serialize self.kgrid and self.egrid to <dir_path>/<fname>.json.gz.
    # When force_write is False an existing file is never overwritten:
    # a numeric suffix (_1, _2, ...) is appended until a free name is
    # found, warning about each collision.
    if not force_write:
        n = 1
        fname0 = fname
        while os.path.exists(os.path.join(dir_path, '{}.json.gz'.format(fname))):
            warnings.warn('The file, {} exists. AMSET outputs will be '
                          'written in {}'.format(fname, fname0+'_'+str(n)))
            fname = fname0 + '_' + str(n)
            n += 1
    # make the output dict
    out_d = {'kgrid': self.kgrid, 'egrid': self.egrid}
    # write the output dict to file
    # NOTE(review): GzipFile is opened in 'w' (binary) mode but is fed the
    # str returned by json.dumps — this works on Python 2 only; Python 3
    # would need bytes (or an encoding wrapper). Confirm target interpreter.
    with gzip.GzipFile(
            os.path.join(dir_path, '{}.json.gz'.format(fname)), 'w') as fp:
        jsonstr = json.dumps(out_d, cls=MontyEncoder)
        fp.write(jsonstr)
def _decompressContent(response, new_content):
content = new_content
try:
encoding = response.get('content-encoding', None)
if encoding in ['gzip', 'deflate']:
if encoding == 'gzip':
content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
if encoding == 'deflate':
content = zlib.decompress(content)
response['content-length'] = str(len(content))
# Record the historical presence of the encoding in a way the won't interfere.
response['-content-encoding'] = response['content-encoding']
del response['content-encoding']
except IOError:
content = ""
raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
return content
def _get_data(url):
"""Helper function to get data over http or from a local file"""
if url.startswith('http://'):
# Try Python 2, use Python 3 on exception
try:
resp = urllib.urlopen(url)
encoding = resp.headers.dict.get('content-encoding', 'plain')
except AttributeError:
resp = urllib.request.urlopen(url)
encoding = resp.headers.get('content-encoding', 'plain')
data = resp.read()
if encoding == 'plain':
pass
elif encoding == 'gzip':
data = StringIO(data)
data = gzip.GzipFile(fileobj=data).read()
else:
raise RuntimeError('unknown encoding')
else:
with open(url, 'r') as fid:
data = fid.read()
return data
def write_handle(self, handle):
    """
    Write the database to the specified file handle.

    When compression is requested and the gzip module is available
    (_gzip_ok), the output is wrapped in a GzipFile; otherwise the raw
    handle is written directly. The stream is wrapped in a utf8 writer
    (stored on self.g), the XML payload is emitted via write_xml_data(),
    and the stream is closed. Returns 1 on completion.
    """
    if self.compress and _gzip_ok:
        try:
            g = gzip.GzipFile(mode="wb", fileobj=handle)
        except:
            # NOTE(review): bare except — deliberately falls back to the
            # uncompressed handle on any GzipFile failure, but it also
            # swallows KeyboardInterrupt/SystemExit; consider narrowing.
            g = handle
    else:
        g = handle
    self.g = codecs.getwriter("utf8")(g)
    self.write_xml_data()
    # NOTE(review): when no gzip wrapper was created this closes the
    # caller's own handle — confirm callers expect that.
    g.close()
    return 1
def py_gunzip(gz_file, outdir="."):
    """Extract a .gz file into outdir, keeping the base name minus '.gz'.

    Args:
        gz_file: path to the gzip-compressed input file.
        outdir: directory the decompressed copy is written to.
    """
    # Python 2 print statements replaced with print() calls so the helper
    # runs on both Python 2 and 3; files are managed by context managers
    # so they are closed even if an exception occurs.
    print(gz_file)
    print(outdir)
    print("Reading file:\t" + gz_file)
    with gzip.GzipFile(gz_file, 'rb') as input_file:
        file_contents = input_file.read()
    print("Finished reading file")
    # Derive the output path from outdir and the input name sans extension.
    output_file_base = os.path.basename(os.path.splitext(gz_file)[0])
    print(output_file_base)
    output_file_path = os.path.join(outdir, output_file_base)
    print(output_file_path)
    # write the contents
    print("Writing contents to file:\t" + output_file_path)
    print(type(output_file_path))
    with open(output_file_path, 'wb') as output_file:
        output_file.write(file_contents)
def saveIDL(filename, annotations):
    """Write annotations in IDL format to filename.

    The extension selects the container: plain text for .idl, gzip for
    .gz, bzip2 for .bz2. Records are separated by ';' and the final
    record is terminated with '.'.

    Raises:
        ValueError: for an unsupported extension (the original fell
            through with an undefined handle, producing a confusing
            NameError; it also shadowed the builtin `file`).
    """
    ext = os.path.splitext(filename)[1]
    if ext == ".idl":
        out = open(filename, 'w')
    elif ext == ".gz":
        out = gzip.GzipFile(filename, 'w')
    elif ext == ".bz2":
        out = bz2.BZ2File(filename, 'w')
    else:
        raise ValueError("saveIDL: unsupported extension %r" % ext)
    try:
        for i, annotation in enumerate(annotations):
            annotation.writeIDL(out)
            # ';' between records, '.' after the last one.
            if i + 1 < len(annotations):
                out.write(";\n")
            else:
                out.write(".\n")
    finally:
        # Close even if an annotation's writeIDL raises.
        out.close()
def encode_private_key(self):
    """
    Based on spotnab, this is the gzipped version of the key
    with base64 applied to it. We encode it as such and
    return it (or None when the key is not initialized yet).
    """
    buf = StringIO()
    gz = GzipFile(fileobj=buf, mode="wb")
    try:
        try:
            gz.write(self.private_pem())
        except TypeError:
            # It wasn't initialized yet
            return None
    finally:
        # Mirrors the context-manager exit: flush the gzip trailer into
        # buf on every path out of the write.
        gz.close()
    return b64encode(buf.getvalue())
def decode_private_key(self, encoded):
    """
    Based on spotnab, this is the gzipped version of the key
    with base64 applied to it. We decode it and load it.

    Returns True when the key was decoded and loaded, False otherwise.
    """
    fileobj = StringIO()
    try:
        fileobj.write(b64decode(encoded))
    except TypeError:
        # Invalid base64 payload.
        return False
    # BUGFIX: the original used the Python-2-only long literal 0L, which
    # is a SyntaxError on Python 3; a plain int behaves identically.
    fileobj.seek(0, SEEK_SET)
    private_key = None
    with GzipFile(fileobj=fileobj, mode="rb") as f:
        private_key = f.read()
    if not private_key:
        return False
    # We were successful
    if not self.load(private_key=private_key):
        return False
    return True
def decode_public_key(self, encoded):
    """
    Based on spotnab, this is the gzipped version of the key
    with base64 applied to it. We decode it and load it.

    Returns True when a public key was loaded, False otherwise.
    """
    fileobj = StringIO()
    try:
        fileobj.write(b64decode(encoded))
    except TypeError:
        # Invalid base64 payload.
        return False
    # BUGFIX: the original used the Python-2-only long literal 0L, which
    # is a SyntaxError on Python 3; a plain int behaves identically.
    fileobj.seek(0, SEEK_SET)
    self.public_key = None
    with GzipFile(fileobj=fileobj, mode="rb") as f:
        try:
            self.public_key = serialization.load_pem_public_key(
                f.read(),
                backend=default_backend()
            )
        except ValueError:
            # Could not decrypt content
            return False
    if not self.public_key:
        return False
    return True
def _init_read_gz(self):
    """Initialize for reading a gzip compressed fileobj.
    """
    # Raw-deflate decompressor; the gzip header is consumed manually below.
    self.cmp = self.zlib.decompressobj(-self.zlib.MAX_WBITS)
    self.dbuf = b""
    # taken from gzip.GzipFile with some alterations
    if self.__read(2) != b"\037\213":
        # \x1f\x8b is the gzip magic number (RFC 1952).
        raise ReadError("not a gzip file")
    if self.__read(1) != b"\010":
        # Only method 8 (deflate) is defined for gzip.
        raise CompressionError("unsupported compression method")
    flag = ord(self.__read(1))
    # Skip MTIME (4 bytes), XFL and OS (1 byte each).
    self.__read(6)
    if flag & 4:
        # FEXTRA: little-endian 2-byte length, then that many bytes.
        xlen = ord(self.__read(1)) + 256 * ord(self.__read(1))
        self.read(xlen)
    if flag & 8:
        # FNAME: zero-terminated original file name.
        while True:
            s = self.__read(1)
            if not s or s == NUL:
                break
    if flag & 16:
        # FCOMMENT: zero-terminated comment.
        while True:
            s = self.__read(1)
            if not s or s == NUL:
                break
    if flag & 2:
        # FHCRC: 2-byte header CRC.
        self.__read(2)
def seekable(self):
    """Report whether the underlying fileobj supports seeking.

    Objects lacking a seekable() method are assumed seekable.
    """
    inner = self.fileobj
    if hasattr(inner, "seekable"):
        return inner.seekable()
    # XXX gzip.GzipFile and bz2.BZ2File
    return True
def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open gzip compressed tar archive name for reading or writing.
    Appending is not allowed.
    """
    # Exactly 'r' or 'w': gzip streams cannot be appended to.
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'")
    try:
        import gzip
        gzip.GzipFile
    except (ImportError, AttributeError):
        # gzip may be absent, or a stub module without GzipFile.
        raise CompressionError("gzip module is not available")
    # Remember whether the caller supplied the fileobj: caller-owned
    # objects must never be closed here.
    extfileobj = fileobj is not None
    try:
        fileobj = gzip.GzipFile(name, mode + "b", compresslevel, fileobj)
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except IOError:
        # Close only a GzipFile we created ourselves.
        if not extfileobj and fileobj is not None:
            fileobj.close()
        if fileobj is None:
            # GzipFile() itself failed before assigning; propagate as-is.
            raise
        raise ReadError("not a gzip file")
    except:
        # Any other failure: same cleanup, re-raise untouched.
        if not extfileobj and fileobj is not None:
            fileobj.close()
        raise
    t._extfileobj = extfileobj
    return t