python类remove()的实例源码

download.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _copy_file(filename, location, link):
    copy = True
    download_location = os.path.join(location, link.filename)
    if os.path.exists(download_location):
        response = ask_path_exists(
            'The file %s exists. (i)gnore, (w)ipe, (b)ackup, (a)abort' %
            display_path(download_location), ('i', 'w', 'b', 'a'))
        if response == 'i':
            copy = False
        elif response == 'w':
            logger.warning('Deleting %s', display_path(download_location))
            os.remove(download_location)
        elif response == 'b':
            dest_file = backup_dir(download_location)
            logger.warning(
                'Backing up %s to %s',
                display_path(download_location),
                display_path(dest_file),
            )
            shutil.move(download_location, dest_file)
        elif response == 'a':
            sys.exit(-1)
    if copy:
        shutil.copy(filename, download_location)
        logger.info('Saved %s', display_path(download_location))
bdist_egg.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def zap_pyfiles(self):
        log.info("Removing .py files from temporary directory")
        for base, dirs, files in walk_egg(self.bdist_dir):
            for name in files:
                path = os.path.join(base, name)

                if name.endswith('.py'):
                    log.debug("Deleting %s", path)
                    os.unlink(path)

                if base.endswith('__pycache__'):
                    path_old = path

                    pattern = r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc'
                    m = re.match(pattern, name)
                    path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc')
                    log.info("Renaming file from [%s] to [%s]" % (path_old, path_new))
                    try:
                        os.remove(path_new)
                    except OSError:
                        pass
                    os.rename(path_old, path_new)
status.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def clean(args, config):
    """ Main entrypoint for clean """

    lock_file = join(HACKSPORTS_ROOT, "deploy.lock")

    # remove staging directories
    if os.path.isdir(STAGING_ROOT):
        logger.info("Removing the staging directories")
        shutil.rmtree(STAGING_ROOT)

    # remove lock file
    if os.path.isfile(lock_file):
        logger.info("Removing the stale lock file")
        os.remove(lock_file)

    #TODO: potentially perform more cleaning
windows.py 文件源码 项目:NeoVintageous 作者: NeoVintageous 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def filter_region(view, txt, command):
    try:
        contents = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
        contents.write(txt.encode('utf-8'))
        contents.close()

        script = tempfile.NamedTemporaryFile(suffix='.bat', delete=False)
        script.write(('@echo off\ntype %s | %s' % (contents.name, command)).encode('utf-8'))
        script.close()

        p = subprocess.Popen([script.name],
                             stdout=PIPE,
                             stderr=PIPE,
                             startupinfo=get_startup_info())

        out, err = p.communicate()
        return (out or err).decode(get_oem_cp()).replace('\r\n', '\n')[:-1].strip()
    finally:
        os.remove(script.name)
        os.remove(contents.name)
test_15_LoggingConfig.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_comp_macro_directories_config_python(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="python", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
test_15_LoggingConfig.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def test_comp_macro_directories_config_cpp(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="cpp", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
test_15_LoggingConfig.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_comp_macro_directories_config_java(self):
        file_loc = os.getcwd()
        self.comp = sb.launch(self.cname, impl="java", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} )
        fp = None
        try:
            fp = open('foo/bar/test.log','r')
        except:
            pass
        try:
            os.remove('foo/bar/test.log')
        except:
            pass
        try:
            os.rmdir('foo/bar')
        except:
            pass
        try:
            os.rmdir('foo')
        except:
            pass
        self.assertNotEquals(fp, None)
test_07_PythonParsers.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_SADParser(self):
        sad = parsers.SADParser.parse("sdr/dom/waveforms/CommandWrapperWithPropertyOverride/CommandWrapper.sad.xml")
        self.assertEqual(sad.get_id(), "DCE:d206ab51-6342-4976-bac3-55e6902f3489")
        self.assertEqual(sad.get_name(), "CommandWrapperWithPropertyOverride")
        self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)
        self.assertEqual(sad.partitioning.get_componentplacement()[0].componentfileref.refid, "CommandWrapper_592b8bd6-b011-4468-9417-705af45e907b")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].id_, "DCE:8c129782-a6a4-4095-8212-757f01de0c09")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].get_usagename(), "CommandWrapper1")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].componentproperties.get_simpleref()[0].refid, "DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].componentproperties.get_simpleref()[0].value, "/bin/date")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            sad.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SAD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
ln2sql.py 文件源码 项目:ln2sql 作者: FerreroJeremy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def remove_json(self, filename="output.json"):
        if os.path.exists(filename):
            os.remove(filename)
