def _unsafe_writes(self, src, dest, exception):
    """Copy ``src`` over ``dest`` non-atomically as a last-resort fallback.

    The copy is attempted only when ``exception`` carries ``errno.EBUSY``;
    any other error is reported through ``self.fail_json`` and ``dest`` is
    left untouched.

    :param src: path of the file holding the new content
    :param dest: path of the file that is overwritten in place
    :param exception: the error that prevented the regular replace; its
        ``errno`` attribute decides whether the unsafe copy is attempted
    """
    # sadly there are some situations where we cannot ensure atomicity, but only if
    # the user insists and we get the appropriate error we update the file unsafely
    if exception.errno == errno.EBUSY:
        # TODO: issue warning that this is an unsafe operation, but doing it cause user insists
        # Pre-bind both handles so the cleanup below never touches an unbound
        # name when one of the open() calls fails.
        out_dest = in_src = None
        try:
            try:
                out_dest = open(dest, 'wb')
                in_src = open(src, 'rb')
                shutil.copyfileobj(in_src, out_dest)
            finally:  # close whichever handles were actually opened
                if out_dest:
                    out_dest.close()
                if in_src:
                    in_src.close()
        except (shutil.Error, OSError, IOError) as e:
            self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e))
    else:
        self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
python类Error()的实例源码
def do_last_steps():
    '''Runs last steps: restore automatic login, clean up files, then reboot.'''
    common.print_info("Starting last steps...")
    # Restore automatic logins if our package postinstall script disabled them:
    if os.path.exists(config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED):
        try:
            os.rename(config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED,
                      config_paths.AUTO_LOGIN_PASSWORD_FILE)
        except OSError:
            common.print_error("Could not restore automatic login by moving %s." % config_paths.AUTO_LOGIN_PASSWORD_FILE_MOVED)
    # Remove preference key that disabled FileVault automatic logins.
    # Use defaults since the plist is probably binary.
    osx.defaults_delete('DisableFDEAutoLogin', config_paths.LOGIN_WINDOW_PREFS_FILE)
    # Cleanup files:
    # Work on a copy so the shared configuration list is not mutated.
    paths_list = list(config_paths.CLEANUP_FILES_ARRAY)
    paths_list.append(config_paths.THIS_LAUNCH_AGENT_PATH)
    common.delete_files_by_path(paths_list)
    if os.path.exists(config_paths.THIS_APP_PATH):
        try:
            shutil.rmtree(config_paths.THIS_APP_PATH)
        except (shutil.Error, OSError):
            # rmtree reports failures as OSError. Removal is best-effort and
            # must not prevent the reboot below.
            common.print_error("Could not remove %s." % config_paths.THIS_APP_PATH)
    common.print_info("Completed last steps. Rebooting...")
    # Reboot:
    osx.reboot_system()
def install_zip(self):
    """Ask the user for a resource-pack archive and move it into place.

    The selected file is accepted only when it is a zip archive containing
    an entry named exactly ``pack.mcmeta``. Accepted archives are moved to
    ``self.resourcepack_location``; every rejection is reported through a
    message box. Nothing happens when the dialog returns no file.
    """
    pack = filedialog.askopenfile("r")
    if not pack:
        return

    # Only the path is needed; release the handle immediately so it cannot
    # leak if inspecting the archive fails.
    pack_path = pack.name
    pack.close()

    try:
        with zipfile.ZipFile(pack_path, "r") as archive:
            found_pack = "pack.mcmeta" in archive.namelist()
    except zipfile.BadZipFile:
        # Not a zip archive at all, so it cannot contain the manifest.
        found_pack = False

    if not found_pack:
        messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
        return

    try:
        shutil.move(pack_path, self.resourcepack_location)
    except shutil.Error:
        messagebox.showerror("Error", "This pack is already installed.")
