def close(self):
    """Drop one makefile() reference, or — when this is the last one —
    release all native TLS resources and close the underlying socket.
    """
    # TODO: should I do clean shutdown here? Do I have to?
    if self._makefile_refs >= 1:
        # Socket is still shared via makefile(); defer the real close.
        self._makefile_refs -= 1
        return None
    self._closed = True
    if self.context:
        CoreFoundation.CFRelease(self.context)
        self.context = None
    if self._client_cert_chain:
        CoreFoundation.CFRelease(self._client_cert_chain)
        self._client_cert_chain = None
    if self._keychain:
        # Delete the temporary keychain and its on-disk directory.
        Security.SecKeychainDelete(self._keychain)
        CoreFoundation.CFRelease(self._keychain)
        shutil.rmtree(self._keychain_dir)
        self._keychain = self._keychain_dir = None
    return self.socket.close()
Python example sources for shutil.rmtree()
def archive_context(filename):
    """Generator body for a context manager: extract *filename* into a
    fresh temp directory, chdir into the archive's single top-level
    directory for the duration, then restore the cwd and delete it all.
    """
    staging = tempfile.mkdtemp()
    log.warn('Extracting in %s', staging)
    saved_cwd = os.getcwd()
    try:
        os.chdir(staging)
        with ContextualZipFile(filename) as zf:
            zf.extractall()
        # The archive is expected to contain exactly one top-level dir.
        extracted = os.path.join(staging, os.listdir(staging)[0])
        os.chdir(extracted)
        log.warn('Now working in %s', extracted)
        yield
    finally:
        os.chdir(saved_cwd)
        shutil.rmtree(staging)
def clean_project_files(path_or_glob, logger):
    """Expand *path_or_glob* (glob pattern or single path with `~`) and
    delete every matching file or directory, logging each removal.
    """
    if "*" in path_or_glob:
        targets = glob.glob(path_or_glob)
    else:
        targets = [os.path.expanduser(path_or_glob)]
    for target in targets:
        if not os.path.exists(target):
            # Already gone — nothing to do.
            continue
        if os.path.isdir(target):
            logger.info("Removing directory {}".format(target))
            shutil.rmtree(target)
        else:
            logger.info("Removing file {}".format(target))
            os.remove(target)
def cleanup_dir(tmpdir, keep_data_files=False, ignore_errors=False):
    """Best-effort removal of *tmpdir*.

    Does nothing when *keep_data_files* is true. Removal failures are
    logged, never raised, so a dirty temp dir cannot fail the caller.
    """
    if keep_data_files:
        return
    # Remove our tmpdir, but don't fail the test if it doesn't remove.
    try:
        shutil.rmtree(tmpdir, ignore_errors=ignore_errors)
    except OSError as oe:
        error = ""
        if oe.errno:
            error = "%s: " % oe.errno
        if oe.strerror:
            error += oe.strerror
        if oe.filename:
            error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary directory %s due to error reported by OS: %s" % (tmpdir, error))
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate instead of being silently swallowed.
        log.warning("Unable to remove powstream temporary directory %s: %s" % (tmpdir, sys.exc_info()[0]))
##
# Called by signal handlers to clean-up then exit
def test_get_picture(self):
    """GET /picture should stream back the raw bytes of the named file."""
    picture_root = os.path.join(
        MockRunMod.load_config(None)[helper.DATA_ROOT], '..', 'pictures')
    # Start from a clean picture directory. Narrowed from bare `except:`
    # clauses, which would also have hidden KeyboardInterrupt.
    try:
        shutil.rmtree(picture_root)
    except OSError:
        pass
    try:
        os.makedirs(picture_root)
    except OSError:
        pass
    with open(os.path.join(picture_root, 'test.jpg'), 'wb') as out:
        out.write(b'abcde')
    rv = self.app.get('/picture?name=test.jpg&mediatype=image/jpeg')
    expected = b'abcde'
    actual = rv.data
    self.assertEqual(expected, actual)
