def get_client_version(my):
'''API Function: get_client_version()
@return:
string - Version of TACTIC that this client came from'''
# may use pkg_resources in 2.6
if '.zip' in __file__:
import zipfile
parts = __file__.split('.zip')
zip_name = '%s.zip'%parts[0]
if zipfile.is_zipfile(zip_name):
z = zipfile.ZipFile(zip_name)
version = z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION')
version = version.strip()
z.close()
else:
dir = os.path.dirname(__file__)
f = open('%s/VERSION' % dir, 'r')
version = f.readline().strip()
f.close()
return version
python类ZipFile()的实例源码
def get_client_api_version(my):
'''API Function: get_client_api_version()
@return:
string - client api version'''
# may use pkg_resources in 2.6
if '.zip' in __file__:
import zipfile
parts = __file__.split('.zip')
zip_name = '%s.zip'%parts[0]
if zipfile.is_zipfile(zip_name):
z = zipfile.ZipFile(zip_name)
version = z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION_API')
version = version.strip()
z.close()
else:
dir = os.path.dirname(__file__)
f = open('%s/VERSION_API' % dir, 'r')
version = f.readline().strip()
f.close()
return version
def test_export_result_csv_no_tasks_returns_empty_file(self):
"""Test WEB export Result to CSV returns empty file if no results in
project."""
project = ProjectFactory.create(short_name='no_tasks_here')
uri = "/project/%s/tasks/export?type=result&format=csv" % project.short_name
res = self.app.get(uri, follow_redirects=True)
zip = zipfile.ZipFile(StringIO(res.data))
extracted_filename = zip.namelist()[0]
csv_content = StringIO(zip.read(extracted_filename))
csvreader = unicode_csv_reader(csv_content)
is_empty = True
for line in csvreader:
is_empty = False, line
assert is_empty
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
with open(filename, 'rb') as json_file:
return json.loads(json_file.read().decode('utf-8'))
pymeta_schema = open_json(resource_filename('wheel.test',
'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
if entry.filename.endswith('/metadata.json'):
pymeta = json.loads(whl.read(entry).decode('utf-8'))
jsonschema.validate(pymeta, pymeta_schema)
valid += 1
assert valid > 0, "No metadata.json found"
def test_zipfile_attributes():
# With the change from ZipFile.write() to .writestr(), we need to manually
# set member attributes.
with temporary_directory() as tempdir:
files = (('foo', 0o644), ('bar', 0o755))
for filename, mode in files:
path = os.path.join(tempdir, filename)
with codecs.open(path, 'w', encoding='utf-8') as fp:
fp.write(filename + '\n')
os.chmod(path, mode)
zip_base_name = os.path.join(tempdir, 'dummy')
zip_filename = wheel.archive.make_wheelfile_inner(
zip_base_name, tempdir)
with readable_zipfile(zip_filename) as zf:
for filename, mode in files:
info = zf.getinfo(os.path.join(tempdir, filename))
assert info.external_attr == (mode | 0o100000) << 16
assert info.compress_type == zipfile.ZIP_DEFLATED
def init_zoneinfo():
"""
Add each zone info to the datastore. This will overwrite existing zones.
This must be called before the AppengineTimezoneLoader will work.
"""
import os, logging
from zipfile import ZipFile
zoneobjs = []
zoneinfo_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
'zoneinfo.zip'))
with ZipFile(zoneinfo_path) as zf:
for zfi in zf.filelist:
key = ndb.Key('Zoneinfo', zfi.filename, namespace=NDB_NAMESPACE)
zobj = Zoneinfo(key=key, data=zf.read(zfi))
zoneobjs.append(zobj)
logging.info("Adding %d timezones to the pytz-appengine database" %
len(zoneobjs)
)
ndb.put_multi(zoneobjs)
def get_appliances():
"""
This will download an archive of all GNS3 appliances and load them to a dictionnary
"""
appliances = {}
url = 'https://github.com/GNS3/gns3-registry/archive/master.zip'
response = urllib.request.urlopen(url)
z = zipfile.ZipFile(io.BytesIO(response.read()))
for path in z.namelist():
if path.endswith('.gns3a'):
with z.open(path, 'r') as f:
id = os.path.basename(path).split('.')[0]
appliance = json.loads(f.read().decode())
if appliance["status"] != "broken":
appliances[id] = appliance
return appliances
def get_appliances():
"""
This will download an archive of all GNS3 appliances and load them to a dictionnary
"""
appliances = {}
url = 'https://github.com/GNS3/gns3-registry/archive/master.zip'
response = urllib.request.urlopen(url)
z = zipfile.ZipFile(io.BytesIO(response.read()))
for path in z.namelist():
if path.endswith('.gns3a'):
with z.open(path, 'r') as f:
id = os.path.basename(path).split('.')[0]
appliance = json.loads(f.read().decode())
if appliance["status"] != "broken":
appliances[id] = appliance
return appliances
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
return json.loads(open(filename, 'rb').read().decode('utf-8'))
pymeta_schema = open_json(resource_filename('wheel.test',
'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
if entry.filename.endswith('/metadata.json'):
pymeta = json.loads(whl.read(entry).decode('utf-8'))
jsonschema.validate(pymeta, pymeta_schema)
valid += 1
assert valid > 0, "No metadata.json found"
def create_zipfile(self, filename):
zip_file = zipfile.ZipFile(filename, "w")
try:
self.mkpath(self.target_dir) # just in case
for root, dirs, files in os.walk(self.target_dir):
if root == self.target_dir and not files:
raise DistutilsOptionError(
"no files found in upload directory '%s'"
% self.target_dir)
for name in files:
full = os.path.join(root, name)
relative = root[len(self.target_dir):].lstrip(os.path.sep)
dest = os.path.join(relative, name)
zip_file.write(full, dest)
finally:
zip_file.close()
def extract_zipfile(archive_name, destpath):
"Unpack a zip file"
archive = zipfile.ZipFile(archive_name)
archive.extractall(destpath)
def get_zip_class():
"""
Supplement ZipFile class to support context manager for Python 2.6
"""
class ContextualZipFile(zipfile.ZipFile):
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close
return zipfile.ZipFile if hasattr(zipfile.ZipFile, '__exit__') else \
ContextualZipFile
def download_glove(glove):
if os.path.exists(glove):
return
print('Downloading glove...')
with tempfile.TemporaryFile() as tmp:
with urllib.request.urlopen('http://nlp.stanford.edu/data/glove.42B.300d.zip') as res:
shutil.copyfileobj(res, tmp)
with zipfile.ZipFile(tmp, 'r') as glove_zip:
glove_zip.extract('glove.42B.300d.txt', path=os.path.dirname(glove))
print('Done')
def _unpack_zipfile(filename, extract_dir):
"""Unpack zip `filename` to `extract_dir`
"""
try:
import zipfile
except ImportError:
raise ReadError('zlib not supported, cannot unpack this archive.')
if not zipfile.is_zipfile(filename):
raise ReadError("%s is not a zip file" % filename)
zip = zipfile.ZipFile(filename)
try:
for info in zip.infolist():
name = info.filename
# don't extract absolute paths or ones with .. in them
if name.startswith('/') or '..' in name:
continue
target = os.path.join(extract_dir, *name.split('/'))
if not target:
continue
_ensure_directory(target)
if not name.endswith('/'):
# file
data = zip.read(info.filename)
f = open(target, 'wb')
try:
f.write(data)
finally:
f.close()
del data
finally:
zip.close()
def __new__(cls, *args, **kwargs):
"""
Construct a ZipFile or ContextualZipFile as appropriate
"""
if hasattr(zipfile.ZipFile, '__exit__'):
return zipfile.ZipFile(*args, **kwargs)
return super(ContextualZipFile, cls).__new__(cls)
def unzip_file(filename, location, flatten=True):
"""
Unzip the file (with path `filename`) to the destination `location`. All
files are written based on system defaults and umask (i.e. permissions are
not preserved), except that regular file members with any execute
permissions (user, group, or world) have "chmod +x" applied after being
written. Note that for windows, any execute changes using os.chmod are
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
for info in zip.infolist():
name = info.filename
data = zip.read(name)
fn = name
if leading:
fn = split_leading_dir(name)[1]
fn = os.path.join(location, fn)
dir = os.path.dirname(fn)
if fn.endswith('/') or fn.endswith('\\'):
# A directory
ensure_dir(fn)
else:
ensure_dir(dir)
fp = open(fn, 'wb')
try:
fp.write(data)
finally:
fp.close()
mode = info.external_attr >> 16
# if mode and regular file and any execute permissions for
# user/group/world?
if mode and stat.S_ISREG(mode) and mode & 0o111:
# make dest file have execute for user/group/world
# (chmod +x) no-op on windows per python docs
os.chmod(fn, (0o777 - current_umask() | 0o111))
finally:
zipfp.close()
def readable_zipfile(path):
# zipfile.ZipFile() isn't a context manager under Python 2.
zf = zipfile.ZipFile(path, 'r')
try:
yield zf
finally:
zf.close()
def test_verifying_zipfile():
if not hasattr(zipfile.ZipExtFile, '_update_crc'):
pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')
sio = StringIO()
zf = zipfile.ZipFile(sio, 'w')
zf.writestr("one", b"first file")
zf.writestr("two", b"second file")
zf.writestr("three", b"third file")
zf.close()
# In default mode, VerifyingZipFile checks the hash of any read file
# mentioned with set_expected_hash(). Files not mentioned with
# set_expected_hash() are not checked.
vzf = wheel.install.VerifyingZipFile(sio, 'r')
vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
vzf.set_expected_hash("three", "blurble")
vzf.open("one").read()
vzf.open("two").read()
try:
vzf.open("three").read()
except wheel.install.BadWheelFile:
pass
else:
raise Exception("expected exception 'BadWheelFile()'")
# In strict mode, VerifyingZipFile requires every read file to be
# mentioned with set_expected_hash().
vzf.strict = True
try:
vzf.open("two").read()
except wheel.install.BadWheelFile:
pass
else:
raise Exception("expected exception 'BadWheelFile()'")
vzf.set_expected_hash("two", None)
vzf.open("two").read()
def open(self, name_or_info, mode="r", pwd=None):
"""Return file-like object for 'name'."""
# A non-monkey-patched version would contain most of zipfile.py
ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
if isinstance(name_or_info, zipfile.ZipInfo):
name = name_or_info.filename
else:
name = name_or_info
if (name in self._expected_hashes
and self._expected_hashes[name] != None):
expected_hash = self._expected_hashes[name]
try:
_update_crc_orig = ef._update_crc
except AttributeError:
warnings.warn('Need ZipExtFile._update_crc to implement '
'file hash verification (in Python >= 2.7)')
return ef
running_hash = self._hash_algorithm()
if hasattr(ef, '_eof'): # py33
def _update_crc(data):
_update_crc_orig(data)
running_hash.update(data)
if ef._eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
else:
def _update_crc(data, eof=None):
_update_crc_orig(data, eof=eof)
running_hash.update(data)
if eof and running_hash.digest() != expected_hash:
raise BadWheelFile("Bad hash for file %r" % ef.name)
ef._update_crc = _update_crc
elif self.strict and name not in self._expected_hashes:
raise BadWheelFile("No expected hash for file %r" % ef.name)
return ef