def copy_files(src, dst, uid, gid):
    """Copy files recursively and set uid and gid.

    Walks ``src`` and recreates each directory and file at the matching
    location below ``dst`` (which must already exist), then changes the
    owner of every created entry to ``uid``/``gid``. A failure on a single
    entry is logged as a warning and the walk continues.
    """
    for root, dirs, files in os.walk(src):
        # os.walk yields paths that start with ``src``; swap only that
        # leading occurrence so a repeated path fragment is left alone.
        dst_root = root.replace(src, dst, 1)
        for name in dirs:
            target = os.path.join(dst_root, name)
            try:
                logging.warning("%s|%s" % (dst_root, name))
                logging.warning(os.path.join(root, name))
                os.mkdir(target)
                os.chown(target, uid, gid)
            except OSError as e:
                logging.warning(e)
        for name in files:
            target = os.path.join(dst_root, name)
            try:
                shutil.copyfile(os.path.join(root, name), target)
                os.chown(target, uid, gid)
            except (shutil.Error, OSError) as e:
                # copyfile and chown report most failures as OSError.
                logging.warning(e)
def _check_locale(self):
    '''
    Activates the locale configured in the environment (LANG, LC_CTYPE)
    through the locale module, falling back to 'C' when it is unknown.
    '''
    try:
        # an empty string asks for the default locale, i.e. what
        # locale.getdefaultlocale() would report
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # the configured locale is not known: use 'C' instead, which may
        # cause unicode issues but is preferable to failing outright
        locale.setlocale(locale.LC_ALL, 'C')
        for variable in ('LANG', 'LC_ALL', 'LC_MESSAGES'):
            os.environ[variable] = 'C'
    except Exception:
        failure = get_exception()
        self.fail_json(msg="An unknown error was encountered while attempting to validate the locale: %s" % failure)
def test_dont_copy_file_onto_link_to_itself(self):
    """copyfile() onto a hard link of the source must raise and keep the data."""
    # Temporarily disable test on Windows.
    if os.name == 'nt':
        return
    # bug 851123.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    hard_link = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as out:
            out.write('cheddar')
        os.link(original, hard_link)
        self.assertRaises(shutil.Error, shutil.copyfile, original, hard_link)
        # The failed copy must not have truncated the shared content.
        with open(original, 'r') as inp:
            content = inp.read()
        self.assertEqual(content, 'cheddar')
        os.remove(hard_link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_dont_copy_file_onto_symlink_to_itself(self):
    """copyfile() onto a symlink to the source must raise and keep the data."""
    # bug 851123.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    sym_link = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as out:
            out.write('cheddar')
        # The link target is the bare name 'cheese': passing `original`
        # would make the link point at TESTFN/TESTFN/cheese instead of
        # TESTFN/cheese.
        os.symlink('cheese', sym_link)
        self.assertRaises(shutil.Error, shutil.copyfile, original, sym_link)
        # The failed copy must not have truncated the source.
        with open(original, 'r') as inp:
            content = inp.read()
        self.assertEqual(content, 'cheddar')
        os.remove(sym_link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_copytree_named_pipe(self):
    """copytree() must report a named pipe in the tree through shutil.Error."""
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        os.mkfifo(pipe)
        caught = None
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            caught = exc
        if caught is None:
            self.fail("shutil.Error should have been raised")
        # The first argument holds the list of (src, dst, message) failures.
        errors = caught.args[0]
        self.assertEqual(len(errors), 1)
        src, dst, error_msg = errors[0]
        self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
def test_copytree_dangling_symlinks(self):
    """Check the three ways copytree() can treat a dangling symlink."""
    source = self.mkdtemp()
    failing_dst = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    os.mkdir(os.path.join(source, 'test_dir'))
    self._write_data(os.path.join(source, 'test_dir', 'test.txt'), '456')
    # default flags: the dangling link makes the copy raise
    self.assertRaises(Error, shutil.copytree, source, failing_dst)

    # ignore_dangling_symlinks=True: the link is skipped
    skipping_dst = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, skipping_dst, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(skipping_dst))

    # symlinks=True: the link itself is copied
    linking_dst = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, linking_dst, symlinks=True)
    self.assertIn('test.txt', os.listdir(linking_dst))
