def test_forge_globus_download():
    """Exercise ``Forge.globus_download`` with and without a destination.

    Three scenarios are checked in turn: a plain download, a download with
    ``dest`` and ``preserve_dir=True``, and a multi-file result with ``dest``.
    Each scenario asserts that the expected file(s) exist and then deletes
    them again.
    """
    f = forge.Forge()

    # Simple case: the file is expected at ./test_fetch.txt
    f.globus_download(example_result1)
    assert os.path.exists("./test_fetch.txt")
    os.remove("./test_fetch.txt")

    # With dest and preserve_dir: the file is expected inside a "test"
    # sub-directory of the destination
    dest_path = os.path.expanduser("~/mdf")
    nested_dir = os.path.join(dest_path, "test")
    nested_file = os.path.join(nested_dir, "test_fetch.txt")
    f.globus_download(example_result1, dest=dest_path, preserve_dir=True)
    assert os.path.exists(nested_file)
    os.remove(nested_file)
    os.rmdir(nested_dir)

    # With multiple files: both are expected directly in the destination
    fetched = [
        os.path.join(dest_path, "test_fetch.txt"),
        os.path.join(dest_path, "test_multifetch.txt"),
    ]
    f.globus_download(example_result2, dest=dest_path)
    for path in fetched:
        assert os.path.exists(path)
    for path in fetched:
        os.remove(path)
# Example source code using Python's rmdir()
def remove(self, rec=1, ignore_errors=False):
    """Delete the file or directory this path refers to.

    A path for which ``check(dir=1, link=0)`` holds is removed as a
    directory: recursively via ``shutil.rmtree`` when ``rec`` is true,
    otherwise with ``os.rmdir``.  Anything else is removed with
    ``os.remove``.  ``ignore_errors`` is forwarded to ``shutil.rmtree``
    and therefore only matters for recursive directory removal.
    """
    if not self.check(dir=1, link=0):
        # not a plain directory: unlink it as a file
        if iswin32:
            self.chmod(448)  # 448 == octal 0700
        py.error.checked_call(os.remove, self.strpath)
        return

    if not rec:
        py.error.checked_call(os.rmdir, self.strpath)
        return

    # force remove of readonly files on windows
    if iswin32:
        self.chmod(448, rec=1)  # 448 == octal 0700
    py.error.checked_call(py.std.shutil.rmtree, self.strpath,
                          ignore_errors=ignore_errors)
def rollback(self):
    """Undo the recorded file and directory creations, then reset the record.

    When ``dry_run`` is set nothing is touched on disk; the record is
    reset either way.
    """
    if self.dry_run:
        self._init_record()
        return

    for written in list(self.files_written):
        if os.path.exists(written):
            os.remove(written)

    # Every created directory should be empty by now, except possibly for
    # a __pycache__ sub-directory.  Reverse-sorted order makes
    # sub-directories come before their parents.
    for created in sorted(self.dirs_created, reverse=True):
        leftovers = os.listdir(created)
        if leftovers:
            assert leftovers == ['__pycache__']
            os.rmdir(os.path.join(created, leftovers[0]))
        os.rmdir(created)  # should fail if non-empty

    self._init_record()
def testFindBrainClasses(self):
    """findBrainClasses reports ``module.Class`` names for a brain directory.

    Two files, ``barBrain.py`` and ``fooBrain.py`` (each containing only a
    newline), are created in a scratch directory; the call is expected to
    return ``barBrain.BarBrain`` and ``fooBrain.FooBrain`` in any order.
    The scratch files and directory are removed afterwards, including when
    creating the files or the assertion fails.
    """
    brainDir = "tmp-test-brains"
    brainModule1 = "barBrain"
    brainClass1 = "BarBrain"
    brainModule2 = "fooBrain"
    brainClass2 = "FooBrain"
    brainFile1 = os.path.join(brainDir, "%s.py" % brainModule1)
    brainFile2 = os.path.join(brainDir, "%s.py" % brainModule2)
    expected = [brainModule1 + "." + brainClass1, brainModule2 + "." + brainClass2]

    os.mkdir(brainDir)
    try:
        for brainFile in (brainFile1, brainFile2):
            # context manager so the handle is closed before the directory
            # is scanned and later deleted
            with open(brainFile, "w+") as handle:
                handle.write("\n")
        self.assertEqual(sorted(expected), sorted(simLauncher.SimulatorLauncher.findBrainClasses(brainDir)))
    finally:
        for brainFile in (brainFile1, brainFile2):
            # a file may be missing if its creation was what failed
            if os.path.exists(brainFile):
                os.unlink(brainFile)
        os.rmdir(brainDir)