def test_simple(self):
    """copy_file should copy docs whose extension is whitelisted and
    silently skip the rest."""
    mock_pipeline = test_helper.get_mock_pipeline([])
    data_root = os.path.join('local_data', 'unittests')
    if os.path.exists(data_root):
        shutil.rmtree(data_root)
    subscriber = copy_file.Subscriber(mock_pipeline)
    subscriber.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    })
    # Only the .xyz document is whitelisted; the .doc one must be ignored.
    subscriber.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    subscriber.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))
    copied = os.listdir(os.path.join(data_root, 'files', 'xyz'))
    self.assertEqual(['39bbf948-mock.xyz'], copied)
def test_simple(self):
    """store_text should write the document text to a file and record
    its path in doc.meta['text_file']."""
    mock_pipeline = test_helper.get_mock_pipeline([])
    data_root = os.path.join('local_data', 'unittests')
    if os.path.exists(data_root):
        shutil.rmtree(data_root)
    _store_text = store_text.Subscriber(mock_pipeline)
    _store_text.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1
    })
    doc = document.get_document('mock')
    doc.text = 'mock-mock-mock'
    _store_text.consume(doc, None)
    expected = 'local_data/unittests/text/17404a59-mock'
    actual = doc.meta['text_file']
    # assertEquals is a deprecated alias, removed in Python 3.12.
    self.assertEqual(expected, actual)
def test_event_logging():
    """Each FileWriter session that logs one summary must add exactly
    one event file to the log directory; the directory is removed at
    the end.
    """
    logdir = './experiment/'
    # Two identical writer sessions; the file count grows by one each time.
    for expected_files in (1, 2):
        summary_writer = FileWriter(logdir)
        scalar_value = 1.0
        s = scalar('test_scalar', scalar_value)
        summary_writer.add_summary(s, global_step=1)
        summary_writer.close()
        assert os.path.isdir(logdir)
        assert len(os.listdir(logdir)) == expected_files
    # clean up.
    shutil.rmtree(logdir)
def clean(mcu_switch=None, supress_output=False):
cmd = TOOLCHAIN_BASIC_CONFIGURE + ' '
if mcu_switch is None:
sphinx_build_dir = os.path.join('build', 'sphinx')
if os.path.isdir(sphinx_build_dir):
shutil.rmtree(sphinx_build_dir)
print "Successfully removed sphinx documentation"
else:
print 'Nothing to clean...'
return
elif mcu_switch == '-p' or mcu_switch == '-s' or mcu_switch == '-b' :
cmd += ' ' + mcu_switch + ' ' + 'clean'
else:
print 'Invalid clean argument: \'{}\''.format(mcu_switch)
sys.exit(1)
start_process(cmd, supress_output)
def neo4j_test_ws_dir(datafiles):
    """Pass the pytest `datafiles` fixture through as the Neo4j test
    workspace directory."""
    return datafiles
# @pytest.fixture(scope="session")
# def workspace(request, data_directory):
# wsconf_file = data_directory.join("workspace.yaml")
# temp_root = tempfile.mkdtemp()
# ws = Workspace("saapy-test-ws",
# temp_root,
# "saapy-test-ws",
# configuration_text=wsconf_file.read_text("utf-8"))
#
# def fin():
# shutil.rmtree(temp_root)
#
# request.addfinalizer(fin)
# return ws # provide the fixture value
def archive_context(filename):
    """Generator body for a context manager: unpack *filename* into a
    temp dir, chdir into its single top-level directory, and on exit
    restore the cwd and delete the temp dir."""
    work_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', work_dir)
    previous_cwd = os.getcwd()
    try:
        os.chdir(work_dir)
        with get_zip_class()(filename) as archive:
            archive.extractall()
        # Descend into the archive's sole top-level directory.
        inner = os.path.join(work_dir, os.listdir(work_dir)[0])
        os.chdir(inner)
        log.warn('Now working in %s', inner)
        yield
    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(work_dir)
def stop(self):
    """Kill and remove this container, then best-effort delete its
    shared directory (kept when BP_LEAVE_FILES is set)."""
    name = self.getName()
    print("Cleaning up self %s." % name)
    # kill the running self, then remove it
    _uncheckedDockerCommand(["kill", name])
    _uncheckedDockerCommand(["rm", name])
    # remove image -- later may be a preference to keep it
    #_uncheckedDockerCommand(["rmi", self.getImageName()])
    # remove shared directory unless the user asked to keep files
    try:
        if "BP_LEAVE_FILES" not in os.environ:
            shutil.rmtree(self.getSharedDir())
    except OSError:
        print("Warning: failed to remove shared directory.", file=sys.stderr)