def write(self):
    """ Writes configuration file

    Waits up to five one-second intervals for a concurrent write to finish,
    dumps the encoded configuration as JSON into the temp file, restricts
    it to owner read/write and moves it over the real configuration file.
    The ``_writing`` flag is always cleared again, even when a step raises.
    """
    # wait for file to unlock. Timeout after 5 seconds
    for _ in range(5):
        if not self._writing:
            self._writing = True
            break
        time.sleep(1)
    else:
        _LOGGER.error('Could not write to configuration file. It is busy.')
        return

    try:
        # dump JSON to config file; the with-block guarantees the data is
        # flushed and the handle closed before the file is moved
        encoded = self.encode()
        with open(self._filetmp, 'w') as tmp_file:
            json.dump(encoded, tmp_file, sort_keys=True, indent=4,
                      separators=(',', ': '))
        os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR)
        try:
            shutil.move(self._filetmp, self._file)
            _LOGGER.debug('Config file succesfully updated.')
        except shutil.Error as e:
            _LOGGER.error('Failed to move temp config file to original error: ' + str(e))
            return
    finally:
        # unlock no matter what happened above, so later writes are not
        # blocked forever by a failed one
        self._writing = False
    _LOGGER.debug('Wrote configuration file')
def test_copytree_named_pipe(self):
    """A FIFO inside the source tree must surface as a single copytree error."""
    fifo_dir = os.path.join(TESTFN, "subdir")
    fifo_path = os.path.join(fifo_dir, "mypipe")
    os.mkdir(TESTFN)
    try:
        os.mkdir(fifo_dir)
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            failures = exc.args[0]
            self.assertEqual(len(failures), 1)
            # each failure is a (source, destination, message) triple
            _, _, message = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for leftover in (TESTFN, TESTFN2):
            shutil.rmtree(leftover, ignore_errors=True)
def test_copytree_named_pipe(self):
    """Copying a tree that holds a named pipe must raise shutil.Error."""
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        os.mkfifo(pipe)
        expected = "`%s` is a named pipe" % pipe
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            reported = raised.args[0]
            self.assertEqual(len(reported), 1)
            # only the message of the (src, dst, message) triple is checked
            self.assertEqual(expected, reported[0][2])
            self.assertEqual(len(reported[0]), 3)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
def CreateDirectories(path):
    """Create directory if the path to a file doesn't exist.

    Args:
      path: The full file path to where a file will be placed.

    Raises:
      Error: Failure creating the requested directory.
    """
    dirname = os.path.dirname(path)
    # A bare file name has no directory component, so there is nothing to
    # create; an existing directory needs no work either.
    if not dirname or os.path.isdir(dirname):
        return
    logging.debug('Creating directory %s ', dirname)
    try:
        os.makedirs(dirname)
    except (shutil.Error, OSError):
        # The directory may have been created by someone else between the
        # check above and makedirs(); only a still-missing directory is a
        # real failure.
        if not os.path.isdir(dirname):
            raise Error('Unable to make directory: %s' % dirname)
def _rename_temp_folder(name, target_folder, tmp_path):
    """
    Rename the temp folder of the cloned repo
    Return the name of the path to install

    :param name: name of the folder to create inside target_folder
    :param target_folder: folder that receives the renamed temp folder
    :param tmp_path: path of the temp folder holding the cloned repo
    :return: path to install, None if already exists
    """
    logger.debug("[ResourcesManager] Rename temp folder")
    new_absolute_neuron_path = target_folder + os.sep + name
    # shutil.move() does not fail when the destination is an existing
    # directory: it moves the source *inside* it. Check for the clash
    # explicitly so an installed module is never silently modified.
    if not os.path.exists(new_absolute_neuron_path):
        try:
            shutil.move(tmp_path, new_absolute_neuron_path)
            return new_absolute_neuron_path
        except shutil.Error:
            # fall through to the common "already exists" cleanup
            pass
    # the folder already exist
    Utils.print_warning("The module %s already exist in the path %s" % (name, target_folder))
    # remove the cloned repo
    logger.debug("[ResourcesManager] Deleting temp folder %s" % str(tmp_path))
    shutil.rmtree(tmp_path)
    return None