def testWriteMapFile_writesStringIntoFile(self):
    """writeMapFile stores the map's ``toText()`` output at the given path.

    A map is loaded from ``maps/test-room2-l-shape.txt``, written to a file
    in a scratch directory, and the file content is compared with
    ``toText()``.  If the file was not created the comparison is made
    against ``None``.  The scratch file and directory are removed
    afterwards, including when loading, writing or the assertion fails.
    """
    dirname = "testWriteMapFile"
    filename = "test.txt"
    filepath = os.path.join(dirname, filename)
    os.mkdir(dirname)
    try:
        mObj = gameMap.GameMap()
        mObj.loadMapFile("maps/test-room2-l-shape.txt")
        expected = mObj.toText()
        mObj.writeMapFile(filepath)
        actual = None
        if os.path.exists(filepath):
            # context manager so the handle is closed before cleanup
            with open(filepath, "r") as handle:
                actual = handle.read()
        self.assertEqual(expected, actual)
    finally:
        if os.path.exists(filepath):
            os.unlink(filepath)
        os.rmdir(dirname)
def cleanAll(path=None):
trash = getTrashFolder(path)
if not os.path.isdir(trash):
print "[Trashcan] No trash.", trash
return 0
for root, dirs, files in os.walk(trash, topdown=False):
for name in files:
fn = os.path.join(root, name)
try:
enigma.eBackgroundFileEraser.getInstance().erase(fn)
except Exception, e:
print "[Trashcan] Failed to erase %s:"% name, e
# Remove empty directories if possible
for name in dirs:
try:
os.rmdir(os.path.join(root, name))
except:
pass
def destroy(self):
    """Removes any files in this storage object and then removes the
    storage object's directory. What happens if any of the files or the
    directory are in use depends on the underlying platform.

    A directory that no longer exists is not an error; any other failure
    to remove it is re-raised.
    """
    # Remove all files
    self.clean()
    try:
        # Try to remove the directory
        os.rmdir(self.folder)
    except (IOError, OSError):
        # os.rmdir reports failures as OSError.  On Python 2 that is not an
        # IOError, so both are listed; on Python 3 they are the same class.
        e = sys.exc_info()[1]
        if e.errno != errno.ENOENT:
            # bare raise keeps the original traceback
            raise
def __del__(self):
    """Delete known leftover files from ``rootdir`` and then the directory.

    Each listed file is removed only if it exists as a regular file.  The
    directory itself is removed with ``os.rmdir`` if it is still present.
    """
    # (name, per_station): per-station names get StationCode appended
    leftovers = (
        ('GAMIT.status', False),
        ('GAMIT.fatal', False),
        ('grdtab.out', False),
        ('harpos.', True),
        ('otl.grid', False),
        ('ufile.', True),
    )
    for stem, per_station in leftovers:
        if per_station:
            filename = stem + self.StationCode
        else:
            filename = stem
        target = os.path.join(self.rootdir, filename)
        if os.path.isfile(target):
            os.remove(target)

    if os.path.isdir(self.rootdir):
        os.rmdir(self.rootdir)