def compare_component_output(self, input_path, expected_output_path):
    """Render *input_path* twice (uncompressed and zipped) and assert
    each resulting tree matches *expected_output_path*.

    Temporary output dirs and the zip are removed before returning.
    """
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
    # Pass 1: render straight to a directory.
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    storage.clear()
    # Pass 2: render to a zip, then extract and compare.
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    # `with` closes the archive handle deterministically (it was leaked
    # before, keeping the file open on Windows until GC).
    with zipfile.ZipFile(output_zip) as z:
        z.extractall(output_dir)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    os.remove(output_zip)
def __init__(self, **kwargs):
    """Initialize the dispycos scheduler singleton.

    Popped kwargs: pulse_interval, ping_interval, zombie_period,
    dispycosnode_port, clean (wipe previous dest path), nodes (initial
    node allocations); the rest are passed to pycos.Pycos.instance().
    """
    self.__class__._instance = self
    self._nodes = {}
    self._disabled_nodes = {}
    self._avail_nodes = set()
    self._nodes_avail = pycos.Event()
    self._nodes_avail.clear()
    self._shared = False
    self._cur_computation = None
    self.__cur_client_auth = None
    self.__cur_node_allocations = []
    self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
    self.__ping_interval = kwargs.pop('ping_interval', 0)
    self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
    self._node_port = kwargs.pop('dispycosnode_port', 51351)
    self.__server_locations = set()
    self.__job_scheduler_task = None
    kwargs['name'] = 'dispycos_scheduler'
    clean = kwargs.pop('clean', False)
    nodes = kwargs.pop('nodes', [])
    self.pycos = pycos.Pycos.instance(**kwargs)
    self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
    # Guard the wipe: unconditional rmtree raised OSError when the dest
    # path did not exist yet (e.g. first run with clean=True).
    if clean and os.path.isdir(self.__dest_path):
        shutil.rmtree(self.__dest_path)
    self.pycos.dest_path = self.__dest_path
    self.__computation_sched_event = pycos.Event()
    self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
    self.__client_task = SysTask(self.__client_proc)
    self.__timer_task = SysTask(self.__timer_proc)
    Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
    self.__client_task.register('dispycos_scheduler')
    self.pycos.discover_peers(port=self._node_port)
def __init__(self, **kwargs):
    """Initialize the dispycos scheduler singleton.

    Popped kwargs: pulse_interval, ping_interval, zombie_period,
    dispycosnode_port, clean (wipe previous dest path), nodes (initial
    node allocations); the rest are passed to pycos.Pycos.instance().
    """
    self.__class__._instance = self
    self._nodes = {}
    self._disabled_nodes = {}
    self._avail_nodes = set()
    self._nodes_avail = pycos.Event()
    self._nodes_avail.clear()
    self._shared = False
    self._cur_computation = None
    self.__cur_client_auth = None
    self.__cur_node_allocations = []
    self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
    self.__ping_interval = kwargs.pop('ping_interval', 0)
    self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
    self._node_port = kwargs.pop('dispycosnode_port', 51351)
    self.__server_locations = set()
    self.__job_scheduler_task = None
    kwargs['name'] = 'dispycos_scheduler'
    clean = kwargs.pop('clean', False)
    nodes = kwargs.pop('nodes', [])
    self.pycos = pycos.Pycos.instance(**kwargs)
    self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
    # Guard the wipe: unconditional rmtree raised OSError when the dest
    # path did not exist yet (e.g. first run with clean=True).
    if clean and os.path.isdir(self.__dest_path):
        shutil.rmtree(self.__dest_path)
    self.pycos.dest_path = self.__dest_path
    self.__computation_sched_event = pycos.Event()
    self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
    self.__client_task = SysTask(self.__client_proc)
    self.__timer_task = SysTask(self.__timer_proc)
    Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
    self.__client_task.register('dispycos_scheduler')
    self.pycos.discover_peers(port=self._node_port)
