def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
# Example source code for Python's rmdir()
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def _test_NoAccessDir(self, nodeName):
    """Check that loading from an inaccessible directory raises LoadFail.

    Launches the device manager described by the node's
    ``DeviceManager.dcd.xml``, takes its first registered device and asks it
    to load ``/noaccess`` through the domain's file manager after every
    permission bit on that directory has been cleared.  The directory is
    removed afterwards regardless of the outcome.
    """
    dcd_path = "/nodes/%s/DeviceManager.dcd.xml" % nodeName
    devBooter, devMgr = self.launchDeviceManager(dcd_path)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # Ensure the directory exists with mode 0 (no access for anyone).
    if os.path.exists(testdir):
        os.chmod(testdir, 0000)
    else:
        os.mkdir(testdir, 0000)

    try:
        can_access = os.access(testdir, os.R_OK | os.X_OK)
        self.assertFalse(can_access, 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load,
                          fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_comp_macro_directories_config_python(self):
    """Launch the Python implementation and expect ``foo/bar/test.log``.

    The component is started with ``LOGGING_CONFIG_URI`` pointing at
    ``logconfig.cfg`` in the current working directory.  The test passes when
    ``foo/bar/test.log`` can be opened for reading afterwards.  The log file
    and both directories are then removed on a best-effort basis.
    """
    config_uri = 'file://' + os.getcwd() + '/logconfig.cfg'
    self.comp = sb.launch(self.cname, impl="python",
                          execparams={'LOGGING_CONFIG_URI': config_uri})

    log_opened = False
    try:
        fp = open('foo/bar/test.log', 'r')
    except (IOError, OSError):
        pass
    else:
        # Close immediately: only the ability to open the file matters, and
        # an open handle would leak (and blocks deletion on some platforms).
        fp.close()
        log_opened = True

    # Best-effort cleanup, innermost path first; each step is attempted even
    # if an earlier one failed.
    for action, target in ((os.remove, 'foo/bar/test.log'),
                           (os.rmdir, 'foo/bar'),
                           (os.rmdir, 'foo')):
        try:
            action(target)
        except OSError:
            pass

    self.assertTrue(log_opened, 'foo/bar/test.log could not be opened')
def test_comp_macro_directories_config_cpp(self):
    """Launch the C++ implementation and expect ``foo/bar/test.log``.

    The component is started with ``LOGGING_CONFIG_URI`` pointing at
    ``logconfig.cfg`` in the current working directory.  The test passes when
    ``foo/bar/test.log`` can be opened for reading afterwards.  The log file
    and both directories are then removed on a best-effort basis.
    """
    config_uri = 'file://' + os.getcwd() + '/logconfig.cfg'
    self.comp = sb.launch(self.cname, impl="cpp",
                          execparams={'LOGGING_CONFIG_URI': config_uri})

    log_opened = False
    try:
        fp = open('foo/bar/test.log', 'r')
    except (IOError, OSError):
        pass
    else:
        # Close immediately: only the ability to open the file matters, and
        # an open handle would leak (and blocks deletion on some platforms).
        fp.close()
        log_opened = True

    # Best-effort cleanup, innermost path first; each step is attempted even
    # if an earlier one failed.
    for action, target in ((os.remove, 'foo/bar/test.log'),
                           (os.rmdir, 'foo/bar'),
                           (os.rmdir, 'foo')):
        try:
            action(target)
        except OSError:
            pass

    self.assertTrue(log_opened, 'foo/bar/test.log could not be opened')
def test_comp_macro_directories_config_java(self):
    """Launch the Java implementation and expect ``foo/bar/test.log``.

    The component is started with ``LOGGING_CONFIG_URI`` pointing at
    ``logconfig.cfg`` in the current working directory.  The test passes when
    ``foo/bar/test.log`` can be opened for reading afterwards.  The log file
    and both directories are then removed on a best-effort basis.
    """
    config_uri = 'file://' + os.getcwd() + '/logconfig.cfg'
    self.comp = sb.launch(self.cname, impl="java",
                          execparams={'LOGGING_CONFIG_URI': config_uri})

    log_opened = False
    try:
        fp = open('foo/bar/test.log', 'r')
    except (IOError, OSError):
        pass
    else:
        # Close immediately: only the ability to open the file matters, and
        # an open handle would leak (and blocks deletion on some platforms).
        fp.close()
        log_opened = True

    # Best-effort cleanup, innermost path first; each step is attempted even
    # if an earlier one failed.
    for action, target in ((os.remove, 'foo/bar/test.log'),
                           (os.rmdir, 'foo/bar'),
                           (os.rmdir, 'foo')):
        try:
            action(target)
        except OSError:
            pass

    self.assertTrue(log_opened, 'foo/bar/test.log could not be opened')
def test_ExistsException(self):
    """Check that ``exists()`` on an inaccessible directory raises cleanly.

    Makes sure that FileSystem::exists() throws the correct exception
    (``CF.InvalidFileName``) and doesn't kill the domain for files in
    directories it cannot access.  The ``/noaccess`` directory is created (or
    re-moded) with mode 0o644, i.e. without any execute/search bit, and is
    removed afterwards regardless of the outcome.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # ``0o644`` is the octal spelling accepted by both Python 2.6+ and
    # Python 3; the legacy ``0644`` form is a syntax error on Python 3.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK | os.X_OK),
                         'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists,
                          os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def remove_folder(self, folder):
    """Delete the named folder, which must be empty.

    The folder lives at ``<self._path>/.<folder>``.  NotEmptyError is raised
    if its ``new`` or ``cur`` directory holds an entry whose name does not
    start with a dot, or if the folder contains a subdirectory other than
    ``new``, ``cur`` and ``tmp``.  Otherwise everything below the folder is
    deleted, followed by the folder itself.
    """
    folder_path = os.path.join(self._path, '.' + folder)

    candidates = os.listdir(os.path.join(folder_path, 'new'))
    candidates += os.listdir(os.path.join(folder_path, 'cur'))
    for name in candidates:
        # Anything that is not a dot-file counts as a message.
        if not name.startswith('.'):
            raise NotEmptyError('Folder contains message(s): %s' % folder)

    for name in os.listdir(folder_path):
        if name in ('new', 'cur', 'tmp'):
            continue
        if os.path.isdir(os.path.join(folder_path, name)):
            raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                (folder, name))

    # Bottom-up walk so that every directory is empty when it is removed.
    for parent, subdirs, filenames in os.walk(folder_path, topdown=False):
        for name in filenames:
            os.remove(os.path.join(parent, name))
        for name in subdirs:
            os.rmdir(os.path.join(parent, name))
    os.rmdir(folder_path)
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Reformats sourcefile according to configfile and writes it to destfile.

    The config file and the source file are copied into a fresh temporary
    directory, ``self.exe`` is run on the copied source file, and its
    standard output is written to ``destfile``.  The temporary directory is
    removed even when copying or running the formatter fails.

    This method is only used for testing.
    """
    # Local import so the block does not depend on a module-level import.
    import shutil

    tmpdir = tempfile.mkdtemp(prefix='whatstyle_')
    try:
        cfg = os.path.join(tmpdir, self.configfilename)
        copyfile(configfile, cfg)
        tmpfilename = os.path.join(tmpdir, os.path.basename(sourcefile))
        copyfile(sourcefile, tmpfilename)
        exeresult = run_executable(self.exe, [tmpfilename])
        writebinary(destfile, exeresult.stdout)
    finally:
        # Best-effort cleanup; never mask an error raised above.
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_replace_loggroup_section(mocked_configparser_class):
    """Check that replace_loggroup_section() swaps the log-groups section.

    With ``api_utils.CONFIG.items`` patched to return one log group, the call
    must leave ``CONFIG`` without the log-groups section while the mocked
    parser instance reports the CLI favorites section.  The config file and
    its directory are removed afterwards, even when an assertion fails.
    """
    config_dir = api_utils.user_config_dir(cli.lecli.__name__)
    if not os.path.exists(api_utils.CONFIG_FILE_PATH):
        if not os.path.exists(config_dir):
            os.makedirs(config_dir)

    try:
        loggroups_section = api_utils.LOGGROUPS_SECTION
        config_parser_mock = mocked_configparser_class.return_value
        config_parser_mock.add_section(loggroups_section)
        with patch.object(api_utils.CONFIG, 'items',
                          return_value=[('test-log-group-favs', ["test-log-key1", "test-log-key2"])]):
            api_utils.replace_loggroup_section()
            assert not api_utils.CONFIG.has_section(loggroups_section)
            assert config_parser_mock.has_section(api_utils.CLI_FAVORITES_SECTION)
    finally:
        # Best-effort cleanup; a missing file or non-empty directory is fine.
        try:
            os.remove(api_utils.CONFIG_FILE_PATH)
            os.rmdir(config_dir)
        except OSError:
            pass
def cleanup(self):
    """Close and remove all created temporary files.

    For the purposes of debugging, files are not removed if BAP finished
    with a positive nonzero code, i.e. they are removed only if BAP
    terminated normally or was killed by a signal (terminated).  Nothing is
    removed either when ``self.DEBUG`` is set.

    All opened file descriptors are closed in any case.
    """
    for handle in self.fds:
        handle.close()

    if self.DEBUG:
        return
    if self.proc is not None and self.proc.returncode > 0:
        return

    workdir = self.tmpdir
    for name in os.listdir(workdir):
        os.remove(os.path.join(workdir, name))
    os.rmdir(workdir)
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def remove_directory_files(path, remove_directory=False, remove_path=False):
    """Delete the files found under ``path``, walking bottom-up.

    The top-level directory ``path`` itself is skipped unless ``remove_path``
    is true.  When ``remove_directory`` is true, each processed directory is
    removed after its files.  OS errors during deletion are ignored.
    """
    for current, _subdirs, filenames in os.walk(path, False):
        # Leave the top-level directory alone unless remove_path is set.
        if not remove_path and current == path:
            continue
        for name in filenames:
            try:
                os.remove(os.path.join(current, name))
            except OSError:
                pass
        # Directories themselves are only removed on request.
        if remove_directory:
            try:
                os.rmdir(current)
            except OSError:
                pass
def unlock(self):
    '''Release the lock if this object holds it.

    Does not raise an exception; safe to call as often as you want, it may
    just do nothing.

    The lock is considered ours when ``self.d2`` is recorded in
    ``self.locked`` and the recorded value equals element 8 of
    ``os.stat(self.d2)`` (the modification time).  In that case the record
    is dropped and both ``self.d2`` and ``self.d`` are removed.

    Returns 1 when the lock directories were removed, 0 otherwise.
    '''
    if self.d2 not in self.locked:
        return 0

    try:
        current_stamp = os.stat(self.d2)[8]
    except OSError:
        # The lock directory cannot be inspected (e.g. it is already gone).
        # Forget the stale record instead of letting the error escape, to
        # honour the "does not raise" contract.
        del self.locked[self.d2]
        return 0

    # We are the ones that locked it only if the time matches.
    if self.locked[self.d2] != current_stamp:
        del self.locked[self.d2]
        return 0

    try:
        del self.locked[self.d2]
        os.rmdir(self.d2)
        os.rmdir(self.d)
        return 1
    except Exception:
        # Deliberately broad: this method promises never to raise.
        return 0
def prepare_working_dir(directory_path, purge=False):
    """Create or validate the working-directory layout.

    With ``purge`` set, everything below ``directory_path`` is deleted first
    (if it is a directory), along with the ``/dev/shm/kafl_filter0`` and
    ``/dev/shm/kafl_tfilter`` files when they exist.

    If ``directory_path`` is empty afterwards, the expected sub-folders are
    created and True is returned.  Otherwise the result is True only if all
    expected sub-folders already exist.
    """
    # Entries start with "/" and are appended to directory_path verbatim.
    subfolders = ("/findings/", "/corpus", "/findings/panic",
                  "/findings/kasan", "/findings/timeout", "/rbuf",
                  "/evaluation")

    if purge:
        if os.path.isdir(directory_path):
            # Bottom-up so directories are empty by the time they are removed.
            for parent, subdirs, filenames in os.walk(directory_path, topdown=False):
                for entry in filenames:
                    os.remove(os.path.join(parent, entry))
                for entry in subdirs:
                    os.rmdir(os.path.join(parent, entry))
        for shm_file in ("/dev/shm/kafl_filter0", "/dev/shm/kafl_tfilter"):
            if os.path.exists(shm_file):
                os.remove(shm_file)

    if not os.listdir(directory_path):
        for subfolder in subfolders:
            os.makedirs(directory_path + subfolder)
        return True

    return all(os.path.isdir(directory_path + subfolder)
               for subfolder in subfolders)
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def test_log_file_with_timed_rotating(self):
    """Log one error with ``log_rotate_mode = 'time'`` and check the file.

    Exactly one file matching the ``test_log`` prefix must exist in a fresh
    temporary directory, and its content must match the expected
    ``[E ...] hello`` line.  Handlers are flushed and closed, and the
    temporary files and directory removed, regardless of the outcome.
    """
    tmpdir = tempfile.mkdtemp()
    log_prefix = tmpdir + '/test_log'
    log_glob = log_prefix + '*'
    try:
        self.options.log_file_prefix = log_prefix
        self.options.log_rotate_mode = 'time'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(log_glob)
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as log_file:
            contents = log_file.read()
            self.assertRegexpMatches(contents, r'^\[E [^]]*\] hello$')
    finally:
        # Close the handlers first so the log files can be deleted.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for leftover in glob.glob(log_glob):
            os.unlink(leftover)
        os.rmdir(tmpdir)
def test_log_file(self):
    """Log one error to a file and check the written line.

    After ``enable_pretty_logging`` the logger must have exactly one handler.
    Exactly one file matching the ``test_log`` prefix must exist in a fresh
    temporary directory, and its content must match the expected
    ``[E ...] hello`` line.  Handlers are flushed and closed, and the
    temporary files and directory removed, regardless of the outcome.
    """
    tmpdir = tempfile.mkdtemp()
    log_prefix = tmpdir + '/test_log'
    log_glob = log_prefix + '*'
    try:
        self.options.log_file_prefix = log_prefix
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.assertEqual(1, len(self.logger.handlers))
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(log_glob)
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as log_file:
            contents = log_file.read()
            self.assertRegexpMatches(contents, r'^\[E [^]]*\] hello$')
    finally:
        # Close the handlers first so the log files can be deleted.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for leftover in glob.glob(log_glob):
            os.unlink(leftover)
        os.rmdir(tmpdir)
def test_log_file_with_timed_rotating(self):
    """Log one error with ``log_rotate_mode = 'time'`` and check the file.

    Exactly one file matching the ``test_log`` prefix must exist in a fresh
    temporary directory, and its content must match the expected
    ``[E ...] hello`` line.  Handlers are flushed and closed, and the
    temporary files and directory removed, regardless of the outcome.
    """
    tmpdir = tempfile.mkdtemp()
    log_prefix = tmpdir + '/test_log'
    log_glob = log_prefix + '*'
    try:
        self.options.log_file_prefix = log_prefix
        self.options.log_rotate_mode = 'time'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(log_glob)
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as log_file:
            contents = log_file.read()
            self.assertRegexpMatches(contents, r'^\[E [^]]*\] hello$')
    finally:
        # Close the handlers first so the log files can be deleted.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for leftover in glob.glob(log_glob):
            os.unlink(leftover)
        os.rmdir(tmpdir)
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def test_log_file(self):
    """Log one error to a file and check the written line.

    After ``enable_pretty_logging`` the logger must have exactly one handler.
    Exactly one file matching the ``test_log`` prefix must exist in a fresh
    temporary directory, and its content must match the expected
    ``[E ...] hello`` line.  Handlers are flushed and closed, and the
    temporary files and directory removed, regardless of the outcome.
    """
    tmpdir = tempfile.mkdtemp()
    log_prefix = tmpdir + '/test_log'
    log_glob = log_prefix + '*'
    try:
        self.options.log_file_prefix = log_prefix
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.assertEqual(1, len(self.logger.handlers))
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(log_glob)
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as log_file:
            contents = log_file.read()
            self.assertRegexpMatches(contents, r'^\[E [^]]*\] hello$')
    finally:
        # Close the handlers first so the log files can be deleted.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for leftover in glob.glob(log_glob):
            os.unlink(leftover)
        os.rmdir(tmpdir)
def test_log_file_with_timed_rotating(self):
    """Log one error with ``log_rotate_mode = 'time'`` and check the file.

    Exactly one file matching the ``test_log`` prefix must exist in a fresh
    temporary directory, and its content must match the expected
    ``[E ...] hello`` line.  Handlers are flushed and closed, and the
    temporary files and directory removed, regardless of the outcome.
    """
    tmpdir = tempfile.mkdtemp()
    log_prefix = tmpdir + '/test_log'
    log_glob = log_prefix + '*'
    try:
        self.options.log_file_prefix = log_prefix
        self.options.log_rotate_mode = 'time'
        enable_pretty_logging(options=self.options, logger=self.logger)
        self.logger.error('hello')
        self.logger.handlers[0].flush()
        filenames = glob.glob(log_glob)
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as log_file:
            contents = log_file.read()
            self.assertRegexpMatches(contents, r'^\[E [^]]*\] hello$')
    finally:
        # Close the handlers first so the log files can be deleted.
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
        for leftover in glob.glob(log_glob):
            os.unlink(leftover)
        os.rmdir(tmpdir)
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()
def retrive_osd_details(device_name):
    """Mount ``device_name`` on a temporary directory and read OSD details.

    Returns None when ``device_name`` is None.  Otherwise returns the result
    of ``_retrive_osd_details_from_dir`` for the mount point when the mount
    command reports ``retcode == 0``, or an empty dict when it does not.

    An unmount is always attempted and the temporary directory is always
    removed, even when mounting or reading the details raises.
    """
    if device_name is None:
        return None

    osd_details = {}
    # Created outside the try block: if mkdtemp() itself fails there is
    # nothing to clean up, and the cleanup code must not hit an undefined
    # ``tmpd`` and hide the real error.
    tmpd = tempfile.mkdtemp()
    try:
        log.info("Create temp directory %s" % (tmpd))
        try:
            out_mnt = utils.execute_local_command(['mount', device_name, tmpd])
            if out_mnt['retcode'] == 0:
                osd_details = _retrive_osd_details_from_dir(tmpd)
        finally:
            # Attempted unconditionally, as a best effort.
            utils.execute_local_command(['umount', tmpd])
    finally:
        log.info("Destroy temp directory %s" % (tmpd))
        os.rmdir(tmpd)
    return osd_details
def paired_files():
    """Yield a temporary directory holding four audio/JAMS file pairs.

    Eight empty files (``file1.mp3``/``file1.jams`` ... ``file4.flac``/
    ``file4.jams``) are created in a fresh temporary directory.  The
    generator yields ``(root, files)`` where ``files`` is the list of full
    paths.  When resumed, it deletes the files that still exist and removes
    the directory.
    """
    root = tempfile.mkdtemp()
    names = ['file1.mp3', 'file1.jams',
             'file2.ogg', 'file2.jams',
             'file3.wav', 'file3.jams',
             'file4.flac', 'file4.jams']
    files = [os.sep.join([root, name]) for name in names]

    # Create each file empty.
    for path in files:
        with open(path, 'w'):
            pass

    yield root, files

    # Teardown: a file may already have been removed by the consumer.
    for path in files:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
    os.rmdir(root)
def symlinks_supported():
    """
    Report whether symlinks can be created on the host platform, i.e.
    whether they are supported and/or allowed to be created (e.g. on
    Windows it requires admin permissions).

    A symlink to a scratch directory is created inside a temporary directory;
    everything created is removed again before returning.
    """
    scratch = tempfile.mkdtemp()
    target = os.path.join(scratch, 'original')
    link = os.path.join(scratch, 'symlink')
    os.makedirs(target)
    try:
        try:
            os.symlink(target, link)
        except (OSError, NotImplementedError, AttributeError):
            supported = False
        else:
            supported = True
            os.remove(link)
    finally:
        os.rmdir(target)
        os.rmdir(scratch)
    return supported
def rollback(self):
    """Undo the recorded file-system changes, then reset the record.

    Unless ``self.dry_run`` is set, every path in ``self.files_written`` that
    still exists is deleted, and every directory in ``self.dirs_created`` is
    removed, children before parents.  A directory may contain at most a
    single ``__pycache__`` entry, which is removed first.  The record is
    re-initialised via ``self._init_record()`` in all cases.
    """
    if not self.dry_run:
        for written in filter(os.path.exists, list(self.files_written)):
            os.remove(written)
        # The directories should be empty by now, apart from a possible
        # __pycache__ subdirectory.  Reverse-sorted order guarantees that
        # subdirectories are visited before their parents.
        for created in sorted(self.dirs_created, reverse=True):
            leftovers = os.listdir(created)
            if leftovers:
                assert leftovers == ['__pycache__']
                os.rmdir(os.path.join(created, leftovers[0]))
            os.rmdir(created)  # raises if anything else was left behind
    self._init_record()