def test_dont_copy_file_onto_link_to_itself(self):
    """Copying a file onto its own hard link must fail without data loss."""
    # Temporarily disable test on Windows.
    if os.name == 'nt':
        return
    # bug 851123.
    os.mkdir(TESTFN)
    cheese = os.path.join(TESTFN, 'cheese')
    shop = os.path.join(TESTFN, 'shop')
    try:
        with open(cheese, 'w') as handle:
            handle.write('cheddar')
        os.link(cheese, shop)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(cheese, shop)
        # the source must still hold what was written before the attempt
        with open(cheese, 'r') as handle:
            self.assertEqual(handle.read(), 'cheddar')
        os.remove(shop)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_dont_copy_file_onto_symlink_to_itself(self):
    """Copying a file onto a symlink pointing back at it must fail safely."""
    # bug 851123.
    os.mkdir(TESTFN)
    cheese = os.path.join(TESTFN, 'cheese')
    shop = os.path.join(TESTFN, 'shop')
    try:
        with open(cheese, 'w') as handle:
            handle.write('cheddar')
        # Link to the bare name: using `cheese` (which already contains
        # TESTFN) would point the link at TESTFN/TESTFN/cheese rather
        # than TESTFN/cheese.
        os.symlink('cheese', shop)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(cheese, shop)
        # the source must still hold what was written before the attempt
        with open(cheese, 'r') as handle:
            self.assertEqual(handle.read(), 'cheddar')
        os.remove(shop)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_copytree_named_pipe(self):
    """copytree() must report a named pipe in the tree through shutil.Error."""
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        os.mkfifo(pipe)
        caught = None
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            caught = exc
        if caught is None:
            self.fail("shutil.Error should have been raised")
        # The first argument holds the list of (src, dst, message) failures.
        errors = caught.args[0]
        self.assertEqual(len(errors), 1)
        src, dst, error_msg = errors[0]
        self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
def test_copytree_dangling_symlinks(self):
    """Check the three ways copytree() can treat a dangling symlink."""
    source = self.mkdtemp()
    failing_dst = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')
    # default flags: the dangling link makes the copy raise
    self.assertRaises(Error, shutil.copytree, source, failing_dst)

    # ignore_dangling_symlinks=True: the link is skipped
    skipping_dst = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, skipping_dst, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(skipping_dst))

    # symlinks=True: the link itself is copied
    linking_dst = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, linking_dst, symlinks=True)
    self.assertIn('test.txt', os.listdir(linking_dst))
def copy_files_to_project_folder(self):
    """Copy every copy-enabled file setting into the project directory.

    Changes into ``self.project_dir()`` for the duration of the copy and
    switches back to ``CWD`` afterwards, even when a copy raises. For each
    setting of type ``'file'`` whose ``copy`` flag and value are set, and
    whose value is still an absolute path once the project directory is
    stripped from it, the file is copied into the project directory and
    the setting's value is reduced to its base name.
    """
    old_dir = CWD
    os.chdir(self.project_dir())
    try:
        self.logger.info(u'Copying files to {}'.format(self.project_dir()))
        for sgroup in self.settings['setting_groups']:
            for setting in sgroup.values():
                if not (setting.copy and setting.type == 'file' and setting.value):
                    continue
                f_path = setting.value.replace(self.project_dir(), '')
                if not os.path.isabs(f_path):
                    continue
                try:
                    utils.copy(setting.value, self.project_dir())
                    self.logger.info(u'Copying file {} to {}'.format(setting.value, self.project_dir()))
                except shutil.Error as e:  # same file warning
                    self.logger.warning(u'Warning: {}'.format(e))
                finally:
                    setting.value = os.path.basename(setting.value)
    finally:
        # restore the working directory on every exit path
        os.chdir(old_dir)
def collect_cgroup(approot, destroot):
    """Get host treadmill cgroups information.

    Copies ``<approot>/cgroup_svc`` and every directory matching
    ``/cgroup/*/treadmill/core`` to the same path below ``destroot``.
    A tree that cannot be copied is skipped with a warning.
    """
    src = "%s/cgroup_svc" % approot
    dest = "%s%s" % (destroot, src)

    try:
        shutil.copytree(src, dest)
    except (shutil.Error, OSError):
        _LOGGER.warning('skip %s => %s', src, dest)

    pattern = '/cgroup/*/treadmill/core'
    for cgrp_core in glob.glob(pattern):
        core_dest = '%s%s' % (destroot, cgrp_core)

        try:
            shutil.copytree(cgrp_core, core_dest)
        except (shutil.Error, OSError):
            # report the pair that actually failed, not the cgroup_svc one
            _LOGGER.warning('skip %s => %s', cgrp_core, core_dest)