def remove_empty_folders(folder):
    """Remove empty sub-directories of ``folder``, walking bottom-up.

    Files whose name ends in ``DS_Store`` are deleted first so that a
    directory holding nothing else counts as empty.  Directories that
    cannot be removed (typically because they are not empty) are left
    alone.  ``folder`` itself is never removed.
    """
    for dirpath, _, files in os.walk(folder, topdown=False):
        for name in files:
            if name.endswith('DS_Store'):
                # delete the stupid mac files
                try:
                    os.remove(os.path.join(dirpath, name))
                except OSError:
                    # best effort; sys.exc_clear() no longer exists on
                    # Python 3, so the failure is simply ignored
                    pass
        if dirpath == folder:
            # the top directory comes last in a bottom-up walk; keep it
            break
        try:
            os.rmdir(dirpath)
        except OSError:
            # not empty (or not removable): leave it in place
            pass
def contain(command, image_name, image_dir, container_id, container_dir):
    """Switch into a fresh container root filesystem and exec ``command``.

    The steps run strictly in order: unshare the mount namespace, remount
    ``/`` with ``MS_PRIVATE | MS_REC``, build the container root, create
    its mounts, pivot into it, detach and delete the old root, and finally
    replace the current process with ``command`` via ``os.execvp``.
    """
    # create a new mount namespace
    linux.unshare(linux.CLONE_NEWNS)
    mount_flags = linux.MS_PRIVATE | linux.MS_REC
    linux.mount(None, '/', None, mount_flags, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))
    _create_mounts(new_root)

    # pivot_root is given a directory inside the new root for the old one
    put_old = os.path.join(new_root, 'old_root')
    os.makedirs(put_old)
    linux.pivot_root(new_root, put_old)
    os.chdir('/')

    # umount old root, then rmdir the old_root dir
    linux.umount2('/old_root', linux.MNT_DETACH)
    os.rmdir('/old_root')

    program = command[0]
    os.execvp(program, command)
def contain(command, image_name, image_dir, container_id, container_dir):
    """Switch into a fresh container root filesystem and exec ``command``.

    Unshares the mount namespace, remounts ``/`` with
    ``MS_PRIVATE | MS_REC``, builds the container root and its mounts,
    pivots into it, detaches and deletes the old root, and then replaces
    the current process with ``command`` via ``os.execvp``.
    """
    # create a new mount namespace
    linux.unshare(linux.CLONE_NEWNS)
    # TODO: switch to a new UTS namespace, change hostname to container_id
    # HINT: use linux.sethostname()

    propagation = linux.MS_PRIVATE | linux.MS_REC
    linux.mount(None, '/', None, propagation, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))
    _create_mounts(new_root)

    # directory inside the new root that receives the old root on pivot
    old_root_dir = os.path.join(new_root, 'old_root')
    os.makedirs(old_root_dir)
    linux.pivot_root(new_root, old_root_dir)
    os.chdir('/')

    # umount old root
    linux.umount2('/old_root', linux.MNT_DETACH)
    # rmdir the old_root dir
    os.rmdir('/old_root')

    executable = command[0]
    os.execvp(executable, command)
def contain(command, image_name, image_dir, container_id, container_dir):
    """Set the hostname, switch into a container root and exec ``command``.

    Sets the hostname to ``container_id``, remounts ``/`` with
    ``MS_PRIVATE | MS_REC``, builds the container root and its mounts,
    pivots into it, detaches and deletes the old root, and then replaces
    the current process with ``command`` via ``os.execvp``.
    """
    # change hostname to container_id
    linux.sethostname(container_id)

    remount_flags = linux.MS_PRIVATE | linux.MS_REC
    linux.mount(None, '/', None, remount_flags, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))
    _create_mounts(new_root)

    # the old root is parked in this directory of the new root on pivot
    parking_dir = os.path.join(new_root, 'old_root')
    os.makedirs(parking_dir)
    linux.pivot_root(new_root, parking_dir)
    os.chdir('/')

    # umount old root, then rmdir the old_root dir
    linux.umount2('/old_root', linux.MNT_DETACH)
    os.rmdir('/old_root')

    argv0 = command[0]
    os.execvp(argv0, command)
