def test_on_error(self):
    """Check that rmtree reports failures through its onerror callback.

    A directory holding one file is created and both are made read-only.
    rmtree is then run with self.check_args_to_onerror as the error
    handler, and self.errorState is expected to equal 2 afterwards
    (presumably advanced by that handler -- it is defined elsewhere).

    The original modes are restored and the tree is removed in a
    ``finally`` clause, so a failing assertion does not leave a read-only
    directory behind for later tests.
    """
    self.errorState = 0
    os.mkdir(TESTFN)
    self.childpath = os.path.join(TESTFN, 'a')
    # Create an empty child file; the context manager closes it at once.
    with open(self.childpath, 'w'):
        pass
    old_dir_mode = os.stat(TESTFN).st_mode
    old_child_mode = os.stat(self.childpath).st_mode
    try:
        # Make unwritable.
        os.chmod(self.childpath, stat.S_IREAD)
        os.chmod(TESTFN, stat.S_IREAD)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 2,
                         "Expected call to onerror function did not happen.")
    finally:
        # Make writable again.
        os.chmod(TESTFN, old_dir_mode)
        os.chmod(self.childpath, old_child_mode)
        # Clean up.
        shutil.rmtree(TESTFN)
# Example source code using Python's open()
def test_dont_copy_file_onto_link_to_itself(self):
    """Check that copyfile refuses to copy a file onto its own hard link.

    Regression test for bug 851123: with ``dst`` hard-linked to ``src``,
    shutil.copyfile must raise shutil.Error and the source must still
    contain its original text afterwards.
    """
    # Temporarily disabled on Windows.  Report a skip rather than
    # returning, so the run does not count this as a passing test.
    if os.name == 'nt':
        self.skipTest('temporarily disabled on Windows')
    if not hasattr(os, 'link'):
        self.skipTest('requires os.link')
    # bug 851123.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w') as f:
            f.write('cheddar')
        os.link(src, dst)
        self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
        # The refused copy must not have truncated the source.
        with open(src, 'r') as f:
            self.assertEqual(f.read(), 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_tarfile_root_owner(self):
    """Check that a tarball built with root owner/group stores uid/gid 0.

    The user and group names for id 0 are looked up, passed to
    _make_tarball as ``owner``/``group``, and every member of the
    resulting archive is checked for ``uid == 0`` and ``gid == 0``.
    """
    tmpdir, _, base_name = self._create_files()
    # Resolve the names before changing directory: a failed lookup must
    # not leave the process sitting in tmpdir.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    old_dir = os.getcwd()
    os.chdir(tmpdir)
    try:
        archive_name = _make_tarball(base_name, 'dist', compress=None,
                                     owner=owner, group=group)
    finally:
        os.chdir(old_dir)

    # check that the (uncompressed) tarball was created
    self.assertTrue(os.path.exists(archive_name))

    # now check the ownership recorded for each member
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
def test_w_dest_open_fails(self):
    """Check copyfile's behaviour when opening the destination fails.

    shutil's ``open`` is replaced (through self._set_shutil_open) by a
    stub that hands back a Faux object for 'srcfile' and raises IOError
    for 'destfile'.  The Faux object must have been entered, and exited
    with exactly that IOError.
    """
    source = self.Faux()

    def fake_open(filename, mode='r'):
        if filename == 'destfile':
            raise IOError('Cannot open "destfile"')
        if filename == 'srcfile':
            return source
        assert 0  # shouldn't reach here.

    self._set_shutil_open(fake_open)

    # NOTE(review): the call is expected not to propagate the IOError,
    # presumably because Faux suppresses it on exit -- confirm in Faux.
    shutil.copyfile('srcfile', 'destfile')

    self.assertTrue(source._entered)
    exit_info = source._exited_with
    self.assertTrue(exit_info[0] is IOError)
    self.assertEqual(exit_info[1].args, ('Cannot open "destfile"',))
def test_on_error(self):
    """Check that rmtree reports failures through its onerror callback.

    A directory holding one file is created and both are made read-only.
    rmtree is then run with self.check_args_to_onerror as the error
    handler, and self.errorState is expected to equal 2 afterwards
    (presumably advanced by that handler -- it is defined elsewhere).

    The original modes are restored and the tree is removed in a
    ``finally`` clause, so a failing assertion does not leave a read-only
    directory behind for later tests.
    """
    self.errorState = 0
    os.mkdir(TESTFN)
    self.childpath = os.path.join(TESTFN, 'a')
    # Create an empty child file; the context manager closes it at once.
    with open(self.childpath, 'w'):
        pass
    old_dir_mode = os.stat(TESTFN).st_mode
    old_child_mode = os.stat(self.childpath).st_mode
    try:
        # Make unwritable.
        os.chmod(self.childpath, stat.S_IREAD)
        os.chmod(TESTFN, stat.S_IREAD)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 2,
                         "Expected call to onerror function did not happen.")
    finally:
        # Make writable again.
        os.chmod(TESTFN, old_dir_mode)
        os.chmod(self.childpath, old_child_mode)
        # Clean up.
        shutil.rmtree(TESTFN)