def test_copytree_named_pipe(self):
    """A FIFO inside the source tree must surface as a single copytree error."""
    fifo_dir = os.path.join(TESTFN, "subdir")
    fifo_path = os.path.join(fifo_dir, "mypipe")
    os.mkdir(TESTFN)
    try:
        os.mkdir(fifo_dir)
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            failures = exc.args[0]
            self.assertEqual(len(failures), 1)
            # each failure is a (source, destination, message) triple
            _, _, message = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for leftover in (TESTFN, TESTFN2):
            shutil.rmtree(leftover, ignore_errors=True)
def test_copytree_named_pipe(self):
    """Copying a tree that holds a named pipe must raise shutil.Error."""
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        os.mkfifo(pipe)
        expected = "`%s` is a named pipe" % pipe
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as raised:
            reported = raised.args[0]
            self.assertEqual(len(reported), 1)
            # only the message of the (src, dst, message) triple is checked
            self.assertEqual(expected, reported[0][2])
            self.assertEqual(len(reported[0]), 3)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
def test_copytree_dangling_symlinks(self):
    """Exercise copytree() against a symlink whose target is missing."""
    tree = self.mkdtemp()
    strict_copy = os.path.join(self.mkdtemp(), 'destination')
    dangling = os.path.join(tree, 'test.txt')
    os.symlink('IDONTEXIST', dangling)
    os.mkdir(os.path.join(tree, 'test_dir'))
    write_file((tree, 'test_dir', 'test.txt'), '456')
    # without extra flags the broken link is reported as an error
    with self.assertRaises(Error):
        shutil.copytree(tree, strict_copy)

    # with ignore_dangling_symlinks the broken link is left out
    lenient_copy = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(tree, lenient_copy, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(lenient_copy))

    # with symlinks=True the broken link is reproduced as a link
    link_copy = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(tree, link_copy, symlinks=True)
    self.assertIn('test.txt', os.listdir(link_copy))
backup_docker_volumes.py 文件源码
项目:chef_community_cookbooks
作者: DennyZhang
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def copytree(src, dst, symlinks=False, ignore=None):
    """Copy each entry of ``src`` into the existing directory ``dst``.

    Sub-directories are copied with ``shutil.copytree`` (forwarding
    ``symlinks`` and ``ignore``); a failure there is logged as a warning
    and the remaining entries are still processed. Everything else is
    copied with ``shutil.copy2``.
    """
    for entry in os.listdir(src):
        source_path = os.path.join(src, entry)
        target_path = os.path.join(dst, entry)
        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target_path)
            continue
        # TODO: better way to copy while skip symbol links
        try:
            shutil.copytree(source_path, target_path, symlinks, ignore)
        except (shutil.Error, OSError):
            logging.warning('Warning: Some directories not copied under %s.' % (source_path))
################################################################################
def test_copytree_named_pipe(self):
    """copytree() must report a named pipe in the tree through shutil.Error."""
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        os.mkfifo(pipe)
        caught = None
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            caught = exc
        if caught is None:
            self.fail("shutil.Error should have been raised")
        # The first argument holds the list of (src, dst, message) failures.
        errors = caught.args[0]
        self.assertEqual(len(errors), 1)
        src, dst, error_msg = errors[0]
        self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
def copytree(src, dst):
    """
    Alternative implementation of shutil.copytree which allows to copy into
    existing directories.

    Every entry is attempted; failures are collected and raised together
    at the end as one ``shutil.Error`` whose first argument is a list of
    ``(source, destination, message)`` tuples.

    :param src: directory whose content is copied
    :param dst: directory to copy into; created when missing
    :return: ``dst``
    """
    os.makedirs(dst, exist_ok=True)
    errors = []
    for name in os.listdir(src):
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if os.path.isdir(srcname):
                copytree(srcname, dstname)
            else:
                shutil.copy(srcname, dstname)
        except shutil.Error as err:
            if err.args and isinstance(err.args[0], list):
                # aggregated error from a recursive call: merge its entries
                errors.extend(err.args[0])
            else:
                # e.g. SameFileError carries a plain message; extending
                # with it would add one list item per character
                errors.append((srcname, dstname, str(err)))
        except OSError as err:
            errors.append((srcname, dstname, str(err)))
    if errors:
        raise shutil.Error(errors)
    return dst