def rollback(self):
    """Delete the recorded files and directories, then reset the record.

    Nothing is deleted when ``dry_run`` is set; ``_init_record`` is called
    in both cases.
    """
    def _drop_directory(path):
        # A directory may hold a single __pycache__ entry, nothing else.
        entries = os.listdir(path)
        if entries:
            assert entries == ['__pycache__']
            cache_dir = os.path.join(path, entries[0])
            os.rmdir(cache_dir)
        os.rmdir(path)  # should fail if non-empty

    if not self.dry_run:
        for path in list(self.files_written):
            if os.path.exists(path):
                os.remove(path)
        # reverse so that subdirs appear before their parents
        for path in sorted(self.dirs_created, reverse=True):
            _drop_directory(path)
    self._init_record()
def rollback(self):
    """Roll back recorded writes on disk and start a fresh record.

    With ``dry_run`` set the filesystem is left untouched and only the
    record is reset.
    """
    if not self.dry_run:
        # Phase 1: remove every recorded file that still exists.
        recorded_files = list(self.files_written)
        for file_path in recorded_files:
            if os.path.exists(file_path):
                os.remove(file_path)

        # Phase 2: remove the recorded directories, deepest first.  They
        # should be empty now, except perhaps for __pycache__ subdirs.
        ordered_dirs = list(self.dirs_created)
        ordered_dirs.sort(reverse=True)
        for dir_path in ordered_dirs:
            remaining = os.listdir(dir_path)
            if remaining:
                assert remaining == ['__pycache__']
                os.rmdir(os.path.join(dir_path, remaining[0]))
            os.rmdir(dir_path)  # should fail if non-empty

    self._init_record()
def clear_cache_directory():
    """Delete cached files under ``download_cache_directory``.

    Every file is removed except those ending in ``.h`` or ``.m``.
    Directories named ``.git`` are removed with ``os.rmdir`` when possible.
    Removal failures are ignored.
    """
    for root, dirs, files in os.walk(download_cache_directory):
        for d in dirs:
            if d != '.git':
                continue
            dirpath = os.path.join(root, d)
            # NOTE(review): os.rmdir only succeeds on an empty directory and
            # this walk is top-down, so a populated .git is presumably left
            # in place here -- confirm whether shutil.rmtree was intended.
            try:
                os.rmdir(dirpath)
            except OSError:
                pass
        for f in files:
            if f.endswith(('.h', '.m')):
                continue
            filepath = os.path.join(root, f)
            try:
                os.remove(filepath)
            except OSError:
                # best effort: skip files that cannot be deleted
                pass
def rollback(self):
    """Remove what was recorded as written/created and reset the record.

    The removal step is skipped entirely when ``dry_run`` is set.
    """
    def _undo():
        for item in list(self.files_written):
            if os.path.exists(item):
                os.remove(item)
        # dirs should all be empty now, except perhaps for __pycache__
        # subdirs; reverse order puts subdirs before their parents
        pending = sorted(self.dirs_created, reverse=True)
        for folder in pending:
            contents = os.listdir(folder)
            if contents:
                assert contents == ['__pycache__']
                nested = os.path.join(folder, contents[0])
                os.rmdir(nested)
            os.rmdir(folder)  # should fail if non-empty

    if not self.dry_run:
        _undo()
    self._init_record()
