def _download(url, imageName, folderName):
session = _setupSession()
try:
# time out is another parameter tuned
# fit for the network about 10Mb
image = session.get(url, timeout = 5)
with open(os.path.join(folderName, imageName),'wb') as fout:
fout.write(image.content)
fileExtension = imghdr.what(os.path.join(folderName, imageName))
if fileExtension is None:
os.remove(os.path.join(folderName, imageName))
else:
newName = imageName + '.' + str(fileExtension)
os.rename(os.path.join(folderName, imageName), os.path.join(folderName, newName))
except Exception as e:
print ("failed to download one pages with url of " + str(url))
# wrapper for map function
python类rename()的实例源码
def Set(self,key,data):
path = self._GetPath(key)
directory = os.path.dirname(path)
if not os.path.exists(directory):
os.makedirs(directory)
if not os.path.isdir(directory):
raise _FileCacheError('%s exists but is not a directory' % directory)
temp_fd, temp_path = tempfile.mkstemp()
temp_fp = os.fdopen(temp_fd, 'w')
temp_fp.write(data)
temp_fp.close()
if not path.startswith(self._root_directory):
raise _FileCacheError('%s does not appear to live under %s' %
(path, self._root_directory))
if os.path.exists(path):
os.remove(path)
os.rename(temp_path, path)
def setUp(self):
test_configuration = \
'''DATA_DIRECTORY: ~/
TODO_FILE: ~/footodo.txt
TODO_START_TAG: ''
TODO_STOP_TAG: ''
'''
self.test_conf_file = os.path.expanduser(os.path.join('~', '.letsdo'))
self.user_conf_bak = os.path.expanduser(os.path.join('~', '.letsdo.bak'))
if os.path.exists(self.test_conf_file):
# Backup user configuration file
os.rename(self.test_conf_file, self.user_conf_bak)
# Create test configuration file
with open(self.test_conf_file, 'w') as fconf:
fconf.write(test_configuration)
self.conf = Configuration()
if os.path.exists(self.conf.data_fullpath):
os.remove(self.conf.data_fullpath)
if os.path.exists(self.conf.task_fullpath):
os.remove(self.conf.task_fullpath)
def prepare_inception_data(o_dir, i_dir):
if not os.path.exists(o_dir):
os.makedirs(o_dir)
cnt = 0
bar = progressbar.ProgressBar(redirect_stdout=True,
max_value=progressbar.UnknownLength)
for root, subFolders, files in os.walk(i_dir):
if files:
for f in files:
if 'jpg' in f:
f_name = str(cnt) + '_ins.' + f.split('.')[-1]
cnt += 1
file_dir = os.path.join(root, f)
dest_path = os.path.join(o_dir, f)
dest_new_name = os.path.join(o_dir, f_name)
copy(file_dir, o_dir)
os.rename(dest_path, dest_new_name)
bar.update(cnt)
bar.finish()
print('Total number of files: {}'.format(cnt))
def download(url, target, expected_hash):
if filesystem.calculate_hash(target) == expected_hash:
return
count = 0
while filesystem.calculate_hash(TEMP_FILE) != expected_hash:
count += 1
if count > 5:
os.remove(TEMP_FILE)
raise OSError("Aborting download of %s after 5 unsuccessful attempts" % url)
try:
http_client.get(url).raise_for_status().download_to(TEMP_FILE)
except OSError:
pass
# If it already exists the rename will fail
try:
os.remove(target)
except OSError:
pass
os.rename(TEMP_FILE, target)
def get_pkg_dir(self, pkgname, pkgver, subdir):
pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
if not os.path.isdir(pkgdir):
os.makedirs(pkgdir)
target = os.path.join(pkgdir, subdir)
if os.path.exists(target):
return target
(fd, tmp) = tempfile.mkstemp(dir=pkgdir)
try:
os.close(fd)
self.download_to_file(pkgname, pkgver, subdir, tmp)
if subdir == REQUIRES:
os.rename(tmp, target)
else:
self.extract_tar(subdir, pkgdir, tmp)
finally:
try:
os.remove(tmp)
except OSError:
pass
return target
def get_pkg_dir(self, pkgname, pkgver, subdir):
pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
if not os.path.isdir(pkgdir):
os.makedirs(pkgdir)
target = os.path.join(pkgdir, subdir)
if os.path.exists(target):
return target
(fd, tmp) = tempfile.mkstemp(dir=pkgdir)
try:
os.close(fd)
self.download_to_file(pkgname, pkgver, subdir, tmp)
if subdir == REQUIRES:
os.rename(tmp, target)
else:
self.extract_tar(subdir, pkgdir, tmp)
finally:
try:
os.remove(tmp)
except OSError:
pass
return target
def _rename_atomic(src, dst):
ta = _CreateTransaction(None, 0, 0, 0, 0, 1000, 'Werkzeug rename')
if ta == -1:
return False
try:
retry = 0
rv = False
while not rv and retry < 100:
rv = _MoveFileTransacted(src, dst, None, None,
_MOVEFILE_REPLACE_EXISTING |
_MOVEFILE_WRITE_THROUGH, ta)
if rv:
rv = _CommitTransaction(ta)
break
else:
time.sleep(0.001)
retry += 1
return rv
finally:
_CloseHandle(ta)
def rename(src, dst):
# Try atomic or pseudo-atomic rename
if _rename(src, dst):
return
# Fall back to "move away and replace"
try:
os.rename(src, dst)
except OSError as e:
if e.errno != errno.EEXIST:
raise
old = "%s-%08x" % (dst, random.randint(0, sys.maxint))
os.rename(dst, old)
os.rename(src, dst)
try:
os.unlink(old)
except Exception:
pass
def create_seed_and_test_random(factor, start_id):
# Only use 1/factor of the crop images
# for example there are 10000 crops and a factor of 100
#then only 100 of them would be the random seed and test images.
# A factor of 0 would be 100%
# This should be changed to percent!
crops = []
image_ids = []
for filename in glob.iglob(crop_dir + '*.png'):
crops.append(filename)
for filename in crops:
renamed = filename.replace("_", "")
image_id = int(renamed.replace('.png', '').replace('/home/pkrush/cents/', ''))
if image_id < start_id:
continue
renamed = crop_dir + str(image_id) + '.png'
os.rename(filename, renamed)
rand_int = random.randint(0, factor)
if rand_int == 0:
image_ids.append(image_id)
pickle.dump(image_ids, open(data_dir + 'seed_image_ids.pickle', "wb"))
pickle.dump(image_ids, open(data_dir + 'test_image_ids.pickle', "wb"))
def create_seed_and_test_random(factor, start_id):
# Only use 1/factor of the crop images
# for example there are 10000 crops and a factor of 100
#then only 100 of them would be the random seed and test images.
# A factor of 0 would be 100%
# This should be changed to percent!
crops = []
image_ids = []
for filename in glob.iglob(crop_dir + '*.png'):
crops.append(filename)
for filename in crops:
renamed = filename.replace("_", "")
image_id = int(renamed.replace('.png', '').replace('/home/pkrush/cents/', ''))
if image_id < start_id:
continue
renamed = crop_dir + str(image_id) + '.png'
os.rename(filename, renamed)
rand_int = random.randint(0, factor)
if rand_int == 0:
image_ids.append(image_id)
pickle.dump(image_ids, open(data_dir + 'seed_image_ids.pickle', "wb"))
pickle.dump(image_ids, open(data_dir + 'test_image_ids.pickle', "wb"))
def munge(src_dir):
# stored as: ./MCG-COCO-val2014-boxes/COCO_val2014_000000193401.mat
# want: ./MCG/mat/COCO_val2014_0/COCO_val2014_000000141/COCO_val2014_000000141334.mat
files = os.listdir(src_dir)
for fn in files:
base, ext = os.path.splitext(fn)
# first 14 chars / first 22 chars / all chars + .mat
# COCO_val2014_0/COCO_val2014_000000447/COCO_val2014_000000447991.mat
first = base[:14]
second = base[:22]
dst_dir = os.path.join('MCG', 'mat', first, second)
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
src = os.path.join(src_dir, fn)
dst = os.path.join(dst_dir, fn)
print 'MV: {} -> {}'.format(src, dst)
os.rename(src, dst)
def rander(self):
writers = os.listdir(conf.train_path)
for writer in writers:
test_writer_path = '{:s}/{:s}'.format(conf.test_path, writer)
train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer)
if not os.path.isdir(train_writer_path): continue
# make sure path: {test_path}/{writer} exist
if not os.path.exists(test_writer_path):
os.mkdir(test_writer_path)
# move train file as test file
files = os.listdir('{:s}/{:s}'.format(conf.train_path, writer))
for file in files:
# (0, split) to move train file => test file
if random.random() < self.split:
os.rename('{:s}/{:s}'.format(train_writer_path, file), '{:s}/{:s}'.format(test_writer_path, file))
return self
def recover(self):
writers = os.listdir(conf.test_path)
for writer in writers:
test_writer_path = '{:s}/{:s}'.format(conf.test_path, writer)
train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer)
if not os.path.isdir(train_writer_path): continue
# make sure path: {train_path}/{writer} exist
if not os.path.exists(train_writer_path):
os.mkdir(train_writer_path)
# move test file to train file
files = os.listdir('{:s}/{:s}'.format(conf.test_path, writer))
for file in files:
os.rename('{:s}/{:s}'.format(test_writer_path, file), '{:s}/{:s}'.format(train_writer_path, file))
return self
def build_identify(self, writer):
X_list = []
dict_map = {}
train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer)
files = os.listdir(train_writer_path)
for file in files:
tmp_path = '{:s}/{:s}'.format(train_writer_path, file)
img_data = self.read(tmp_path)
if img_data is None: continue
X_list.append(img_data)
letter = self.predict(np.array(img_data))[0]
if letter not in dict_map:
dict_map[letter] = 0
else:
dict_map[letter] += 1
print('=> rename: {:s} => {:s}_{:d}.jpg'.format(file, letter, dict_map[letter]))
os.rename(tmp_path, '{:s}/{:s}_{:d}.jpg'.format(train_writer_path, letter, dict_map[letter]))
return self
def write_file(path, fcont, uid=None, gid=None, atomic=False):
if not atomic:
return _write_file(path, fcont, uid, gid)
tmp_path = '{path}._tmp_.{pid}_{timestamp}'.format(
path=path,
pid=os.getpid(),
timestamp=timeutil.ns(),
)
_write_file(tmp_path, fcont, uid, gid)
try:
os.rename(tmp_path, path)
except EnvironmentError:
os.remove(tmp_path)
raise
def removeall_clicked(self, button, store):
"""
@description: Same as the past function but remove all lines of the
treeview
"""
# if there is still an entry in the model
old = expanduser('~') + '/.config/mama/mama.xml'
new = expanduser('~') + '/.config/mama/.mama.bak'
if os.path.exists(old):
os.rename(old, new)
if len(store) != 0:
# remove all the entries in the model
self.labelState.set_text('Remove all commands')
for i in range(len(store)):
iter = store.get_iter(0)
store.remove(iter)
self.saveTree(store)
print("Empty list")
def set_docker_compose_version_to(dir, repo_docker, tag):
"""Modifies docker-compose files in the given directory so that repo_docker
image points to the given tag."""
compose_files = docker_compose_files_list(dir)
for filename in compose_files:
old = open(filename)
new = open(filename + ".tmp", "w")
for line in old:
# Replace build tag with a new one.
line = re.sub(r"^(\s*image:\s*mendersoftware/%s:)\S+(\s*)$" % re.escape(repo_docker),
r"\g<1>%s\2" % tag, line)
new.write(line)
new.close()
old.close()
os.rename(filename + ".tmp", filename)
def save_yaml(self, filename: str=None):
"""
Serialize the hierarchy to a file.
:param filename: Target filename, autogenerated if None
"""
if filename is None:
filename = self.config_filename
with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(filename),
delete=False) as temp:
header = self._yaml_header()
if header is not None:
temp.write(header)
yaml.round_trip_dump(self, stream=temp)
tempname = temp.name
os.rename(tempname, filename)
if filename in self.__class__._yaml_cache:
del self.__class__._yaml_cache[filename]
# YAML library configuration
download.py 文件源码
项目:Unsupervised-Anomaly-Detection-with-Generative-Adversarial-Networks
作者: xtarx
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def download_celeb_a(dirpath):
data_dir = 'celebA'
if os.path.exists(os.path.join(dirpath, data_dir)):
print('Found Celeb-A - skip')
return
filename, drive_id = "img_align_celeba.zip", "0B7EVK8r0v71pZjFTYXZWM3FlRnM"
save_path = os.path.join(dirpath, filename)
if os.path.exists(save_path):
print('[*] {} already exists'.format(save_path))
else:
download_file_from_google_drive(drive_id, save_path)
zip_dir = ''
with zipfile.ZipFile(save_path) as zf:
zip_dir = zf.namelist()[0]
zf.extractall(dirpath)
os.remove(save_path)
os.rename(os.path.join(dirpath, zip_dir), os.path.join(dirpath, data_dir))