def ack(self, tup_id):
    """Acknowledge tup_id, that is the path_mail.

    When the file still exists it is deleted if ``self._what`` is
    ``"remove"``; otherwise it is moved to ``self._where``, and deleted
    when that move raises ``shutil.Error``. The mail is then marked as
    done on the queue and dropped from the tail of analyzed mails; a
    ``KeyError`` raised there is forwarded to ``self.raise_exception``.
    """
    if os.path.exists(tup_id):
        if self._what == "remove":
            os.remove(tup_id)
        else:
            try:
                shutil.move(tup_id, self._where)
            except shutil.Error:
                os.remove(tup_id)

    try:
        # Remove from tail analyzed mail
        self._queue.task_done()
        self._queue_tail.remove(tup_id)
        self.log("Mails to process: {}".format(len(self._queue_tail)))
    except KeyError as e:
        self.raise_exception(e, tup_id)
def from_zip(cls, file_obj, stages_name, stages_root):
    """Unpack zip from file_obj into os.path.join(stages_root, stages_name).

    The target directory is created first, the archive is integrity-checked
    with ``testzip()`` and every member is extracted into it. ``cls`` is
    then instantiated on that directory and ``save_main_script()`` is
    called on each of its stages.

    Returns the ``cls`` instance built on the extracted directory.
    Raises ``Error`` for a corrupt member or an unreadable archive.
    """
    try:
        assignment_root = os.path.join(stages_root, stages_name)
        os.mkdir(assignment_root)
        with zipfile.ZipFile(file_obj, 'r') as zf:
            bad_filename = zf.testzip()
            if bad_filename is not None:
                raise Error('Corrupt file in zip: ' + bad_filename)
            # TODO: Handle case where zf.namelist() uses a lot of memory
            archived_files = zf.namelist()
            for af in archived_files:
                zf.extract(af, assignment_root)
        # TODO: The stage.save_main_script() code below is used as a workaround
        # to ensure that the main script is executable. Ideally, file
        # permissions would be preserved.
        stages = cls(assignment_root)
        # values() exists on both Python 2 and 3 mappings, unlike itervalues()
        for stage in stages.stages.values():
            stage.save_main_script()
        return stages
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
        raise Error(e)
def _set_filename(self, filename):
    """Set the filename of the current working file

    Builds a sanitized name from ``filename`` (whitespace runs become
    ``_``; only alphanumerics and ``. _ : -`` plus the path separator are
    kept), copies the file to that name and records the new name together
    with its base name and extension on the instance.
    """
    tmp_file = '_'.join(filename.split())
    pathsep = os.path.sep
    if sys.platform == 'win32':
        pathsep = '/'
    allowed = ('.', '_', ':', pathsep, '-')
    new_file = ''.join(
        char for char in tmp_file if char.isalnum() or char in allowed)
    # Python 3 signals "source and destination are the same file" with a
    # dedicated subclass; older versions only offer the message text.
    same_file_error = getattr(shutil, 'SameFileError', None)
    try:
        shutil.copy(filename, new_file)
    except shutil.Error as err:
        legacy_msg = "`%s` and `%s` are the same file" % (filename, new_file)
        is_same_file = (
            (same_file_error is not None and isinstance(err, same_file_error))
            or str(err) == legacy_msg)
        if not is_same_file:
            # bare raise keeps the original traceback
            raise
    utils.ensure_file_exists(new_file)
    self._filename = new_file
    self._basename, self._ext = os.path.splitext(self._filename)
def delete_dir_content(dir_path=analyzer.TEMP_DIR):
    """Delete every entry listed for ``dir_path`` and record failures.

    Calls ``gather_data_before_delete`` first and ``gather_cleanup_data``
    last. Entries that are regular files are removed with ``os.remove``,
    everything else with ``shutil.rmtree``. The text of each deletion
    error is appended to the module-level ``errors`` list.
    """
    global errors
    gather_data_before_delete(dir_path)
    for entry in analyzer.get_dir_content(dir_path).keys():
        target = os.path.join(dir_path, entry)
        try:
            if not os.path.isfile(target):
                print("Deleting dir:", entry)
                shutil.rmtree(target)
            else:
                print("Deleting file:", entry)
                os.remove(target)
        except (os.error, shutil.Error) as failure:
            errors.append(str(failure))
            print("Unable to delete")
    gather_cleanup_data(dir_path)