dispycos_client4.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def rtask_proc(task=None):
    import os
    # receive object from client_proc task
    cobj = yield task.receive()
    if not cobj:
        raise StopIteration
    # Input file is already copied at where this rtask is running (by client).
    # For given input file, create an output file with each line in the output
    # file computed as length of corresponding line in input file
    cobj.result_file = 'result-%s' % cobj.data_file
    with open(cobj.data_file, 'r') as data_fd:
        with open(cobj.result_file, 'w') as result_fd:
            for lineno, line in enumerate(data_fd, start=1):
                result_fd.write('%d: %d\n' % (lineno, len(line)-1))
    # 'sleep' to simulate computing
    yield task.sleep(cobj.n)
    # transfer the result file to client
    status = yield pycos.Pycos().send_file(cobj.client.location, cobj.result_file,
                                                 overwrite=True, timeout=30)
    if status:
        print('Could not send %s to %s' % (cobj.result_file, cobj.client.location))
        cobj.result_file = None
    cobj.client.send(cobj)
    os.remove(cobj.data_file)
    os.remove(cobj.result_file)


# this generator function is used to create local task (at the client) to
# communicate with a remote task
netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _udp_proc(self, location, addrinfo, task=None):
        """
        Internal use only.
        """
        task.set_daemon()
        sock = addrinfo.udp_sock
        while 1:
            msg, addr = yield sock.recvfrom(1024)
            if not msg.startswith('ping:'):
                logger.warning('ignoring UDP message from %s:%s', addr[0], addr[1])
                continue
            try:
                ping_info = deserialize(msg[len('ping:'):])
            except:
                continue
            peer_location = ping_info.get('location', None)
            if not isinstance(peer_location, Location) or peer_location in self._locations:
                continue
            if ping_info['version'] != __version__:
                logger.warning('Peer %s version %s is not %s',
                               peer_location, ping_info['version'], __version__)
                continue
            if self._ignore_peers:
                continue
            if self._secret is None:
                auth_code = None
            else:
                auth_code = ping_info.get('signature', '') + self._secret
                auth_code = hashlib.sha1(auth_code.encode()).hexdigest()
            _Peer._lock.acquire()
            peer = _Peer.peers.get((peer_location.addr, peer_location.port), None)
            _Peer._lock.release()
            if peer and peer.auth != auth_code:
                _Peer.remove(peer_location)
                peer = None

            if not peer:
                SysTask(self._acquaint_, peer_location, ping_info['signature'], addrinfo)
netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def remove(location):
        _Peer._lock.acquire()
        peer = _Peer.peers.pop((location.addr, location.port), None)
        _Peer._lock.release()
        if peer:
            logger.debug('%s: peer %s terminated', peer.addrinfo.location, peer.location)
            peer.stream = False
            RTI._peer_closed_(peer.location)
            _Peer._sign_locations.pop(peer.signature, None)
            if peer.req_task:
                peer.req_task.terminate()
            if _Peer.status_task:
                _Peer.status_task.send(PeerStatus(peer.location, peer.name, PeerStatus.Offline))
dispycos_client4.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def rtask_proc(task=None):
    import os
    # receive object from client_proc task
    cobj = yield task.receive()
    if not cobj:
        raise StopIteration
    # Input file is already copied at where this rtask is running (by client).
    # For given input file, create an output file with each line in the output
    # file computed as length of corresponding line in input file
    cobj.result_file = 'result-%s' % cobj.data_file
    with open(cobj.data_file, 'r') as data_fd:
        with open(cobj.result_file, 'w') as result_fd:
            for lineno, line in enumerate(data_fd, start=1):
                result_fd.write('%d: %d\n' % (lineno, len(line)-1))
    # 'sleep' to simulate computing
    yield task.sleep(cobj.n)
    # transfer the result file to client
    status = yield pycos.Pycos().send_file(cobj.client.location, cobj.result_file,
                                                 overwrite=True, timeout=30)
    if status:
        print('Could not send %s to %s' % (cobj.result_file, cobj.client.location))
        cobj.result_file = None
    cobj.client.send(cobj)
    os.remove(cobj.data_file)
    os.remove(cobj.result_file)


# this generator function is used to create local task (at the client) to
# communicate with a remote task
netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def _udp_proc(self, location, addrinfo, task=None):
        """
        Internal use only.
        """
        task.set_daemon()
        sock = addrinfo.udp_sock
        while 1:
            msg, addr = yield sock.recvfrom(1024)
            if not msg.startswith(b'ping:'):
                logger.warning('ignoring UDP message from %s:%s', addr[0], addr[1])
                continue
            try:
                ping_info = deserialize(msg[len(b'ping:'):])
            except:
                continue
            peer_location = ping_info.get('location', None)
            if not isinstance(peer_location, Location) or peer_location in self._locations:
                continue
            if ping_info['version'] != __version__:
                logger.warning('Peer %s version %s is not %s',
                               peer_location, ping_info['version'], __version__)
                continue
            if self._ignore_peers:
                continue
            if self._secret is None:
                auth_code = None
            else:
                auth_code = ping_info.get('signature', '') + self._secret
                auth_code = hashlib.sha1(auth_code.encode()).hexdigest()
            _Peer._lock.acquire()
            peer = _Peer.peers.get((peer_location.addr, peer_location.port), None)
            _Peer._lock.release()
            if peer and peer.auth != auth_code:
                _Peer.remove(peer_location)
                peer = None

            if not peer:
                SysTask(self._acquaint_, peer_location, ping_info['signature'], addrinfo)
netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def remove(location):
        _Peer._lock.acquire()
        peer = _Peer.peers.pop((location.addr, location.port), None)
        _Peer._lock.release()
        if peer:
            logger.debug('%s: peer %s terminated', peer.addrinfo.location, peer.location)
            RTI._peer_closed_(peer.location)
            peer.stream = False
            _Peer._sign_locations.pop(peer.signature, None)
            if peer.req_task:
                peer.req_task.terminate()
            if _Peer.status_task:
                _Peer.status_task.send(PeerStatus(peer.location, peer.name, PeerStatus.Offline))
dispycos_client4.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def rtask_proc(task=None):
    import os
    # receive object from client_proc task
    cobj = yield task.receive()
    if not cobj:
        raise StopIteration
    # Input file is already copied at where this rtask is running (by client).
    # For given input file, create an output file with each line in the output
    # file computed as length of corresponding line in input file
    cobj.result_file = 'result-%s' % cobj.data_file
    with open(cobj.data_file, 'r') as data_fd:
        with open(cobj.result_file, 'w') as result_fd:
            for lineno, line in enumerate(data_fd, start=1):
                result_fd.write('%d: %d\n' % (lineno, len(line)-1))
    # 'sleep' to simulate computing
    yield task.sleep(cobj.n)
    # transfer the result file to client
    status = yield pycos.Pycos().send_file(cobj.client.location, cobj.result_file,
                                                 overwrite=True, timeout=30)
    if status:
        print('Could not send %s to %s' % (cobj.result_file, cobj.client.location))
        cobj.result_file = None
    cobj.client.send(cobj)
    os.remove(cobj.data_file)
    os.remove(cobj.result_file)


# this generator function is used to create local task (at the client) to
# communicate with a remote task
project.py 文件源码 项目:pygrunt 作者: elementbound 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def unhook_stage(self, stage, hook):
        self.stage_hooks[self._hook_key(stage)].remove(hook)

    # Run project
hookenv.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit"""
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        with tempfile.NamedTemporaryFile(delete=False) as settings_file:
            settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
        subprocess.check_call(
            relation_cmd_line + ["--file", settings_file.name])
        os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
ceph.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def delete_keyring(service):
    """Delete an existing Ceph keyring."""
    keyring = _keyring_path(service)
    if not os.path.exists(keyring):
        log('Keyring does not exist at %s' % keyring, level=WARNING)
        return

    os.remove(keyring)
    log('Deleted ring at %s.' % keyring, level=INFO)
mainplugin.py 文件源码 项目:qgis_wp 作者: Zverik 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def openOSM(self, filename=None):
        """Converts an OSM file to GeoPackage, loads and styles it."""
        if not filename:
            filename = QFileDialog.getOpenFileName(
                parent=None,
                caption=self.tr(u'Select OpenStreetMap file'),
                filter=self.tr(u'OSM or GeoPackage File') + u' (*.osm *.pbf *.gpkg)')
        if not filename or not os.path.isfile(filename):
            return
        filename = os.path.abspath(filename)
        gpkgFile = os.path.splitext(filename)[0] + '.gpkg'
        if filename.endswith('.gpkg'):
            self.openGeoPackage(filename)
            return

        if os.path.isfile(gpkgFile):
            os.remove(gpkgFile)
        if isWindows():
            cmd = ['cmd.exe', '/C', 'ogr2ogr.exe']
        else:
            cmd = ['ogr2ogr']

        cmd.extend(['--config', 'OSM_USE_CUSTOM_INDEXING', 'NO'])
        iniFile = os.path.join(self.path, 'res', 'osmconf.ini')
        cmd.extend(['--config', 'OSM_CONFIG_FILE', iniFile])
        cmd.extend(['-t_srs', 'EPSG:3857'])
        cmd.extend(['-overwrite'])
        cmd.extend(['-f', 'GPKG', gpkgFile, filename])
        try:
            GdalUtils.runGdal(cmd, ProgressMock())
        except IOError as e:
            self.iface.messageBar().pushCritical(
                self.tr(u'Open OSM Data'), self.tr(u'Error running ogr2ogr: {}').format(e))
            return
        if 'FAILURE' in GdalUtils.consoleOutput:
            self.iface.messageBar().pushCritical(
                self.tr(u'Open OSM Data'), self.tr(u'Error converting OSM to GeoPackage.'))
            return
        self.openGeoPackage(gpkgFile)


问题


面经


文章

微信
公众号

扫码关注公众号