def tree_delete_selected(self):
    """Delete the path behind the focused tree item and refresh the tree.

    The first tag of the focused item is removed as a file.  If that raises
    ``FileNotFoundError``, the second tag is treated as a directory: its
    contents are deleted bottom-up and then the directory itself.  Every
    deletion is printed with a ``HH:MM:SS`` timestamp.
    """
    def focused_tags():
        # looked up on every call, exactly where the tags are needed
        tree = self.parent.widget_tree
        return tree.item(tree.focus())["tags"]

    def report(target):
        print("{} | Deleting: {}".format(datetime.now().strftime("%H:%M:%S"), target))

    try:
        item = focused_tags()[0]
        os.remove(item)
        report(item)
    except FileNotFoundError:
        for root, dirs, files in os.walk(focused_tags()[1], topdown=False):
            for entry in files:
                os.remove(os.path.join(root, entry))
                report(entry)
            for entry in dirs:
                os.rmdir(os.path.join(root, entry))
                report(entry)
        item = focused_tags()[1]
        os.rmdir(item)
        report(item)

    self.parent.cmd.tree_refresh()
def run(parser, args):
    """Demultiplex ``args.fasta`` with porechop and collect the outputs.

    porechop writes into a temporary directory created under the current
    directory, and its stdout goes to ``<fasta>.demultiplexreport.txt``.
    Each produced file is moved to the current directory as
    ``<fasta basename without extension>-<name>`` and passed to
    ``gunzip -f``.  The temporary directory is removed afterwards unless
    ``args.no_remove_directory`` is set.  ``parser`` is unused.
    """
    tmpdir = tempfile.mkdtemp(dir='.')
    # NOTE(review): the command lines below are built by string
    # interpolation and run through the shell; file names containing shell
    # metacharacters are not escaped.
    cmd = ("porechop --verbosity 2 --untrimmed -i \"%s\" -b %s --barcode_threshold 80 --threads %s --check_reads 10000 --barcode_diff 5 --require_two_barcodes > %s.demultiplexreport.txt" % (args.fasta, tmpdir, args.threads, args.fasta))
    # write() gives the same output on Python 2 and 3, whereas
    # ``print >>sys.stderr, cmd`` raises TypeError on Python 3
    sys.stderr.write("%s\n" % cmd)
    os.system(cmd)

    prefix = os.path.splitext(os.path.basename(args.fasta))[0]
    for fn in os.listdir(tmpdir):
        newfn = "%s-%s" % (prefix, os.path.basename(fn))
        shutil.move(os.path.join(tmpdir, fn), newfn)
        os.system("gunzip -f %s" % (newfn,))

    if not args.no_remove_directory:
        os.rmdir(tmpdir)
def get_key(self, name):
    """Fetch the key stored under ``name`` and return it as ``(n, e)``.

    The entry ``<root_hash>/names/<name>`` is fetched through ``self.api``,
    its ``n`` and ``e`` files are read from ``<cwd>/<name>`` with
    ``file_to_memory``, and the directory is then removed.

    Returns:
        The tuple ``(n, e)`` on success, or ``False`` if any step fails.
    """
    try:
        # pull it down from ipfs
        self.api.get('{}/names/{}'.format(self.root_hash, name))
        # stores in cwdir, so load the files into memory and delete them
        # NOTE(review): os.rmdir below needs an empty directory, so
        # file_to_memory presumably deletes each file after reading it.
        n = self.file_to_memory('{}/{}/n'.format(os.getcwd(), name))
        e = self.file_to_memory('{}/{}/e'.format(os.getcwd(), name))
        os.rmdir('{}/{}'.format(os.getcwd(), name))
        return (n, e)
    except Exception:
        # Any ordinary failure means "no key"; KeyboardInterrupt and
        # SystemExit are deliberately not swallowed.
        return False
def release(self):
    """Release the lock held by this object.

    Raises:
        NotLocked: if ``is_locked()`` reports that there is no lock.
        NotMyLock: if the lock exists but ``unique_name`` does not.
    """
    if not self.is_locked():
        raise NotLocked("%s is not locked" % self.path)

    owned_by_me = os.path.exists(self.unique_name)
    if not owned_by_me:
        raise NotMyLock("%s is locked, but not by me" % self.path)

    # remove our own marker first, then the lock directory itself
    os.unlink(self.unique_name)
    os.rmdir(self.lock_file)