def test_copytree_named_pipe(self):
os.mkdir(TESTFN)
try:
subdir = os.path.join(TESTFN, "subdir")
os.mkdir(subdir)
pipe = os.path.join(subdir, "mypipe")
os.mkfifo(pipe)
try:
shutil.copytree(TESTFN, TESTFN2)
except shutil.Error as e:
errors = e.args[0]
self.assertEqual(len(errors), 1)
src, dst, error_msg = errors[0]
self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
else:
self.fail("shutil.Error should have been raised")
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
shutil.rmtree(TESTFN2, ignore_errors=True)
python类Error()的实例源码
def test_copytree_dangling_symlinks(self):
# a dangling symlink raises an error at the end
src_dir = self.mkdtemp()
dst_dir = os.path.join(self.mkdtemp(), 'destination')
os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
os.mkdir(os.path.join(src_dir, 'test_dir'))
write_file((src_dir, 'test_dir', 'test.txt'), '456')
self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
# a dangling symlink is ignored with the proper flag
dst_dir = os.path.join(self.mkdtemp(), 'destination2')
shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
self.assertNotIn('test.txt', os.listdir(dst_dir))
# a dangling symlink is copied if symlinks=True
dst_dir = os.path.join(self.mkdtemp(), 'destination3')
shutil.copytree(src_dir, dst_dir, symlinks=True)
self.assertIn('test.txt', os.listdir(dst_dir))
def _check_locale(self):
'''
Uses the locale module to test the currently set locale
(per the LANG and LC_CTYPE environment settings)
'''
try:
# setting the locale to '' uses the default locale
# as it would be returned by locale.getdefaultlocale()
locale.setlocale(locale.LC_ALL, '')
except locale.Error:
# fallback to the 'C' locale, which may cause unicode
# issues but is preferable to simply failing because
# of an unknown locale
locale.setlocale(locale.LC_ALL, 'C')
os.environ['LANG'] = 'C'
os.environ['LC_ALL'] = 'C'
os.environ['LC_MESSAGES'] = 'C'
except Exception as e:
self.fail_json(msg="An unknown error was encountered while attempting to validate the locale: %s" %
to_native(e), exception=traceback.format_exc())
def _unsafe_writes(self, src, dest):
# sadly there are some situations where we cannot ensure atomicity, but only if
# the user insists and we get the appropriate error we update the file unsafely
try:
try:
out_dest = open(dest, 'wb')
in_src = open(src, 'rb')
shutil.copyfileobj(in_src, out_dest)
finally: # assuring closed files in 2.4 compatible way
if out_dest:
out_dest.close()
if in_src:
in_src.close()
except (shutil.Error, OSError, IOError) as e:
self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, to_native(e)),
exception=traceback.format_exc())
def setlocale(category, loc=None, printer=None):
"""Wraps locale.setlocale(), falling back to the C locale if the desired
locale is broken or unavailable. The 'printer' parameter should be a
function which takes a string and displays it. If 'None' (the default),
setlocale() will print the message to stderr."""
if printer is None:
printer = emsg
try:
locale.setlocale(category, loc)
# Because of Python bug 813449, getdefaultlocale may fail
# with a ValueError even if setlocale succeeds. So we call
# it here to prevent having this error raised if it is
# called later by other non-pkg(7) code.
locale.getdefaultlocale()
except (locale.Error, ValueError):
try:
dl = " '{0}.{1}'".format(*locale.getdefaultlocale())
except ValueError:
dl = ""
printer("Unable to set locale{0}; locale package may be broken "
"or\nnot installed. Reverting to C locale.".format(dl))
locale.setlocale(category, "C")
def moveKeyFilesToCorrectLocations(keys_dir, pkdir, skdir):
for key_file in os.listdir(keys_dir):
if key_file.endswith(".key"):
try:
shutil.move(os.path.join(keys_dir, key_file),
os.path.join(pkdir, key_file))
except shutil.Error as ex:
# print(ex)
pass
if key_file.endswith(".key_secret"):
try:
shutil.move(os.path.join(keys_dir, key_file),
os.path.join(skdir, key_file))
except shutil.Error as ex:
# print(ex)
pass
def post(self):
to = self.get_body_argument("to", False)
if to:
destination_uri = os.path.join(self.config['auto']['base_path'], to)
destination_path = urllib.url2pathname(urlparse.urlparse(destination_uri.split('file://')[1]).path)
logger.info("Moving '{}' to '{}'".format(self.album_path, destination_path))
self.core.tracklist.clear()
try:
shutil.move(self.album_path, destination_path)
logger.info("Moved album '{}' to '{}'".format(self.album_path, destination_path))
except shutil.Error:
logger.error("Could not move album '{}' to '{}'".format(self.album_path, destination_path), exc_info=1)
else:
logger.error('Destination not supplied, move canceled')
return self.redirect('/auto')
def copyPhoto(self, photo, target, recursive=True):
"""Attempt to copy photo from cache to given destination.
Arguments:
photo: photo object to download.
target: target filename
recursive (optional): attempt to download picture if it does not exist yet
Returns:
True if image was copied successfully,
False if image did not exist and download was initiated,
otherwise None.
Raises:
shutil.Error if an error occured during moving the file.
"""
pass
def _copy_temp_directory(self):
"""Copy the temp directory to the output directory."""
if os.path.exists(self.paths['output']):
oldfiles = [f for f in os.listdir(self.paths['output'])]
for oldfile in oldfiles:
os.remove(os.path.join(self.paths['output'], oldfile))
newfiles = [f for f in os.listdir(self.paths['temp'])]
for newfile in newfiles:
shutil.copy2(os.path.join(self.paths['temp'], newfile), self.paths['output'])
else:
try:
shutil.copytree(self.paths['temp'], self.paths['output'])
except shutil.Error as err:
self.exit(err)
shutil.rmtree(self.paths['temp'])
def open_pack(self):
pack = filedialog.askdirectory(initialdir=self.resourcepack_location)
if os.path.isfile(pack + "/pack.mcmeta"):
# messagebox.showinfo("Information", "Found 'pack.mcmeta'.")
self.parent.directory = pack
self.parent.cmd.tree_refresh()
self.destroy()
else:
messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
def open_zip(self):
pack = filedialog.askopenfile("r", initialdir=self.resourcepack_location)
found_pack = False
if pack:
amount = functions.zip_files(pack.name)
progress = dialog.ProgressWindow(self.parent, title="Opening Zip", maximum=amount)
count = 0
with zipfile.ZipFile(pack.name, "r") as z:
for file in z.namelist():
if file == "pack.mcmeta":
# messagebox.showinfo("Information", "Found 'pack.mcmeta'.")
found_pack = True
self.destroy()
if found_pack:
self.parent.d = tempfile.TemporaryDirectory()
for file in z.namelist():
z.extract(file, self.parent.d.name)
count += 1
progress.variable_name.set("Current File: " + file)
progress.variable_percent.set("{}% Complete".format(round(100 * float(count) / float(amount))))
progress.variable_progress.set(progress.variable_progress.get() + 1)
self.parent.name = pack.name.split("/")[-1].split(".")[0]
self.parent.directory = self.parent.d.name
self.parent.directory_real = pack.name
self.parent.cmd.tree_refresh()
self.destroy()
elif not found_pack:
messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
pack.close()
progress.destroy()
def install_pack(self):
pack = filedialog.askdirectory()
if os.path.isfile(pack + "/pack.mcmeta"):
# messagebox.showinfo("Information", "Found 'pack.mcmeta'.")
try:
shutil.move(pack, self.resourcepack_location)
except shutil.Error:
messagebox.showerror("Error", "This pack is already installed.")
else:
messagebox.showerror("Error", "Could not find 'pack.mcmeta'.")
def put_file(self, in_path, out_path):
''' transfer a file from local to local '''
super(Connection, self).put_file(in_path, out_path)
display.vvv(u"PUT {0} TO {1}".format(in_path, out_path), host=self._play_context.remote_addr)
if not os.path.exists(to_bytes(in_path, errors='surrogate_or_strict')):
raise AnsibleFileNotFound("file or module does not exist: {0}".format(to_native(in_path)))
try:
shutil.copyfile(to_bytes(in_path, errors='surrogate_or_strict'), to_bytes(out_path, errors='surrogate_or_strict'))
except shutil.Error:
raise AnsibleError("failed to copy: {0} and {1} are the same".format(to_native(in_path), to_native(out_path)))
except IOError as e:
raise AnsibleError("failed to transfer file to {0}: {1}".format(to_native(out_path), to_native(e)))
def test_existing_file_inside_dest_dir(self):
# A file with the same name inside the destination dir already exists.
with open(self.dst_file, "wb"):
pass
self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
def sort(self):
"""Sort the files inside the drop folder."""
for elt in os.listdir(self.path):
if elt == CONFIG_FILE_NAME:
continue
try:
dest = self._get_destination(elt)
move(os.path.join(self.path, elt), dest)
logging.info("%s moved to %s", elt, dest)
except LookupError as e:
logging.warning(str(e))
except Error:
logging.warning("error while moving the element %s", elt)
def show_bug_reports(self, *v):
m = Gtk.Dialog(_("Question"), self, 0)
m.add_button(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL)
m.add_button(Gtk.STOCK_OK, Gtk.ResponseType.OK)
vbox = Gtk.VBox()
m.vbox.pack_start(vbox, False, False, 0)
vbox.set_spacing(18)
vbox.set_border_width(12)
l = Gtk.Label(label=_("Please enter the email used when you submitted the bugs:"))
vbox.pack_start(l, False, False, 0)
self.g_email = Gtk.Entry()
m.action_area.get_children()[0].grab_default()
self.g_email.set_activates_default(True)
vbox.pack_start(self.g_email, False, False, 0)
m.show_all()
ret = m.run()
m.destroy()
if ret == Gtk.ResponseType.OK:
params = urllib.urlencode({
'pagename': 'SITS-Incoming/SearchBugs',
'q': 'SITS-Incoming/"Submitter: %s"' % utils.mangle_email(self.g_email.get_text().decode("utf-8")()),
})
try:
webbrowser.open_new("http://www.solfege.org?%s" % params)
except Exception, e:
self.display_error_message2(_("Error opening web browser"), str(e))
def display_docfile(self, fn):
"""
Display the HTML file named by fn in the help browser window.
"""
for lang in solfege.app.m_userman_language, "C":
filename = os.path.join(os.getcwdu(), u"help", lang, fn)
if os.path.isfile(filename):
break
try:
webbrowser.open(filename)
except Exception, e:
self.display_error_message2(_("Error opening web browser"), str(e))
def test_dont_copy_file_onto_link_to_itself(self):
# bug 851123.
os.mkdir(TESTFN)
src = os.path.join(TESTFN, 'cheese')
dst = os.path.join(TESTFN, 'shop')
try:
f = open(src, 'w')
f.write('cheddar')
f.close()
os.link(src, dst)
self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
# Using `src` here would mean we end up with a symlink pointing
# to TESTFN/TESTFN/cheese, while it should point at
# TESTFN/cheese.
os.symlink('cheese', dst)
self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
with open(src, 'r') as f:
self.assertEqual(f.read(), 'cheddar')
os.remove(dst)
finally:
try:
shutil.rmtree(TESTFN)
except OSError:
pass
def test_existing_file_inside_dest_dir(self):
# A file with the same name inside the destination dir already exists.
with open(self.dst_file, "wb"):
pass
self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)