def test_tarfile_root_owner(self):
    """Check that make_archive records uid/gid 0 when given root names.

    A 'gztar' archive of 'dist' is built from the files created by
    self._create_files, passing the user and group names that map to
    id 0; every archive member must then report ``uid == 0`` and
    ``gid == 0``.
    """
    root_dir, _base_dir = self._create_files()
    archive_base = os.path.join(self.mkdtemp(), 'archive')
    root_group = grp.getgrgid(0)[0]
    root_user = pwd.getpwuid(0)[0]

    with support.change_cwd(root_dir):
        tarball = make_archive(archive_base, 'gztar', root_dir, 'dist',
                               owner=root_user, group=root_group)

    # The archive must exist as a regular file.
    self.assertTrue(os.path.isfile(tarball))

    # Every member must be recorded as owned by uid 0 / gid 0.
    with tarfile.open(tarball) as archive:
        for entry in archive.getmembers():
            self.assertEqual(entry.uid, 0)
            self.assertEqual(entry.gid, 0)
def setUp(self):
    """Create the temp directories and the source file used by the tests.

    Sets self.src_dir, self.dst_dir, self.src_file and self.dst_file.
    It also tries to create a directory next to this module
    (self.dir_other_fs, with self.file_other_fs inside it), hoping that
    location is on a different filesystem than the system temp dir; if
    that fails with OSError, self.dir_other_fs is set to None.  Finally
    the source file is written with the bytes ``b"spam"``.
    """
    filename = "foo"
    self.src_dir = tempfile.mkdtemp()
    self.dst_dir = tempfile.mkdtemp()
    self.src_file = os.path.join(self.src_dir, filename)
    self.dst_file = os.path.join(self.dst_dir, filename)
    # Try to create a dir in this module's directory, hoping that it is
    # not located on the same filesystem as the system tmp dir.
    try:
        self.dir_other_fs = tempfile.mkdtemp(
            dir=os.path.dirname(__file__))
        self.file_other_fs = os.path.join(self.dir_other_fs,
                                          filename)
    except OSError:
        self.dir_other_fs = None
    with open(self.src_file, "wb") as f:
        # Binary mode needs bytes: a str literal raises TypeError on
        # Python 3, while b"spam" is the same object type on Python 2.
        f.write(b"spam")
