python类rmtree()的实例源码

securetransport.py 文件源码 项目:googletranslate.popclipext 作者: wizyoung 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def close(self):
        """
        Release one reference to the wrapped socket, closing it on the last.

        While ``_makefile_refs`` is at least 1 the call only decrements that
        counter and returns ``None``. Otherwise the object is flagged as
        closed, the CoreFoundation handles it still holds are released, the
        keychain (if any) is deleted together with its directory, and the
        result of ``self.socket.close()`` is returned.
        """
        # TODO: should I do clean shutdown here? Do I have to?
        if self._makefile_refs >= 1:
            # Other users still hold a reference; just drop ours.
            self._makefile_refs -= 1
            return None

        self._closed = True

        # Both handles are released the same way and then cleared.
        for attr_name in ("context", "_client_cert_chain"):
            handle = getattr(self, attr_name)
            if handle:
                CoreFoundation.CFRelease(handle)
                setattr(self, attr_name, None)

        if self._keychain:
            Security.SecKeychainDelete(self._keychain)
            CoreFoundation.CFRelease(self._keychain)
            shutil.rmtree(self._keychain_dir)
            self._keychain = None
            self._keychain_dir = None

        return self.socket.close()
ez_setup.py 文件源码 项目:ccu_and_eccu_publish 作者: gaofubin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Generator that runs its caller inside the unpacked archive *filename*.

    The archive is extracted into a fresh temporary directory, the working
    directory is switched to the first entry found there, and control is
    yielded once. Afterwards the previous working directory is restored and
    the temporary directory is deleted, whether or not an error occurred.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    not visible in this snippet -- confirm.
    """
    # extracting the archive
    extract_root = tempfile.mkdtemp()
    log.warn('Extracting in %s', extract_root)
    start_dir = os.getcwd()
    try:
        os.chdir(extract_root)
        with ContextualZipFile(filename) as zipped:
            zipped.extractall()

        # going in the directory
        first_entry = os.listdir(extract_root)[0]
        unpacked_dir = os.path.join(extract_root, first_entry)
        os.chdir(unpacked_dir)
        log.warn('Now working in %s', unpacked_dir)
        yield

    finally:
        # Leave the temp tree before deleting it.
        os.chdir(start_dir)
        shutil.rmtree(extract_root)
clean_project.py 文件源码 项目:shift-detect 作者: paolodedios 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def clean_project_files(path_or_glob, logger) :
    """
    Resolve file name references and ensure they are properly deleted.

    :param path_or_glob: a single path, or a pattern containing ``*`` that
        is expanded with ``glob.glob``. A leading ``~`` is expanded in both
        cases.
    :param logger: object with an ``info(message)`` method; one message is
        logged per removed entry.

    Real directories are removed recursively. Everything else, including
    symbolic links (even those pointing at a directory or at nothing), is
    removed with ``os.remove`` so that link targets are never touched.
    Paths that do not exist are skipped silently.
    """
    expanded = os.path.expanduser(path_or_glob)

    if "*" in path_or_glob :
        files_to_clean = glob.glob(expanded)
    else :
        files_to_clean = [expanded]

    for file_to_clean in files_to_clean :
        # lexists() instead of exists(): a dangling symlink is still an
        # entry on disk that has to be cleaned up.
        if not os.path.lexists(file_to_clean) :
            continue

        # shutil.rmtree() refuses to operate on a symlink, so only real
        # directories go down the recursive path.
        if os.path.isdir(file_to_clean) and not os.path.islink(file_to_clean) :
            logger.info("Removing directory {}".format(file_to_clean))
            shutil.rmtree(file_to_clean)
        else :
            logger.info("Removing file {}".format(file_to_clean))
            os.remove(file_to_clean)
powstream_utils.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cleanup_dir(tmpdir, keep_data_files=False, ignore_errors=False):
    """
    Best-effort removal of the powstream temporary directory.

    :param tmpdir: directory tree to delete.
    :param keep_data_files: when true, nothing is deleted.
    :param ignore_errors: forwarded to ``shutil.rmtree``.

    Failures are logged as warnings and never propagated. Only ordinary
    exceptions are handled this way; ``KeyboardInterrupt`` and
    ``SystemExit`` are allowed to pass through.
    """
    if keep_data_files:
        return
    #Remove our tmpdir, but don't fail the test if it doesn't remove
    try:
        shutil.rmtree(tmpdir, ignore_errors=ignore_errors)
    except OSError as oe:
        # Assemble "<errno>: <strerror> (filename: <name>)" from whichever
        # parts the OS actually supplied.
        error = ""
        if oe.errno:
            error = "%s: " % oe.errno
        if oe.strerror:
            error += oe.strerror
        if oe.filename:
            error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary directory %s due to error reported by OS: %s" % (tmpdir, error))
    except Exception:
        log.warning("Unable to remove powstream temporary directory %s: %s" % (tmpdir, sys.exc_info()[0]))

##
# Called by signal handlers to clean-up then exit
ui_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_get_picture(self):
    """
    Check that GET /picture returns the bytes of a stored picture.

    A ``pictures`` directory next to the configured data root is recreated
    from scratch, a small file is written into it, and the response body of
    the picture endpoint is compared with the written bytes.
    """
    picture_root = os.path.join(
        MockRunMod.load_config(None)[helper.DATA_ROOT], '..', 'pictures')

    # Start from a clean slate; a directory that is missing (or cannot be
    # fully removed) is tolerated here without swallowing unrelated
    # exceptions such as KeyboardInterrupt.
    shutil.rmtree(picture_root, ignore_errors=True)

    if not os.path.isdir(picture_root):
      os.makedirs(picture_root)

    with open(os.path.join(picture_root, 'test.jpg'), 'wb') as out:
      out.write(b'abcde')

    rv = self.app.get('/picture?name=test.jpg&mediatype=image/jpeg')

    expected = b'abcde'
    actual = rv.data

    self.assertEqual(expected, actual)
copy_file_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_simple(self):
    """
    Check that only documents with a configured extension are copied.

    Two documents are consumed, one ``.xyz`` and one ``.doc``; with
    ``['xyz']`` as the copy extensions only the first is expected to show
    up under ``files/xyz`` in the data root.
    """
    pipeline = test_helper.get_mock_pipeline([])

    # Make sure results of an earlier run cannot leak into this one.
    root = os.path.join('local_data', 'unittests')
    if os.path.exists(root):
      shutil.rmtree(root)

    config = {
        helper.DATA_ROOT: root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    }
    subscriber = copy_file.Subscriber(pipeline)
    subscriber.setup(config)

    subscriber.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    subscriber.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))

    copied = os.listdir(os.path.join(root, 'files', 'xyz'))

    self.assertEqual(['39bbf948-mock.xyz'], copied)
store_text_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_simple(self):
    """
    Check that consuming a document stores its text and records the path.

    After the subscriber has consumed a document carrying text, the
    document's ``text_file`` meta entry is expected to point into the
    ``text`` folder of the data root.
    """
    mock_pipeline = test_helper.get_mock_pipeline([])

    data_root = os.path.join('local_data', 'unittests')

    # Make sure results of an earlier run cannot leak into this one.
    if os.path.exists(data_root):
      shutil.rmtree(data_root)

    _store_text = store_text.Subscriber(mock_pipeline)
    _store_text.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1
    })

    doc = document.get_document('mock')
    doc.text = 'mock-mock-mock'

    _store_text.consume(doc, None)

    expected = 'local_data/unittests/text/17404a59-mock'
    actual = doc.meta['text_file']

    # assertEqual, not the deprecated alias assertEquals (removed in
    # Python 3.12).
    self.assertEqual(expected, actual)
test_logging.py 文件源码 项目:tensorboard 作者: dmlc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_event_logging():
    """
    Check that every FileWriter session adds one file to the log directory.

    Two writers are opened on the same directory one after the other; each
    writes a single scalar summary and is closed. The directory is expected
    to contain one entry after the first session and two after the second.
    """
    logdir = './experiment/'
    try:
        for expected_file_count in (1, 2):
            summary_writer = FileWriter(logdir)
            scalar_value = 1.0
            s = scalar('test_scalar', scalar_value)
            summary_writer.add_summary(s, global_step=1)
            summary_writer.close()
            assert os.path.isdir(logdir)
            assert len(os.listdir(logdir)) == expected_file_count
    finally:
        # clean up, even when an assertion above failed; otherwise leftover
        # files would make the entry counts wrong on the next run.
        shutil.rmtree(logdir, ignore_errors=True)
build.py 文件源码 项目:foxbms-setup 作者: foxBMS 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def clean(mcu_switch=None, supress_output=False):
    """
    Clean build artefacts.

    :param mcu_switch: ``None`` removes the ``build/sphinx`` directory (if
        present) and returns. ``'-p'``, ``'-s'`` or ``'-b'`` is appended,
        followed by ``clean``, to the basic toolchain configure command,
        which is then run through ``start_process``. Any other value prints
        an error and exits the interpreter with status 1.
    :param supress_output: forwarded unchanged to ``start_process``.
    """
    if mcu_switch is None:
        sphinx_build_dir = os.path.join('build', 'sphinx')
        if os.path.isdir(sphinx_build_dir):
            shutil.rmtree(sphinx_build_dir)
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("Successfully removed sphinx documentation")
        else:
            print('Nothing to clean...')
        return

    if mcu_switch not in ('-p', '-s', '-b'):
        print('Invalid clean argument: \'{}\''.format(mcu_switch))
        sys.exit(1)

    # The command is only assembled on the path that actually runs it.
    cmd = TOOLCHAIN_BASIC_CONFIGURE + ' '
    cmd += ' ' + mcu_switch + ' ' + 'clean'
    start_process(cmd, supress_output)
conftest.py 文件源码 项目:saapy 作者: ashapochka 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def neo4j_test_ws_dir(datafiles):
    """
    Return the ``datafiles`` argument unchanged.

    NOTE(review): presumably a pytest fixture that exposes the ``datafiles``
    fixture under a workspace-specific name; the decorators are not visible
    in this snippet -- confirm.
    """
    return datafiles


# @pytest.fixture(scope="session")
# def workspace(request, data_directory):
#     wsconf_file = data_directory.join("workspace.yaml")
#     temp_root = tempfile.mkdtemp()
#     ws = Workspace("saapy-test-ws",
#                    temp_root,
#                    "saapy-test-ws",
#                    configuration_text=wsconf_file.read_text("utf-8"))
#
#     def fin():
#         shutil.rmtree(temp_root)
#
#     request.addfinalizer(fin)
#     return ws  # provide the fixture value
ez_setup.py 文件源码 项目:Adafruit_Python_MCP4725 作者: adafruit 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def archive_context(filename):
    """
    Generator that runs its caller inside the unpacked archive *filename*.

    The archive is opened with the class returned by ``get_zip_class()`` and
    extracted into a fresh temporary directory. The working directory is
    switched to the first entry found there and control is yielded once.
    Afterwards the previous working directory is restored and the temporary
    directory is deleted, whether or not an error occurred.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    not visible in this snippet -- confirm.
    """
    # extracting the archive
    scratch_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', scratch_dir)
    previous_dir = os.getcwd()
    try:
        os.chdir(scratch_dir)
        zip_class = get_zip_class()
        with zip_class(filename) as bundle:
            bundle.extractall()

        # going in the directory
        top_level_name = os.listdir(scratch_dir)[0]
        source_dir = os.path.join(scratch_dir, top_level_name)
        os.chdir(source_dir)
        log.warn('Now working in %s', source_dir)
        yield

    finally:
        # Leave the temp tree before deleting it.
        os.chdir(previous_dir)
        shutil.rmtree(scratch_dir)
docker.py 文件源码 项目:packagecore 作者: BytePackager 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def stop(self):
        """
        Kill and remove this container, then delete its shared directory.

        The shared directory is kept when the ``BP_LEAVE_FILES`` environment
        variable is set. A failure to delete it is reported on stderr and
        otherwise ignored.
        """
        print("Cleaning up self %s." % self.getName())

        # kill the running self, then remove self
        for docker_action in ("kill", "rm"):
            _uncheckedDockerCommand([docker_action, self.getName()])

        # remove image -- later may be a preference to keep it
        #_uncheckedDockerCommand(["rmi", self.getImageName()])

        # remove shared directory
        try:
            if "BP_LEAVE_FILES" not in os.environ:
                shutil.rmtree(self.getSharedDir())
        except OSError:
            print("Warning: failed to remove shared directory.", file=sys.stderr)
output_match_test.py 文件源码 项目:sketch-components 作者: ibhubs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def compare_component_output(self, input_path, expected_output_path):
        """
        Render *input_path* twice and compare both results with a reference.

        The sketch archive is first processed into a plain directory and
        then into a zip file that is extracted again. After each run the
        produced tree must be the same as *expected_output_path* according
        to ``dircmp.is_same``.

        All temporary output is placed under the system temp directory and
        is removed even when a comparison fails.
        """
        rendering_engine = self.get_rendering_engine()
        temp_dir = tempfile.gettempdir()
        output_dir = os.path.join(temp_dir, str(uuid.uuid4()))

        # Pass 1: uncompressed output directory.
        try:
            process_sketch_archive(zip_path=input_path, compress_zip=False,
                                   output_path=output_dir, engine=rendering_engine)
            self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        finally:
            # ignore_errors: the directory may not exist if rendering failed.
            shutil.rmtree(output_dir, ignore_errors=True)

        storage.clear()

        # Pass 2: zipped output, extracted for comparison.
        output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
        try:
            process_sketch_archive(zip_path=input_path, compress_zip=True,
                                   output_path=output_zip, engine=rendering_engine)
            # Close the archive before deleting it; an open handle leaks the
            # descriptor and blocks os.remove() on some platforms.
            with zipfile.ZipFile(output_zip) as z:
                z.extractall(output_dir)
            self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        finally:
            shutil.rmtree(output_dir, ignore_errors=True)
            if os.path.exists(output_zip):
                os.remove(output_zip)
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """
        Set up the scheduler state and start its system tasks.

        Keyword arguments consumed here (all optional):

        - ``pulse_interval``: defaults to ``MaxPulseInterval``.
        - ``ping_interval``: defaults to 0.
        - ``zombie_period``: defaults to ``100 * MaxPulseInterval``.
        - ``dispycosnode_port``: defaults to 51351; also the port passed to
          ``discover_peers``.
        - ``clean``: when true, an existing scheduler destination directory
          is deleted before it is used.
        - ``nodes``: defaults to ``[]``; handed to the computation
          scheduler task.

        Whatever remains in ``kwargs`` is forwarded to
        ``pycos.Pycos.instance`` with ``name`` forced to
        ``'dispycos_scheduler'``.
        """
        self.__class__._instance = self
        self._nodes = {}
        self._disabled_nodes = {}
        self._avail_nodes = set()
        self._nodes_avail = pycos.Event()
        self._nodes_avail.clear()
        self._shared = False

        self._cur_computation = None
        self.__cur_client_auth = None
        self.__cur_node_allocations = []
        self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
        self.__ping_interval = kwargs.pop('ping_interval', 0)
        self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
        self._node_port = kwargs.pop('dispycosnode_port', 51351)
        self.__server_locations = set()
        self.__job_scheduler_task = None

        kwargs['name'] = 'dispycos_scheduler'
        clean = kwargs.pop('clean', False)
        nodes = kwargs.pop('nodes', [])
        self.pycos = pycos.Pycos.instance(**kwargs)
        self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
        # Only remove the directory when it is really there; on a first run
        # with clean=True it may not have been created yet and rmtree would
        # raise.
        if clean and os.path.isdir(self.__dest_path):
            shutil.rmtree(self.__dest_path)
        self.pycos.dest_path = self.__dest_path

        self.__computation_sched_event = pycos.Event()
        self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
        self.__client_task = SysTask(self.__client_proc)
        self.__timer_task = SysTask(self.__timer_proc)
        Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
        self.__client_task.register('dispycos_scheduler')
        self.pycos.discover_peers(port=self._node_port)
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """
        Set up the scheduler state and start its system tasks.

        Keyword arguments consumed here (all optional):

        - ``pulse_interval``: defaults to ``MaxPulseInterval``.
        - ``ping_interval``: defaults to 0.
        - ``zombie_period``: defaults to ``100 * MaxPulseInterval``.
        - ``dispycosnode_port``: defaults to 51351; also the port passed to
          ``discover_peers``.
        - ``clean``: when true, an existing scheduler destination directory
          is deleted before it is used.
        - ``nodes``: defaults to ``[]``; handed to the computation
          scheduler task.

        Whatever remains in ``kwargs`` is forwarded to
        ``pycos.Pycos.instance`` with ``name`` forced to
        ``'dispycos_scheduler'``.
        """
        self.__class__._instance = self
        self._nodes = {}
        self._disabled_nodes = {}
        self._avail_nodes = set()
        self._nodes_avail = pycos.Event()
        self._nodes_avail.clear()
        self._shared = False

        self._cur_computation = None
        self.__cur_client_auth = None
        self.__cur_node_allocations = []
        self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
        self.__ping_interval = kwargs.pop('ping_interval', 0)
        self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
        self._node_port = kwargs.pop('dispycosnode_port', 51351)
        self.__server_locations = set()
        self.__job_scheduler_task = None

        kwargs['name'] = 'dispycos_scheduler'
        clean = kwargs.pop('clean', False)
        nodes = kwargs.pop('nodes', [])
        self.pycos = pycos.Pycos.instance(**kwargs)
        self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
        # Only remove the directory when it is really there; on a first run
        # with clean=True it may not have been created yet and rmtree would
        # raise.
        if clean and os.path.isdir(self.__dest_path):
            shutil.rmtree(self.__dest_path)
        self.pycos.dest_path = self.__dest_path

        self.__computation_sched_event = pycos.Event()
        self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
        self.__client_task = SysTask(self.__client_proc)
        self.__timer_task = SysTask(self.__timer_proc)
        Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
        self.__client_task.register('dispycos_scheduler')
        self.pycos.discover_peers(port=self._node_port)
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sync_directory(src, dest, opts=None):
    """
    Replace *dest* with a filtered copy of the directory tree *src*.

    An existing *dest* is deleted first. The copy skips whatever the filter
    built from *opts* by ``get_filter`` tells ``shutil.copytree`` to ignore,
    and ``ensure_init`` is then applied to *dest*.
    """
    dest_already_present = os.path.exists(dest)
    if dest_already_present:
        logging.debug('Removing existing directory: %s', dest)
        shutil.rmtree(dest)

    logging.info('Syncing directory: %s -> %s.', src, dest)

    ignore_filter = get_filter(opts)
    shutil.copytree(src, dest, ignore=ignore_filter)
    ensure_init(dest)
data_io.py 文件源码 项目:AutoML5 作者: djajetic 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def rmdir(d):
    ''' Remove an existing directory tree; do nothing when the path is absent. '''
    if not os.path.exists(d):
        return
    shutil.rmtree(d)
makeself.py 文件源码 项目:Stitch 作者: nathanlopez 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def gen_makeself(conf_dir,alias):
    """
    Build a makeself installer for the payload named *alias*.

    ``Installers`` and ``tmp`` are created under *conf_dir* when missing.
    On macOS the payload is ``<conf_dir>/<alias>.app``, elsewhere it is
    ``<conf_dir>/Binaries/<alias>``. When the payload exists it is copied
    into ``tmp``, the platform service file and the setup script are
    generated next to it, the makeself command is run and logged, and
    ``tmp`` is deleted. When the payload is missing nothing else happens.
    """
    installers_dir = os.path.join(conf_dir,'Installers')
    staging_dir = os.path.join(conf_dir,'tmp')
    for required_dir in (installers_dir, staging_dir):
        if not os.path.exists(required_dir):
            os.makedirs(required_dir)

    # The two platforms differ only in where the payload lives and in which
    # service definition is generated for it.
    on_osx = sys.platform.startswith('darwin')
    if on_osx:
        payload = os.path.join(conf_dir,'{}.app'.format(alias))
    else:
        payload = os.path.join(os.path.join(conf_dir,'Binaries'), alias)

    if not os.path.exists(payload):
        return

    run_command('cp -R {} {}'.format(payload,staging_dir))
    if on_osx:
        gen_osx_plist(alias,staging_dir)
    else:
        gen_lnx_daemon(alias,staging_dir)
    gen_st_setup(alias,staging_dir)

    installer_cmd = 'bash "{}" "{}" "{}/{}_Installer" "Stitch" bash st_setup.sh'.format(mkself_exe, staging_dir, installers_dir,alias)
    st_log.info(installer_cmd)
    st_log.info(run_command(installer_cmd))
    shutil.rmtree(staging_dir)
test_run_no_updates_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def tearDown(self):
        """
        Destroy the app and undo the test environment.

        If an app was created its frame is hidden, the close handler is
        invoked and the frame is destroyed. The two environment variables
        used by the test are then deleted and the file server directory is
        removed.
        """
        if self.app:
            self.app.frame.Hide()
            self.app.OnCloseFrame(wx.PyEvent())
            self.app.frame.Destroy()

        for variable in ('PYUPDATER_FILESERVER_DIR', 'WXUPDATEDEMO_TESTING'):
            del os.environ[variable]

        shutil.rmtree(self.fileServerDir)
test_run_update_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def tearDown(self):
        """
        Destroy the app and undo the test environment.

        If an app was created its frame is hidden, the close handler is
        invoked and the frame is destroyed. The two environment variables
        used by the test are then deleted and the file server directory is
        removed.
        """
        if self.app:
            self.app.frame.Hide()
            self.app.OnCloseFrame(wx.PyEvent())
            self.app.frame.Destroy()

        for env_name in ('PYUPDATER_FILESERVER_DIR', 'WXUPDATEDEMO_TESTING'):
            del os.environ[env_name]

        shutil.rmtree(self.fileServerDir)


问题


面经


文章

微信
公众号

扫码关注公众号