def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(unzipped_files[0])
    decoded_data = data.decode('utf-8')
    sets_data = json.loads(decoded_data)
    return sets_data
def generate_info():
    tickets_archive_path = ROOT_DIR_PATH.joinpath('tickets.zip')
    ensure_data_file(tickets_archive_path, DATA_FILE_INFO['TICKETS_URL'])
    with zipfile.ZipFile(str(tickets_archive_path)) as zf:
        for name in zf.namelist():
            stem, ext = os.path.splitext(name)
            if ext != '.csv':
                continue
            with zf.open(name) as f:
                # ZipFile only opens members in binary mode, but csv accepts
                # only text files, so we need to wrap the stream.
                # See <https://stackoverflow.com/questions/5627954>.
                textfile = io.TextIOWrapper(f, encoding='utf8', newline='')
                for row in csv.DictReader(textfile):
                    yield Registration(row)
def zipdir(archivename, basedir):
    '''Zip directory, from J.F. Sebastian http://stackoverflow.com/'''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            # NOTE: ignore empty directories
            for fn in files:
                if not fn.endswith('.zip'):
                    absfn = os.path.join(root, fn)
                    zfn = absfn[len(basedir) + len(os.sep):]  # XXX: relative path
                    z.write(absfn, zfn)
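A minimal usage sketch (the names are illustrative):

# archive everything under 'project/' into project.zip, skipping existing .zip files
zipdir('project.zip', 'project')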
# ================ Inventory input data and create data structure =================
def pickle_load(path, compression=False):
    """Unpickle a possibly compressed pickle.

    Parameters
    ----------
    path: str
        path to the output file
    compression: bool
        if true, assumes the pickle was compressed when created and attempts decompression.

    Returns
    -------
    obj: object
        the unpickled object
    """
    if compression:
        with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as myzip:
            with myzip.open("data") as f:
                return pickle.load(f)
    else:
        with open(path, "rb") as f:
            return pickle.load(f)
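The compressed branch expects an archive containing a single member named "data"; a hypothetical writer counterpart matching that layout might look like:

def pickle_save(obj, path, compression=False):
    """Hypothetical counterpart to pickle_load; writes the layout it expects."""
    if compression:
        with zipfile.ZipFile(path, "w", compression=zipfile.ZIP_DEFLATED) as myzip:
            # store the pickled bytes under the member name pickle_load reads
            myzip.writestr("data", pickle.dumps(obj))
    else:
        with open(path, "wb") as f:
            pickle.dump(obj, f)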
def prepare_zip():
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        info = ZipInfo('config.json')
        info.external_attr = 0o664 << 16
        zipf.writestr(info, dumps(config))
        zipf.write(resource('gimel', 'config.py'), 'config.py')
        zipf.write(resource('gimel', 'gimel.py'), 'gimel.py')
        zipf.write(resource('gimel', 'logger.py'), 'logger.py')
        for root, dirs, files in os.walk(resource('gimel', 'vendor')):
            for file in files:
                real_file = os.path.join(root, file)
                relative_file = os.path.relpath(real_file,
                                                resource('gimel', ''))
                zipf.write(real_file, relative_file)
def download_current_dataset(self, dest_path='.', unzip=True):
    now = datetime.now().strftime('%Y%m%d')
    file_name = 'numerai_dataset_{0}.zip'.format(now)
    dest_file_path = '{0}/{1}'.format(dest_path, file_name)
    r = requests.get(self._dataset_url)
    if r.status_code != 200:
        return r.status_code
    with open(dest_file_path, "wb") as fp:
        # write the payload in one call; iterating over r.content would
        # yield individual ints in Python 3 and fail on fp.write()
        fp.write(r.content)
    if unzip:
        with zipfile.ZipFile(dest_file_path, "r") as z:
            z.extractall(dest_path)
    return r.status_code
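The byte-by-byte loop in the original suggests chunked writing was intended; the usual streaming pattern with requests looks like this (a sketch reusing the names above):

r = requests.get(url, stream=True)  # 'url' stands in for self._dataset_url
with open(dest_file_path, 'wb') as fp:
    # write the response in chunks instead of buffering it all in memory
    for chunk in r.iter_content(chunk_size=8192):
        fp.write(chunk)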
def __init__(self, file):
    self.file = file
    if file == '':
        self.infile = sys.stdin
    elif file.lower().startswith('http://') or file.lower().startswith('https://'):
        try:
            if sys.hexversion >= 0x020601F0:
                self.infile = urllib23.urlopen(file, timeout=5)
            else:
                self.infile = urllib23.urlopen(file)
        except urllib23.HTTPError:
            print('Error accessing URL %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    elif file.lower().endswith('.zip'):
        try:
            # zipped malware samples are conventionally protected with the password 'infected'
            self.zipfile = zipfile.ZipFile(file, 'r')
            self.infile = self.zipfile.open(self.zipfile.infolist()[0], 'r', C2BIP3('infected'))
        except:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    else:
        try:
            self.infile = open(file, 'rb')
        except:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    self.ungetted = []
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        return json.loads(open(filename, 'rb').read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
def test_zipfile_attributes():
    # With the change from ZipFile.write() to .writestr(), we need to manually
    # set member attributes.
    with temporary_directory() as tempdir:
        files = (('foo', 0o644), ('bar', 0o755))
        for filename, mode in files:
            path = os.path.join(tempdir, filename)
            with codecs.open(path, 'w', encoding='utf-8') as fp:
                fp.write(filename + '\n')
            os.chmod(path, mode)
        zip_base_name = os.path.join(tempdir, 'dummy')
        zip_filename = wheel.archive.make_wheelfile_inner(
            zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for filename, mode in files:
                info = zf.getinfo(os.path.join(tempdir, filename))
                assert info.external_attr == (mode | 0o100000) << 16
                assert info.compress_type == zipfile.ZIP_DEFLATED
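The assertions rely on the zip external_attr layout: the upper 16 bits carry the Unix st_mode, where 0o100000 (S_IFREG) marks a regular file. A minimal sketch of applying a mode to a writestr() member:

import zipfile

with zipfile.ZipFile('demo.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    info = zipfile.ZipInfo('script.sh')
    # Unix permissions live in the top 16 bits of external_attr
    info.external_attr = (0o755 | 0o100000) << 16
    zf.writestr(info, '#!/bin/sh\necho hello\n')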
def handle_file(self, fname):
    with open(fname, 'rb') as fd:
        if fd.read(4) == b'dex\n':
            new_jar = self.name + '/classes-dex2jar.jar'
            run([dex2jar, fname, '-f', '-o', new_jar], cwd=self.name, stderr=DEVNULL)
            fname = new_jar
    with ZipFile(fname) as jar:
        jar.extractall(self.name)
        for cls in jar.namelist():
            if cls.endswith('.class'):
                cls = cls.replace('/', '.')[:-6]
                self.classes.append(cls)
            elif cls.endswith('.dex'):
                self.handle_file(self.name + '/' + cls)
            elif cls.endswith('.proto'):
                self.bonus_protos[cls] = jar.read(cls).decode('utf8')
            elif cls.endswith('.so'):
                self.bonus_protos.update(walk_binary(self.name + '/' + cls))
def load_names_data():
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        with open(fp, 'wb') as f:
            f.write(r.content)
    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda zi: zi.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                df = pd.read_csv(
                    zf.open(zi),
                    header=None,
                    names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df
    df = pd.concat(post.values())
    df.set_index('name', inplace=True, drop=True)
    return df
def file(self):
    """
    Returns a file pointer to this binary

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # TODO: I don't like reaching through to the session...
    with closing(self._cb.session.get("/api/v1/binary/{0:s}".format(self.md5sum), stream=True)) as r:
        z = io.BytesIO(r.content)  # the zip payload is bytes, so wrap it in a binary buffer
        zf = ZipFile(z)
        fp = zf.open('filedata')
        return fp
def _create_lambda(arn, func_name, func_desc, lambda_handler, lambda_main,
                   runtime):
    func = dict()
    lamb = boto3.client('lambda')
    # the deployment package must be read as bytes for the ZipFile parameter
    with open(temp_deploy_zip, 'rb') as deploy:
        func['ZipFile'] = deploy.read()
    try:
        resp = lamb.create_function(
            FunctionName=func_name, Runtime=runtime, Publish=True,
            Description=func_desc,
            Role=arn, Code=func, Handler='{0}.{1}'.format(
                lambda_main, lambda_handler
            ))
        logging.info("Create Lambda Function resp:{0}".format(
            json.dumps(resp, indent=4, sort_keys=True))
        )
        return resp
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ValidationException':
            logging.warning("Validation Error {0} creating function '{1}'.".format(
                ce, func_name))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
def _update_lambda_function(zip_file, func_name):
    lamb = boto3.client('lambda')
    try:
        resp = lamb.update_function_code(
            FunctionName=func_name,
            ZipFile=zip_file.read(),
            Publish=True
        )
        return resp['Version']
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ValidationException':
            logging.warning(
                "Validation Error {0} updating function '{1}'.".format(
                    ce, func_name))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
def download_driver_file(whichbin, url, base_path):
    if url.endswith('.tar.gz'):
        ext = '.tar.gz'
    else:
        ext = '.zip'
    print("Downloading from: {}".format(url))
    download_file(url, '/tmp/pwr_temp{}'.format(ext))
    if ext == '.tar.gz':
        import tarfile
        tar = tarfile.open('/tmp/pwr_temp{}'.format(ext), "r:gz")
        tar.extractall('{}/'.format(base_path))
        tar.close()
    else:
        import zipfile
        with zipfile.ZipFile('/tmp/pwr_temp{}'.format(ext), "r") as z:
            z.extractall('{}/'.format(base_path))
    # if whichbin == 'wires' and '/v{}/'.format(latest_gecko_driver) in url:
    #     os.rename('{}/geckodriver'.format(base_path),
    #               '{}/wires'.format(base_path))
    #     os.chmod('{}/wires'.format(base_path), 0o775)
    if whichbin == 'wires':
        os.chmod('{}/geckodriver'.format(base_path), 0o775)
    else:
        os.chmod('{}/chromedriver'.format(base_path), 0o775)
def reseed(self, netdb):
    """Compress netdb entries and set content"""
    zip_file = io.BytesIO()
    dat_files = []
    for root, dirs, files in os.walk(netdb):
        for f in files:
            if f.endswith(".dat"):
                # TODO: check modified time; entries may need to be
                # no older than 10h
                dat_files.append(os.path.join(root, f))
    if len(dat_files) == 0:
        raise PyseederException("Can't get enough netDb entries")
    elif len(dat_files) > 75:
        dat_files = random.sample(dat_files, 75)
    with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf:
        for f in dat_files:
            zf.write(f, arcname=os.path.split(f)[1])
    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = zip_file.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
def _load(self, fn, notify=False):
    "Load the video from a ZIP file"
    with ZipFile(fn) as zf:
        self._loadMeta(zf)
        self._costumes = []
        i = 0
        while i >= 0:
            try:
                if notify:
                    notify(fn, i, self)
                data = zf.read(str(i))
                if data:
                    data = data[:-12], data[-12:]
                else:
                    data = self._costumes[i - 1]
                i += 1
                self._costumes.append(data)
            except:
                if notify:
                    notify(fn, None, self)
                i = -1
def compare_component_output(self, input_path, expected_output_path):
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    storage.clear()
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    z = zipfile.ZipFile(output_zip)
    z.extractall(output_dir)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    os.remove(output_zip)
def write_file_to_zip_at_path(zip_path=None, archive=None, file_path='',
                              content='', encode=False):
    if zip_path:
        ensure_directories_for_path(zip_path)
        z = zipfile.ZipFile(zip_path, mode="a")
    elif archive:
        z = archive
    else:
        raise Exception('One of zip_path, archive must be provided.')
    print("Adding {} to archive ...".format(file_path))
    if encode:
        z.writestr(file_path, content.encode('utf-8'))
    else:
        z.writestr(file_path, content)
    storage.file_paths.add(file_path)
    if zip_path:
        z.close()
def download_celeb_a(base_path):
    data_path = os.path.join(base_path, 'celebA')
    images_path = os.path.join(data_path, 'images')
    if os.path.exists(data_path):
        print('[!] Found celeb-A - skip')
        return
    filename, drive_id = "img_align_celeba.zip", "0B7EVK8r0v71pZjFTYXZWM3FlRnM"
    save_path = os.path.join(base_path, filename)
    if os.path.exists(save_path):
        print('[*] {} already exists'.format(save_path))
    else:
        download_file_from_google_drive(drive_id, save_path)
    zip_dir = ''
    with zipfile.ZipFile(save_path) as zf:
        zip_dir = zf.namelist()[0]
        zf.extractall(base_path)
    if not os.path.exists(data_path):
        os.mkdir(data_path)
    os.rename(os.path.join(base_path, "img_align_celeba"), images_path)
    os.remove(save_path)
    download_attr_file(data_path)
def listdir(path):
    """Replacement for os.listdir that works in frozen environments."""
    if not hasattr(sys, 'frozen'):
        return os.listdir(path)
    (zipPath, archivePath) = splitZip(path)
    if archivePath is None:
        return os.listdir(path)
    with zipfile.ZipFile(zipPath, "r") as zipobj:
        contents = zipobj.namelist()
    results = set()
    for name in contents:
        # components in zip archive paths are always separated by forward slash
        if name.startswith(archivePath) and len(name) > len(archivePath):
            name = name[len(archivePath):].split('/')[0]
            results.add(name)
    return list(results)
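splitZip is not defined in this snippet; presumably it splits a frozen path such as app/lib.zip/pkg into the zip file and the archive-internal prefix. A hypothetical sketch:

def splitZip(path):
    """Hypothetical helper: split path into (zip_path, inner_prefix) or (path, None)."""
    head, tail = path, ''
    while head:
        if os.path.isfile(head):
            if zipfile.is_zipfile(head):
                return head, tail
            break
        new_head, part = os.path.split(head)
        if new_head == head:  # reached the filesystem root
            break
        head, tail = new_head, part + '/' + tail
    return path, None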
def find_package(self, package):
    for path in self.paths():
        full = os.path.join(path, package)
        if os.path.exists(full):
            return package, full
        if not os.path.isdir(path) and zipfile.is_zipfile(path):
            zip = zipfile.ZipFile(path, 'r')
            try:
                zip.read(os.path.join(package, '__init__.py'))
            except KeyError:
                pass
            else:
                zip.close()
                return package, full
            zip.close()
    ## FIXME: need special error for package.py case:
    raise InstallationError(
        'No package with the name %s found' % package)
def create_zipfile(self, filename):
    zip_file = zipfile.ZipFile(filename, "w")
    try:
        self.mkpath(self.target_dir)  # just in case
        for root, dirs, files in os.walk(self.target_dir):
            if root == self.target_dir and not files:
                raise DistutilsOptionError(
                    "no files found in upload directory '%s'"
                    % self.target_dir)
            for name in files:
                full = os.path.join(root, name)
                relative = root[len(self.target_dir):].lstrip(os.path.sep)
                dest = os.path.join(relative, name)
                zip_file.write(full, dest)
    finally:
        zip_file.close()
def setUp(self):
    if self.datafile is None or self.dataname is None:
        return
    if not os.path.isfile(self.datafile):
        self.old_cwd = None
        return
    self.old_cwd = os.getcwd()
    self.temp_dir = tempfile.mkdtemp()
    zip_file, source, target = [None, None, None]
    try:
        zip_file = zipfile.ZipFile(self.datafile)
        for files in zip_file.namelist():
            _extract(zip_file, files, self.temp_dir)
    finally:
        if zip_file:
            zip_file.close()
        del zip_file
    os.chdir(os.path.join(self.temp_dir, self.dataname))
def test_create_zipfile(self):
    # Test to make sure zipfile creation handles common cases.
    # This explicitly includes a folder containing an empty folder.
    dist = Distribution()
    cmd = upload_docs(dist)
    cmd.upload_dir = self.upload_dir
    cmd.target_dir = self.upload_dir
    tmp_dir = tempfile.mkdtemp()
    tmp_file = os.path.join(tmp_dir, 'foo.zip')
    try:
        zip_file = cmd.create_zipfile(tmp_file)
        assert zipfile.is_zipfile(tmp_file)
        zip_file = zipfile.ZipFile(tmp_file)  # woh...
        assert zip_file.namelist() == ['index.html']
        zip_file.close()
    finally:
        shutil.rmtree(tmp_dir)
def tutor_fpout():
    pklout = os.path.join(RESDIR, TUTORPKL)
    if os.path.exists(pklout):
        with open(pklout, 'rb') as f:
            fpout = pickle.load(f)
    else:
        print('re-creating fp results ... this could take a few minutes')
        zip_archive = os.path.join(DATADIR, ZIPFILE)
        with zipfile.ZipFile(zip_archive, 'r') as zfile:
            zfile.extractall(DATADIR)
        fpout = tutor_example()
        make_clean_dat()
        os.makedirs(RESDIR, exist_ok=True)
        with open(pklout, 'wb') as f:
            pickle.dump(fpout, f)
    return fpout
def hfdata_plot():
    make_quickdata()
    unit_convert = {'q': 1e-3, 'c': 1e-6, 'P': 1e3}
    converters = {
        k: _converter_func(float(v), 0.) for k, v in unit_convert.items()}
    converters['T'] = _converter_func(1., 273.15)
    with zipfile.ZipFile(os.path.join(DATADIR, ZIPFILE), 'r') as zarch:
        with zarch.open(QUICKFILE, 'r') as datafile:
            data = HFData(
                fname=datafile,
                cols=(2, 3, 4, 6, 5, 7, 8),
                converters=converters,
                delimiter=",",
                skip_header=4)
    fig = plot_hfdata(data)
    return fig
def extract_zip_file(zipFilePath, extractDir):
    if not os.path.exists(extractDir):
        os.mkdir(extractDir)
    print('''Extracting
%s
to
%s...''' % (zipFilePath, extractDir))
    zfile = zipfile.ZipFile(zipFilePath)
    uncompress_size = sum(f.file_size for f in zfile.infolist())
    extracted_size = 0
    print('\n')
    for _file in zfile.infolist():
        extracted_size += _file.file_size
        sys.stdout.write(" %s%%\t\t%s\n" % (extracted_size * 100 // uncompress_size, _file.filename))
        zfile.extract(_file, path=extractDir)
    # ORIG: zip.extractall(path=extractDir)
    print('Ok')