def test_w_dest_open_fails(self):
    """Check copyfile's behaviour when opening the destination fails.

    shutil's ``open`` is replaced (through self._set_shutil_open) by a
    stub that hands back a Faux object for 'srcfile' and raises IOError
    for 'destfile'.  The Faux object must have been entered, and exited
    with exactly that IOError.
    """
    source = self.Faux()

    def fake_open(filename, mode='r'):
        if filename == 'destfile':
            raise IOError('Cannot open "destfile"')
        if filename == 'srcfile':
            return source
        assert 0  # shouldn't reach here.

    self._set_shutil_open(fake_open)

    # NOTE(review): the call is expected not to propagate the IOError,
    # presumably because Faux suppresses it on exit -- confirm in Faux.
    shutil.copyfile('srcfile', 'destfile')

    self.assertTrue(source._entered)
    exit_info = source._exited_with
    self.assertTrue(exit_info[0] is IOError)
    self.assertEqual(exit_info[1].args, ('Cannot open "destfile"',))
def test_on_error(self):
    """Check that rmtree reports failures through its onerror callback.

    A directory holding one file is created and both are made read-only.
    rmtree is then run with self.check_args_to_onerror as the error
    handler, and self.errorState is expected to equal 2 afterwards
    (presumably advanced by that handler -- it is defined elsewhere).

    The original modes are restored and the tree is removed in a
    ``finally`` clause, so a failing assertion does not leave a read-only
    directory behind for later tests.
    """
    self.errorState = 0
    os.mkdir(TESTFN)
    self.childpath = os.path.join(TESTFN, 'a')
    # Create an empty child file; the context manager closes it at once.
    with open(self.childpath, 'w'):
        pass
    old_dir_mode = os.stat(TESTFN).st_mode
    old_child_mode = os.stat(self.childpath).st_mode
    try:
        # Make unwritable.
        os.chmod(self.childpath, stat.S_IREAD)
        os.chmod(TESTFN, stat.S_IREAD)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 2,
                         "Expected call to onerror function did not happen.")
    finally:
        # Make writable again.
        os.chmod(TESTFN, old_dir_mode)
        os.chmod(self.childpath, old_child_mode)
        # Clean up.
        shutil.rmtree(TESTFN)
def test_tarfile_root_owner(self):
    """Check that make_archive records uid/gid 0 when given root names.

    A 'gztar' archive of 'dist' is built from the files created by
    self._create_files, passing the user and group names that map to
    id 0; every archive member must then report ``uid == 0`` and
    ``gid == 0``.
    """
    root_dir, _base_dir = self._create_files()
    archive_base = os.path.join(self.mkdtemp(), 'archive')
    root_group = grp.getgrgid(0)[0]
    root_user = pwd.getpwuid(0)[0]

    with support.change_cwd(root_dir):
        tarball = make_archive(archive_base, 'gztar', root_dir, 'dist',
                               owner=root_user, group=root_group)

    # The archive must exist as a regular file.
    self.assertTrue(os.path.isfile(tarball))

    # Every member must be recorded as owned by uid 0 / gid 0.
    with tarfile.open(tarball) as archive:
        for entry in archive.getmembers():
            self.assertEqual(entry.uid, 0)
            self.assertEqual(entry.gid, 0)
def setUp(self):
    """Create the temp directories and the source file used by the tests.

    Sets self.src_dir, self.dst_dir, self.src_file and self.dst_file.
    It also tries to create a directory next to this module
    (self.dir_other_fs, with self.file_other_fs inside it), hoping that
    location is on a different filesystem than the system temp dir; if
    that fails with OSError, self.dir_other_fs is set to None.  Finally
    the source file is written with the bytes ``b"spam"``.
    """
    filename = "foo"
    self.src_dir = tempfile.mkdtemp()
    self.dst_dir = tempfile.mkdtemp()
    self.src_file = os.path.join(self.src_dir, filename)
    self.dst_file = os.path.join(self.dst_dir, filename)
    # Try to create a dir in this module's directory, hoping that it is
    # not located on the same filesystem as the system tmp dir.
    try:
        self.dir_other_fs = tempfile.mkdtemp(
            dir=os.path.dirname(__file__))
        self.file_other_fs = os.path.join(self.dir_other_fs,
                                          filename)
    except OSError:
        self.dir_other_fs = None
    with open(self.src_file, "wb") as f:
        # Binary mode needs bytes: a str literal raises TypeError on
        # Python 3, while b"spam" is the same object type on Python 2.
        f.write(b"spam")