def sync_directory(src, dest, opts=None):
    """Mirror *src* to *dest*: any existing *dest* tree is removed first,
    then the tree is copied with the ignore-filter built from *opts*,
    and package init files are ensured in the result.

    NOTE(review): get_filter/ensure_init are defined elsewhere in this
    project — presumably a copytree ignore callable and a package
    __init__ creator; confirm against their definitions.
    """
    if os.path.exists(dest):
        logging.debug('Removing existing directory: %s' % dest)
        shutil.rmtree(dest)
    logging.info('Syncing directory: %s -> %s.' % (src, dest))
    shutil.copytree(src, dest, ignore=get_filter(opts))
    ensure_init(dest)
def rmdir(d):
    """Remove directory *d* and all its contents if it exists; do
    nothing when it is absent."""
    if os.path.exists(d):
        shutil.rmtree(d)
def gen_makeself(conf_dir, alias):
    """Build a makeself self-extracting installer for *alias*.

    Files are staged in conf_dir/tmp and the installer is written to
    conf_dir/Installers. On macOS the staged payload is the .app bundle
    plus a launchd plist; on Linux it is the binary plus a daemon script.
    """
    mkself_tmp = os.path.join(conf_dir, 'tmp')
    conf_mkself = os.path.join(conf_dir, 'Installers')
    for required_dir in (conf_mkself, mkself_tmp):
        if not os.path.exists(required_dir):
            os.makedirs(required_dir)

    def _finish():
        # Shared tail: setup script, makeself invocation, staging cleanup.
        gen_st_setup(alias, mkself_tmp)
        mkself_installer = 'bash "{}" "{}" "{}/{}_Installer" "Stitch" bash st_setup.sh'.format(mkself_exe, mkself_tmp, conf_mkself, alias)
        st_log.info(mkself_installer)
        st_log.info(run_command(mkself_installer))
        shutil.rmtree(mkself_tmp)

    if sys.platform.startswith('darwin'):
        alias_app = os.path.join(conf_dir, '{}.app'.format(alias))
        if os.path.exists(alias_app):
            run_command('cp -R {} {}'.format(alias_app, mkself_tmp))
            gen_osx_plist(alias, mkself_tmp)
            _finish()
    else:
        binry_dir = os.path.join(conf_dir, 'Binaries')
        alias_dir = os.path.join(binry_dir, alias)
        if os.path.exists(alias_dir):
            run_command('cp -R {} {}'.format(alias_dir, mkself_tmp))
            gen_lnx_daemon(alias, mkself_tmp)
            _finish()
Source file: test_run_no_updates_available.py
Project: pyupdater-wx-demo
Author: wettenhj
Views: 29 | Stars: 0 | Likes: 0 | Comments: 0
def tearDown(self):
    """
    Destroy the app and clean up test environment state.

    Robust against a partially-failed setUp: missing environment keys
    and an already-removed file-server directory no longer raise.
    """
    if self.app:
        self.app.frame.Hide()
        self.app.OnCloseFrame(wx.PyEvent())
        self.app.frame.Destroy()
    # pop(..., None) instead of `del`: `del` raised KeyError when setUp
    # failed before exporting these variables.
    os.environ.pop('PYUPDATER_FILESERVER_DIR', None)
    os.environ.pop('WXUPDATEDEMO_TESTING', None)
    shutil.rmtree(self.fileServerDir, ignore_errors=True)
def tearDown(self):
    """
    Destroy the app and clean up test environment state.

    Robust against a partially-failed setUp: missing environment keys
    and an already-removed file-server directory no longer raise.
    """
    if self.app:
        self.app.frame.Hide()
        self.app.OnCloseFrame(wx.PyEvent())
        self.app.frame.Destroy()
    # pop(..., None) instead of `del`: `del` raised KeyError when setUp
    # failed before exporting these variables.
    os.environ.pop('PYUPDATER_FILESERVER_DIR', None)
    os.environ.pop('WXUPDATEDEMO_TESTING', None)
    shutil.rmtree(self.fileServerDir, ignore_errors=True)