def __close_computation(self, client=None, await_async=False, task=None):
self.__server_locations.clear()
if self._cur_computation:
close_tasks = [SysTask(self.__close_node, node, self._cur_computation,
await_async=await_async) for node in self._nodes.itervalues()]
close_tasks.extend([SysTask(self.__close_node, node, self._cur_computation)
for node in self._disabled_nodes.itervalues()
if node.status == Scheduler.NodeDiscovered])
for close_task in close_tasks:
yield close_task.finish()
if self.__cur_client_auth:
computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
if os.path.isdir(computation_path):
shutil.rmtree(computation_path, ignore_errors=True)
if self._cur_computation and self._cur_computation.status_task:
self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
id(self._cur_computation)))
self.__cur_client_auth = self._cur_computation = None
self.__computation_sched_event.set()
if client:
client.send('closed')
raise StopIteration(0)
python类rmtree()的实例源码
def __close_computation(self, client=None, await_async=False, task=None):
self.__server_locations.clear()
if self._cur_computation:
close_tasks = [SysTask(self.__close_node, node, self._cur_computation,
await_async=await_async) for node in self._nodes.values()]
close_tasks.extend([SysTask(self.__close_node, node, self._cur_computation)
for node in self._disabled_nodes.values()
if node.status == Scheduler.NodeDiscovered])
for close_task in close_tasks:
yield close_task.finish()
if self.__cur_client_auth:
computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
if os.path.isdir(computation_path):
shutil.rmtree(computation_path, ignore_errors=True)
if self._cur_computation and self._cur_computation.status_task:
self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
id(self._cur_computation)))
self.__cur_client_auth = self._cur_computation = None
self.__computation_sched_event.set()
if client:
client.send('closed')
raise StopIteration(0)
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with get_zip_class()(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def _delete_directory_contents(self, dirpath, filter_func):
"""Delete all files in a directory.
:param dirpath: path to directory to clear
:type dirpath: ``unicode`` or ``str``
:param filter_func function to determine whether a file shall be
deleted or not.
:type filter_func ``callable``
"""
if os.path.exists(dirpath):
for filename in os.listdir(dirpath):
if not filter_func(filename):
continue
path = os.path.join(dirpath, filename)
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.unlink(path)
self.logger.debug('Deleted : %r', path)
def generate2():
"""
Call an external Python 2 program to retrieve the AST symbols of that
language version
:return:
"""
import subprocess as sp
import tempfile, shutil, sys, traceback
tempdir = tempfile.mkdtemp()
tempfile = os.path.join(tempdir, "py2_ast_code.py")
py2_proc_out = ""
try:
with open(tempfile, 'w') as py2code:
py2code.write(generate_str + WRITESYMS_CODE)
py2_proc_out = sp.check_output(["python2", tempfile]).decode()
finally:
try:
shutil.rmtree(tempdir)
except:
print("Warning: error trying to delete the temporal directory:", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
return set(py2_proc_out.splitlines())
def ensure_removed(self, path):
if os.path.exists(path):
if os.path.isdir(path) and not os.path.islink(path):
logger.debug('Removing directory tree at %s', path)
if not self.dry_run:
shutil.rmtree(path)
if self.record:
if path in self.dirs_created:
self.dirs_created.remove(path)
else:
if os.path.islink(path):
s = 'link'
else:
s = 'file'
logger.debug('Removing %s %s', s, path)
if not self.dry_run:
os.remove(path)
if self.record:
if path in self.files_written:
self.files_written.remove(path)
def test_install():
tempdir = mkdtemp()
def get_supported():
return list(wheel.pep425tags.get_supported()) + [('py3', 'none', 'win32')]
whl = WheelFile(TESTWHEEL, context=get_supported)
assert whl.supports_current_python(get_supported)
try:
locs = {}
for key in ('purelib', 'platlib', 'scripts', 'headers', 'data'):
locs[key] = os.path.join(tempdir, key)
os.mkdir(locs[key])
whl.install(overrides=locs)
assert len(os.listdir(locs['purelib'])) == 0
assert check(locs['platlib'], 'hello.pyd')
assert check(locs['platlib'], 'hello', 'hello.py')
assert check(locs['platlib'], 'hello', '__init__.py')
assert check(locs['data'], 'hello.dat')
assert check(locs['headers'], 'hello.dat')
assert check(locs['scripts'], 'hello.sh')
assert check(locs['platlib'], 'test-1.0.dist-info', 'RECORD')
finally:
shutil.rmtree(tempdir)
def run(self):
self.run_command("egg_info")
from glob import glob
for pattern in self.match:
pattern = self.distribution.get_name() + '*' + pattern
files = glob(os.path.join(self.dist_dir, pattern))
files = [(os.path.getmtime(f), f) for f in files]
files.sort()
files.reverse()
log.info("%d file(s) matching %s", len(files), pattern)
files = files[self.keep:]
for (t, f) in files:
log.info("Deleting %s", f)
if not self.dry_run:
if os.path.isdir(f):
shutil.rmtree(f)
else:
os.unlink(f)
def build_and_install(self, setup_script, setup_base):
args = ['bdist_egg', '--dist-dir']
dist_dir = tempfile.mkdtemp(
prefix='egg-dist-tmp-', dir=os.path.dirname(setup_script)
)
try:
self._set_fetcher_options(os.path.dirname(setup_script))
args.append(dist_dir)
self.run_setup(setup_script, setup_base, args)
all_eggs = Environment([dist_dir])
eggs = []
for key in all_eggs:
for dist in all_eggs[key]:
eggs.append(self.install_egg(dist.location, setup_base))
if not eggs and not self.dry_run:
log.warn("No eggs found in %s (setup script problem?)",
dist_dir)
return eggs
finally:
rmtree(dist_dir)
log.set_verbosity(self.verbose) # restore our log verbosity
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with get_zip_class()(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with get_zip_class()(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
"""Upload copies files to non-native store correctly with no
progress"""
import shutil
import tempfile
temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
temp_src_dir = os.path.dirname(temp_file.name)
temp_dst_dir = tempfile.mkdtemp()
shutil_mock = MagicMock()
shutil_mock.copyfile.return_value = None
with patch('sfctl.custom_app.shutil', new=shutil_mock):
sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
shutil_mock.copyfile.assert_called_once()
temp_file.close()
shutil.rmtree(os.path.dirname(temp_file.name))
shutil.rmtree(temp_dst_dir)
def create(self, data):
if not os.path.exists(const.REPOS_DIR):
os.mkdir(const.REPOS_DIR)
repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
if os.path.exists(repo_path):
logger.debug('Repo directory exists. Removing...')
shutil.rmtree(repo_path)
user_key = data.get('user_key', '')
if user_key:
self._create_key_file(data['repo_name'], user_key)
os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
repo = Repo.clone_from(data['git_url'], repo_path)
instance = super(GitRepo, self).create(data)
instance.repo = repo
return instance
def tearDown(self):
"""clean up the test
"""
shutil.rmtree(defaults.server_side_storage_path)
# remove generic_temp_folder
shutil.rmtree(self.temp_test_data_folder, ignore_errors=True)
# remove repository
shutil.rmtree(self.test_repo_path, ignore_errors=True)
# clean up test database
# from stalker.db.declarative import Base
# Base.metadata.drop_all(db.DBSession.connection())
# db.DBSession.commit()
db.DBSession.remove()
testing.tearDown()
def close(self):
# TODO: should I do clean shutdown here? Do I have to?
if self._makefile_refs < 1:
self._closed = True
if self.context:
CoreFoundation.CFRelease(self.context)
self.context = None
if self._client_cert_chain:
CoreFoundation.CFRelease(self._client_cert_chain)
self._client_cert_chain = None
if self._keychain:
Security.SecKeychainDelete(self._keychain)
CoreFoundation.CFRelease(self._keychain)
shutil.rmtree(self._keychain_dir)
self._keychain = self._keychain_dir = None
return self.socket.close()
else:
self._makefile_refs -= 1
def ensure_removed(self, path):
if os.path.exists(path):
if os.path.isdir(path) and not os.path.islink(path):
logger.debug('Removing directory tree at %s', path)
if not self.dry_run:
shutil.rmtree(path)
if self.record:
if path in self.dirs_created:
self.dirs_created.remove(path)
else:
if os.path.islink(path):
s = 'link'
else:
s = 'file'
logger.debug('Removing %s %s', s, path)
if not self.dry_run:
os.remove(path)
if self.record:
if path in self.files_written:
self.files_written.remove(path)
def build_and_install(self, setup_script, setup_base):
args = ['bdist_egg', '--dist-dir']
dist_dir = tempfile.mkdtemp(
prefix='egg-dist-tmp-', dir=os.path.dirname(setup_script)
)
try:
self._set_fetcher_options(os.path.dirname(setup_script))
args.append(dist_dir)
self.run_setup(setup_script, setup_base, args)
all_eggs = Environment([dist_dir])
eggs = []
for key in all_eggs:
for dist in all_eggs[key]:
eggs.append(self.install_egg(dist.location, setup_base))
if not eggs and not self.dry_run:
log.warn("No eggs found in %s (setup script problem?)",
dist_dir)
return eggs
finally:
rmtree(dist_dir)
log.set_verbosity(self.verbose) # restore our log verbosity
def clean(args, config):
""" Main entrypoint for clean """
lock_file = join(HACKSPORTS_ROOT, "deploy.lock")
# remove staging directories
if os.path.isdir(STAGING_ROOT):
logger.info("Removing the staging directories")
shutil.rmtree(STAGING_ROOT)
# remove lock file
if os.path.isfile(lock_file):
logger.info("Removing the stale lock file")
os.remove(lock_file)
#TODO: potentially perform more cleaning
def _test_Valgrind(self, valgrind):
# Clear the device cache to prevent false positives
deviceCacheDir = os.path.join(scatest.getSdrCache(), ".ExecutableDevice_node", "ExecutableDevice1")
shutil.rmtree(deviceCacheDir, ignore_errors=True)
os.environ['VALGRIND'] = valgrind
try:
# Checking that the node and device launch as expected
nb, devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
finally:
del os.environ['VALGRIND']
self.assertFalse(devMgr is None)
self.assertEquals(len(devMgr._get_registeredDevices()), 1, msg='device failed to launch with valgrind')
children = getChildren(nb.pid)
self.assertEqual(len(children), 1)
devMgr.shutdown()
# Check that a valgrind logfile exists
logfile = os.path.join(deviceCacheDir, 'valgrind.%s.log' % children[0])
self.assertTrue(os.path.exists(logfile))
def createTestDomain():
domainName = getTestDomainName()
print domainName
domainPath = os.path.join(getSdrPath(), "dom", "domain")
templatePath = os.path.join(getSdrPath(), "templates", "domain")
# Create a test domain
if os.path.isdir(domainPath):
shutil.rmtree(domainPath)
# Copy the template over
shutil.copytree(templatePath, domainPath)
# Open the DMD file and replace the name, using a very naive method
dmd = open(os.path.join(domainPath, "DomainManager.dmd.xml"), "r")
lines = dmd.read()
dmd.close()
lines = lines.replace("${DOMAINNAME}", domainName)
dmd = open(os.path.join(domainPath, "DomainManager.dmd.xml"), "w+")
dmd.write(lines)
dmd.close()
setupDeviceAndDomainMgrPackage()
def _demo():
info("hi")
debug("shouldn't appear")
set_level(DEBUG)
debug("should appear")
dir = "/tmp/testlogging"
if os.path.exists(dir):
shutil.rmtree(dir)
with session(dir=dir):
record_tabular("a", 3)
record_tabular("b", 2.5)
dump_tabular()
record_tabular("b", -2.5)
record_tabular("a", 5.5)
dump_tabular()
info("^^^ should see a = 5.5")
record_tabular("b", -2.5)
dump_tabular()
record_tabular("a", "longasslongasslongasslongasslongasslongassvalue")
dump_tabular()
def start_ab3(tmp_dir_loc, repo_dir, pkg_info, rm_abdir=False):
start_time = int(time.time())
os.chdir(tmp_dir_loc)
if not copy_abd(tmp_dir_loc, repo_dir, pkg_info):
return False
# For logging support: ptyprocess.PtyProcessUnicode.spawn(['autobuild'])
shadow_defines_loc = os.path.abspath(os.path.curdir)
if not parser_pass_through(pkg_info, shadow_defines_loc):
return False
try:
subprocess.check_call(['autobuild'])
except:
return False
time_span = int(time.time()) - start_time
print('>>>>>>>>>>>>>>>>>> Time for building\033[36m {} \033[0m:\033[36m {} \033[0mseconds'.format(
pkg_info['NAME'], time_span))
if rm_abdir is True:
shutil.rmtree(os.path.abspath(os.path.curdir) + '/autobuild/')
# Will get better display later
return True
def build(**args):
freezer = Freezer(app)
if args.get('base_url'):
app.config['FREEZER_BASE_URL'] = args.get('base_url')
app.config['mathjax_node'] = args.get('node_mathjax', False)
app.config['MATHJAX_WHOLEBOOK'] = args.get('node_mathjax', False)
app.config['FREEZER_DESTINATION'] = os.path.join(curdir, "build")
app.config['freeze'] = True
freezer.freeze()
if args.get('copy_mathjax'):
mathjax_postfix = os.path.join('assets', "js", "mathjax")
mathjax_from = os.path.join(scriptdir, mathjax_postfix)
mathjax_to = os.path.join(curdir, "build", mathjax_postfix)
try:
shutil.rmtree(mathjax_to)
except FileNotFoundError:
pass
shutil.copytree(mathjax_from, mathjax_to)
def archive_context(filename):
"""
Unzip filename to a temporary directory, set to the cwd.
The unzipped target is cleaned up after.
"""
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with ContextualZipFile(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def createDir(name, force=False):
if os.path.exists(name):
if force:
shutil.rmtree(name)
else:
response = raw_input('%s already exists. Do you wish to overwrite it? (y/n) ' % name)
if response.lower() == 'y' or response.lower() == 'yes':
shutil.rmtree(name)
elif response.lower() == 'n' or response.lower() == 'no':
print 'Modeler aborted.'
exit(0)
else:
print 'Response not understood.'
print 'Modeler aborted.'
exit(1)
os.mkdir(name)
def createDir(name, force=False):
if os.path.exists(name):
if force:
shutil.rmtree(name)
else:
response = raw_input('%s already exists. Do you wish to overwrite it? (y/n) ' % name)
if response.lower() == 'y' or response.lower() == 'yes':
shutil.rmtree(name)
elif response.lower() == 'n' or response.lower() == 'no':
print 'Modeler aborted.'
exit(0)
else:
print 'Response not understood.'
print 'Modeler aborted.'
exit(1)
os.mkdir(name)
def delete_module(self,name):
try:
file = open("{directory}/route.py".format(directory=BASE),"r+")
readfile = file.readlines()
file.seek(0)
#delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
for line in readfile:
if str(name) not in line:
file.writelines(line)
file.truncate()
file.close()
shutil.rmtree('{dir_file}/modules/{name}'.format(dir_file=BASE_DIR,name=name))
file = open("{directory}/tables.py".format(directory=BASE),"r+")
readfile = file.readlines()
file.seek(0)
#delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
for line in readfile:
if str(name) not in line:
file.writelines(line)
file.truncate()
file.close()
except Exception as e:
print e
def delete_module(self,name):
try:
file = open("{directory}/route.py".format(directory=BASE),"r+")
readfile = file.readlines()
file.seek(0)
#delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
for line in readfile:
if str(name) not in line:
file.writelines(line)
file.truncate()
file.close()
shutil.rmtree('{dir_file}/modules/{name}'.format(dir_file=BASE_DIR,name=name))
file = open("{directory}/tables.py".format(directory=BASE),"r+")
readfile = file.readlines()
file.seek(0)
#delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
for line in readfile:
if str(name) not in line:
file.writelines(line)
file.truncate()
file.close()
except Exception as e:
print e
def delete_module(self,name):
try:
file = open("{directory}/route.py".format(directory=BASE),"r+")
readfile = file.readlines()
file.seek(0)
#delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
for line in readfile:
if str(name) not in line:
file.writelines(line)
file.truncate()
file.close()
shutil.rmtree('{dir_file}/modules/{name}'.format(dir_file=BASE_DIR,name=name))
file = open("{directory}/tables.py".format(directory=BASE),"r+")
readfile = file.readlines()
file.seek(0)
#delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
for line in readfile:
if str(name) not in line:
file.writelines(line)
file.truncate()
file.close()
except Exception as e:
print e
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with get_zip_class()(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)