def test_w_dest_open_fails(self):
    """Check copyfile's behaviour when opening the destination fails.

    shutil's ``open`` is replaced (through self._set_shutil_open) by a
    stub that hands back a Faux object for 'srcfile' and raises IOError
    for 'destfile'.  The Faux object must have been entered, and exited
    with exactly that IOError.
    """
    source = self.Faux()

    def fake_open(filename, mode='r'):
        if filename == 'destfile':
            raise IOError('Cannot open "destfile"')
        if filename == 'srcfile':
            return source
        assert 0  # shouldn't reach here.

    self._set_shutil_open(fake_open)

    # NOTE(review): the call is expected not to propagate the IOError,
    # presumably because Faux suppresses it on exit -- confirm in Faux.
    shutil.copyfile('srcfile', 'destfile')

    self.assertTrue(source._entered)
    exit_info = source._exited_with
    self.assertTrue(exit_info[0] is IOError)
    self.assertEqual(exit_info[1].args, ('Cannot open "destfile"',))
def test_rmtree_uses_safe_fd_version_if_available(self):
    """Check that rmtree dispatches to _rmtree_safe_fd when the OS allows.

    The platform capability is recomputed here from os.supports_dir_fd,
    os.supports_fd and os.supports_follow_symlinks.  When it is absent,
    shutil must report that it does not use the fd-based functions.
    When it is present, shutil._rmtree_safe_fd is temporarily replaced
    by a function that raises a marker exception, and shutil.rmtree must
    raise that marker.
    """
    dir_fd_ok = ({os.open, os.stat, os.unlink, os.rmdir} <=
                 os.supports_dir_fd)
    fd_capable = (dir_fd_ok and
                  os.listdir in os.supports_fd and
                  os.stat in os.supports_follow_symlinks)

    if not fd_capable:
        self.assertFalse(shutil._use_fd_functions)
        self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
        return

    self.assertTrue(shutil._use_fd_functions)
    self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
    target = os.path.join(self.mkdtemp(), 'a')
    os.mkdir(target)

    class Called(Exception):
        pass

    def _raiser(*args, **kwargs):
        raise Called

    try:
        real_rmtree = shutil._rmtree_safe_fd
        shutil._rmtree_safe_fd = _raiser
        self.assertRaises(Called, shutil.rmtree, target)
    finally:
        # Always put the real implementation back.
        shutil._rmtree_safe_fd = real_rmtree
def test_dont_copy_file_onto_link_to_itself(self):
    """Check that copyfile refuses to copy a file onto its own hard link.

    Regression test for bug 851123: with ``dst`` hard-linked to ``src``,
    shutil.copyfile must raise shutil.Error and the source must still
    contain its original text afterwards.
    """
    # Temporarily disabled on Windows.  Report a skip rather than
    # returning, so the run does not count this as a passing test.
    if os.name == 'nt':
        self.skipTest('temporarily disabled on Windows')
    if not hasattr(os, 'link'):
        self.skipTest('requires os.link')
    # bug 851123.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w') as f:
            f.write('cheddar')
        os.link(src, dst)
        self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
        # The refused copy must not have truncated the source.
        with open(src, 'r') as f:
            self.assertEqual(f.read(), 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_dont_copy_file_onto_symlink_to_itself(self):
    """Check that copyfile refuses to copy a file onto a symlink to itself.

    Regression test for bug 851123: copyfile must raise shutil.Error and
    the source file must keep its original content.
    """
    os.mkdir(TESTFN)
    source = os.path.join(TESTFN, 'cheese')
    link = os.path.join(TESTFN, 'shop')
    try:
        with open(source, 'w') as out:
            out.write('cheddar')
        # The link target is the bare name 'cheese': using `source`
        # would produce a symlink pointing at TESTFN/TESTFN/cheese,
        # while it should point at TESTFN/cheese.
        os.symlink('cheese', link)
        with self.assertRaises(shutil.Error):
            shutil.copyfile(source, link)
        # The refused copy must not have clobbered the source.
        with open(source, 'r') as inp:
            content = inp.read()
        self.assertEqual(content, 'cheddar')
        os.remove(link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_tarfile_root_owner(self):
    """Check that a tarball built with root owner/group stores uid/gid 0.

    The user and group names for id 0 are looked up, passed to
    _make_tarball as ``owner``/``group``, and every member of the
    resulting archive is checked for ``uid == 0`` and ``gid == 0``.
    """
    tmpdir, _, base_name = self._create_files()
    # Resolve the names before changing directory: a failed lookup must
    # not leave the process sitting in tmpdir.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    old_dir = os.getcwd()
    os.chdir(tmpdir)
    try:
        archive_name = _make_tarball(base_name, 'dist', compress=None,
                                     owner=owner, group=group)
    finally:
        os.chdir(old_dir)

    # check that the (uncompressed) tarball was created
    self.assertTrue(os.path.exists(archive_name))

    # now check the ownership recorded for each member
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
def test_w_dest_open_fails(self):
    """Check copyfile's behaviour when opening the destination fails.

    shutil's ``open`` is replaced (through self._set_shutil_open) by a
    stub that hands back a Faux object for 'srcfile' and raises IOError
    for 'destfile'.  The Faux object must have been entered, and exited
    with exactly that IOError.
    """
    source = self.Faux()

    def fake_open(filename, mode='r'):
        if filename == 'destfile':
            raise IOError('Cannot open "destfile"')
        if filename == 'srcfile':
            return source
        assert 0  # shouldn't reach here.

    self._set_shutil_open(fake_open)

    # NOTE(review): the call is expected not to propagate the IOError,
    # presumably because Faux suppresses it on exit -- confirm in Faux.
    shutil.copyfile('srcfile', 'destfile')

    self.assertTrue(source._entered)
    exit_info = source._exited_with
    self.assertTrue(exit_info[0] is IOError)
    self.assertEqual(exit_info[1].args, ('Cannot open "destfile"',))
def test_on_error(self):
    """Check that rmtree reports failures through its onerror callback.

    A directory holding one file is created and both are made read-only.
    rmtree is then run with self.check_args_to_onerror as the error
    handler, and self.errorState is expected to equal 2 afterwards
    (presumably advanced by that handler -- it is defined elsewhere).

    The original modes are restored and the tree is removed in a
    ``finally`` clause, so a failing assertion does not leave a read-only
    directory behind for later tests.
    """
    self.errorState = 0
    os.mkdir(TESTFN)
    self.childpath = os.path.join(TESTFN, 'a')
    # Create an empty child file; the context manager closes it at once.
    with open(self.childpath, 'w'):
        pass
    old_dir_mode = os.stat(TESTFN).st_mode
    old_child_mode = os.stat(self.childpath).st_mode
    try:
        # Make unwritable.
        os.chmod(self.childpath, stat.S_IREAD)
        os.chmod(TESTFN, stat.S_IREAD)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 2,
                         "Expected call to onerror function did not happen.")
    finally:
        # Make writable again.
        os.chmod(TESTFN, old_dir_mode)
        os.chmod(self.childpath, old_child_mode)
        # Clean up.
        shutil.rmtree(TESTFN)
def setUp(self):
    """Create the temp directories and the source file used by the tests.

    Sets self.src_dir, self.dst_dir, self.src_file and self.dst_file.
    It also tries to create a directory next to this module
    (self.dir_other_fs, with self.file_other_fs inside it), hoping that
    location is on a different filesystem than the system temp dir; if
    that fails with OSError, self.dir_other_fs is set to None.  Finally
    the source file is written with the bytes ``b"spam"``.
    """
    filename = "foo"
    self.src_dir = tempfile.mkdtemp()
    self.dst_dir = tempfile.mkdtemp()
    self.src_file = os.path.join(self.src_dir, filename)
    self.dst_file = os.path.join(self.dst_dir, filename)
    # Try to create a dir in this module's directory, hoping that it is
    # not located on the same filesystem as the system tmp dir.
    try:
        self.dir_other_fs = tempfile.mkdtemp(
            dir=os.path.dirname(__file__))
        self.file_other_fs = os.path.join(self.dir_other_fs,
                                          filename)
    except OSError:
        self.dir_other_fs = None
    with open(self.src_file, "wb") as f:
        # Binary mode needs bytes: a str literal raises TypeError on
        # Python 3, while b"spam" is the same object type on Python 2.
        f.write(b"spam")
def test_rmtree_uses_safe_fd_version_if_available(self):
    """Check that rmtree dispatches to _rmtree_safe_fd when the OS allows.

    The platform capability is recomputed here from os.supports_dir_fd,
    os.supports_fd and os.supports_follow_symlinks.  When it is absent,
    shutil must report that it does not use the fd-based functions.
    When it is present, shutil._rmtree_safe_fd is temporarily replaced
    by a function that raises a marker exception, and shutil.rmtree must
    raise that marker.
    """
    dir_fd_ok = ({os.open, os.stat, os.unlink, os.rmdir} <=
                 os.supports_dir_fd)
    fd_capable = (dir_fd_ok and
                  os.listdir in os.supports_fd and
                  os.stat in os.supports_follow_symlinks)

    if not fd_capable:
        self.assertFalse(shutil._use_fd_functions)
        self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
        return

    self.assertTrue(shutil._use_fd_functions)
    self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
    target = os.path.join(self.mkdtemp(), 'a')
    os.mkdir(target)

    class Called(Exception):
        pass

    def _raiser(*args, **kwargs):
        raise Called

    try:
        real_rmtree = shutil._rmtree_safe_fd
        shutil._rmtree_safe_fd = _raiser
        self.assertRaises(Called, shutil.rmtree, target)
    finally:
        # Always put the real implementation back.
        shutil._rmtree_safe_fd = real_rmtree
def test_dont_copy_file_onto_symlink_to_itself(self):
    """Check that copyfile refuses to copy a file onto a symlink to itself.

    Regression test for bug 851123: copyfile must raise
    shutil.SameFileError and the source file must keep its original
    content.
    """
    os.mkdir(TESTFN)
    source = os.path.join(TESTFN, 'cheese')
    link = os.path.join(TESTFN, 'shop')
    try:
        with open(source, 'w') as out:
            out.write('cheddar')
        # The link target is the bare name 'cheese': using `source`
        # would produce a symlink pointing at TESTFN/TESTFN/cheese,
        # while it should point at TESTFN/cheese.
        os.symlink('cheese', link)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(source, link)
        # The refused copy must not have clobbered the source.
        with open(source, 'r') as inp:
            content = inp.read()
        self.assertEqual(content, 'cheddar')
        os.remove(link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_tarfile_root_owner(self):
    """Check that make_archive records uid/gid 0 when given root names.

    A 'gztar' archive of 'dist' is built from the files created by
    self._create_files, passing the user and group names that map to
    id 0; every archive member must then report ``uid == 0`` and
    ``gid == 0``.
    """
    root_dir, _base_dir = self._create_files()
    archive_base = os.path.join(self.mkdtemp(), 'archive')
    root_group = grp.getgrgid(0)[0]
    root_user = pwd.getpwuid(0)[0]

    with support.change_cwd(root_dir):
        tarball = make_archive(archive_base, 'gztar', root_dir, 'dist',
                               owner=root_user, group=root_group)

    # The archive must exist as a regular file.
    self.assertTrue(os.path.isfile(tarball))

    # Every member must be recorded as owned by uid 0 / gid 0.
    with tarfile.open(tarball) as archive:
        for entry in archive.getmembers():
            self.assertEqual(entry.uid, 0)
            self.assertEqual(entry.gid, 0)
def test_w_dest_open_fails(self):
    """Check copyfile's behaviour when opening the destination fails.

    shutil's ``open`` is replaced (through self._set_shutil_open) by a
    stub that hands back a Faux object for 'srcfile' and raises OSError
    for 'destfile'.  The Faux object must have been entered, and exited
    with exactly that OSError.
    """
    source = self.Faux()

    def fake_open(filename, mode='r'):
        if filename == 'destfile':
            raise OSError('Cannot open "destfile"')
        if filename == 'srcfile':
            return source
        assert 0  # shouldn't reach here.

    self._set_shutil_open(fake_open)

    # NOTE(review): the call is expected not to propagate the OSError,
    # presumably because Faux suppresses it on exit -- confirm in Faux.
    shutil.copyfile('srcfile', 'destfile')

    self.assertTrue(source._entered)
    exit_info = source._exited_with
    self.assertTrue(exit_info[0] is OSError)
    self.assertEqual(exit_info[1].args, ('Cannot open "destfile"',))
def test_on_error(self):
    """Check that rmtree reports failures through its onerror callback.

    A directory holding one file is created and both are made read-only.
    rmtree is then run with self.check_args_to_onerror as the error
    handler, and self.errorState is expected to equal 2 afterwards
    (presumably advanced by that handler -- it is defined elsewhere).

    The original modes are restored and the tree is removed in a
    ``finally`` clause, so a failing assertion does not leave a read-only
    directory behind for later tests.
    """
    self.errorState = 0
    os.mkdir(TESTFN)
    self.childpath = os.path.join(TESTFN, 'a')
    # Create an empty child file; the context manager closes it at once.
    with open(self.childpath, 'w'):
        pass
    old_dir_mode = os.stat(TESTFN).st_mode
    old_child_mode = os.stat(self.childpath).st_mode
    try:
        # Make unwritable.
        os.chmod(self.childpath, stat.S_IREAD)
        os.chmod(TESTFN, stat.S_IREAD)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 2,
                         "Expected call to onerror function did not happen.")
    finally:
        # Make writable again.
        os.chmod(TESTFN, old_dir_mode)
        os.chmod(self.childpath, old_child_mode)
        # Clean up.
        shutil.rmtree(TESTFN)
def setUp(self):
    """Create the temp directories and the source file used by the tests.

    Sets self.src_dir, self.dst_dir, self.src_file and self.dst_file.
    It also tries to create a directory next to this module
    (self.dir_other_fs, with self.file_other_fs inside it), hoping that
    location is on a different filesystem than the system temp dir; if
    that fails with OSError, self.dir_other_fs is set to None.  Finally
    the source file is written with the bytes ``b"spam"``.
    """
    filename = "foo"
    self.src_dir = tempfile.mkdtemp()
    self.dst_dir = tempfile.mkdtemp()
    self.src_file = os.path.join(self.src_dir, filename)
    self.dst_file = os.path.join(self.dst_dir, filename)
    # Try to create a dir in this module's directory, hoping that it is
    # not located on the same filesystem as the system tmp dir.
    try:
        self.dir_other_fs = tempfile.mkdtemp(
            dir=os.path.dirname(__file__))
        self.file_other_fs = os.path.join(self.dir_other_fs,
                                          filename)
    except OSError:
        self.dir_other_fs = None
    with open(self.src_file, "wb") as f:
        # Binary mode needs bytes: a str literal raises TypeError on
        # Python 3, while b"spam" is the same object type on Python 2.
        f.write(b"spam")
def test_rmtree_uses_safe_fd_version_if_available(self):
    """Check that rmtree dispatches to _rmtree_safe_fd when the OS allows.

    The platform capability is recomputed here from os.supports_dir_fd,
    os.supports_fd and os.supports_follow_symlinks.  When it is absent,
    shutil must report that it does not use the fd-based functions.
    When it is present, shutil._rmtree_safe_fd is temporarily replaced
    by a function that raises a marker exception, and shutil.rmtree must
    raise that marker.
    """
    dir_fd_ok = ({os.open, os.stat, os.unlink, os.rmdir} <=
                 os.supports_dir_fd)
    fd_capable = (dir_fd_ok and
                  os.listdir in os.supports_fd and
                  os.stat in os.supports_follow_symlinks)

    if not fd_capable:
        self.assertFalse(shutil._use_fd_functions)
        self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
        return

    self.assertTrue(shutil._use_fd_functions)
    self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
    target = os.path.join(self.mkdtemp(), 'a')
    os.mkdir(target)

    class Called(Exception):
        pass

    def _raiser(*args, **kwargs):
        raise Called

    try:
        real_rmtree = shutil._rmtree_safe_fd
        shutil._rmtree_safe_fd = _raiser
        self.assertRaises(Called, shutil.rmtree, target)
    finally:
        # Always put the real implementation back.
        shutil._rmtree_safe_fd = real_rmtree
def test_dont_copy_file_onto_symlink_to_itself(self):
    """Check that copyfile refuses to copy a file onto a symlink to itself.

    Regression test for bug 851123: copyfile must raise
    shutil.SameFileError and the source file must keep its original
    content.
    """
    os.mkdir(TESTFN)
    source = os.path.join(TESTFN, 'cheese')
    link = os.path.join(TESTFN, 'shop')
    try:
        with open(source, 'w') as out:
            out.write('cheddar')
        # The link target is the bare name 'cheese': using `source`
        # would produce a symlink pointing at TESTFN/TESTFN/cheese,
        # while it should point at TESTFN/cheese.
        os.symlink('cheese', link)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(source, link)
        # The refused copy must not have clobbered the source.
        with open(source, 'r') as inp:
            content = inp.read()
        self.assertEqual(content, 'cheddar')
        os.remove(link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_tarfile_root_owner(self):
    """Check that a tarball built with root owner/group stores uid/gid 0.

    The user and group names for id 0 are looked up, passed to
    _make_tarball as ``owner``/``group``, and every member of the
    resulting archive is checked for ``uid == 0`` and ``gid == 0``.
    """
    tmpdir, _, base_name = self._create_files()
    # Resolve the names before changing directory: a failed lookup must
    # not leave the process sitting in tmpdir.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    old_dir = os.getcwd()
    os.chdir(tmpdir)
    try:
        archive_name = _make_tarball(base_name, 'dist', compress=None,
                                     owner=owner, group=group)
    finally:
        os.chdir(old_dir)

    # check that the (uncompressed) tarball was created
    self.assertTrue(os.path.exists(archive_name))

    # now check the ownership recorded for each member
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
def test_w_dest_open_fails(self):
    """Check copyfile's behaviour when opening the destination fails.

    shutil's ``open`` is replaced (through self._set_shutil_open) by a
    stub that hands back a Faux object for 'srcfile' and raises OSError
    for 'destfile'.  The Faux object must have been entered, and exited
    with exactly that OSError.
    """
    source = self.Faux()

    def fake_open(filename, mode='r'):
        if filename == 'destfile':
            raise OSError('Cannot open "destfile"')
        if filename == 'srcfile':
            return source
        assert 0  # shouldn't reach here.

    self._set_shutil_open(fake_open)

    # NOTE(review): the call is expected not to propagate the OSError,
    # presumably because Faux suppresses it on exit -- confirm in Faux.
    shutil.copyfile('srcfile', 'destfile')

    self.assertTrue(source._entered)
    exit_info = source._exited_with
    self.assertTrue(exit_info[0] is OSError)
    self.assertEqual(exit_info[1].args, ('Cannot open "destfile"',))
def write_file(self, path, content='xxx'):
    """Write *content* to the file at *path*, replacing any existing file.

    *path* may be a string, or a list/tuple of path components that are
    joined with os.path.join before the file is opened in text mode.
    """
    target = os.path.join(*path) if isinstance(path, (list, tuple)) else path
    with open(target, 'w') as out:
        out.write(content)