python类replace()的实例源码

state.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __save(self):
        """Persist the workspace state, or just mark it dirty.

        When the asynchronous counter is non-zero nothing is written and the
        state is only flagged as dirty.  Otherwise the state is pickled to a
        ``.new`` sibling of the target path, flushed and fsynced, and then
        moved over the target with ``os.replace``.

        Raises ParseError if any of the file operations fails with OSError.
        """
        if self.__asynchronous != 0:
            # Saving is deferred; remember that there are unsaved changes.
            self.__dirty = True
            return

        snapshot = {
            "version" : _BobState.CUR_VERSION,
            "byNameDirs" : self.__byNameDirs,
            "results" : self.__results,
            "inputs" : self.__inputs,
            "jenkins" : self.__jenkins,
            "dirStates" : self.__dirStates,
            "buildState" : self.__buildState,
        }
        stagingPath = self.__path+".new"
        try:
            with open(stagingPath, "wb") as out:
                pickle.dump(snapshot, out)
                # Push the data to disk before the rename makes it visible.
                out.flush()
                os.fsync(out.fileno())
            os.replace(stagingPath, self.__path)
        except OSError as e:
            raise ParseError("Error saving workspace state: " + str(e))
        self.__dirty = False
dataIO.py 文件源码 项目:youtube 作者: FishyFing 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def save_json(self, filename, data):
        """Write ``data`` to a temporary file, verify it, then swap it in.

        The data is first saved to ``<stem>-<4 random digits>.tmp`` next to
        ``filename`` and read back.  If reading back raises
        ``json.decoder.JSONDecodeError`` the failure is logged, ``filename``
        is not touched and False is returned.  Otherwise the temporary file
        replaces ``filename`` and True is returned.
        """
        suffix = randint(1000, 9999)
        stem, _ext = os.path.splitext(filename)
        staging = "{}-{}.tmp".format(stem, suffix)
        self._save_json(staging, data)

        template = ("Attempted to write file {} but JSON "
                    "integrity check on tmp file has failed. "
                    "The original file is unaltered."
                    "")
        try:
            # Round-trip check: only a readable temp file may replace the target.
            self._read_json(staging)
        except json.decoder.JSONDecodeError:
            self.logger.exception(template.format(filename))
            return False
        else:
            os.replace(staging, filename)
            return True
window.py 文件源码 项目:Quiver 作者: DeflatedPickle 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def zip_file(self):
        """Zip the parent's directory into ``<directory>.zip`` with progress.

        Every file found under the directory is written to the archive under
        its path relative to that directory, while a progress dialog is
        updated after each file.  When the archive is complete the dialog is
        destroyed and an information box is shown.
        """
        amount = functions.folder_files(self.parent.directory)
        progress = dialog.ProgressWindow(self.parent, title="Zipping Pack", maximum=amount)

        # Normalised once so the walk root and the archive-name base agree.
        base = self.parent.directory.replace("\\", "/")
        count = 0

        # The ``with`` block closes the archive; no explicit close() needed.
        with zipfile.ZipFile(self.parent.directory + ".zip", "w") as z:
            for root, _dirs, files in os.walk(base, topdown=True):
                for name in files:
                    full_path = os.path.join(root, name)
                    # relpath replaces the former lookup of the directory's
                    # last component inside the split root, which picked the
                    # wrong component when an ancestor had the same name and
                    # raised ValueError for trailing-slash or backslash paths.
                    arcname = os.path.relpath(full_path, base).replace(os.sep, "/")
                    z.write(full_path, arcname)

                    count += 1
                    progress.variable_name.set("Current File: " + name)
                    progress.variable_percent.set("{}% Complete".format(round(100 * float(count) / float(amount))))
                    progress.variable_progress.set(progress.variable_progress.get() + 1)

        progress.destroy()
        messagebox.showinfo(title="Information", message="Zipping complete.")
__init__.py 文件源码 项目:srctools 作者: TeamSpen210 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, tback):
        """Close the temporary file, then commit or discard it.

        The wrapped temp object's own ``__exit__`` is called first.  If the
        block finished without an exception the temporary file replaces
        ``self.filename``; otherwise it is removed (a missing file is
        ignored).  Always returns False so exceptions keep propagating.
        """
        staged_path = self.temp.name
        self.temp.__exit__(exc_type, exc_value, tback)
        self.temp = None

        if exc_type is None:
            # Clean exit: move the finished file into place.
            _os.replace(staged_path, self.filename)
            return False

        # The block failed: drop the partial output.
        try:
            _os.remove(staged_path)
        except FileNotFoundError:
            pass
        return False


# Import these, so people can reference 'srctools.Vec' instead of
# 'srctools.vec.Vec'.
# Should be done after other code, so everything's initialised.
# Not all classes are imported, just most-used ones.
dataIO.py 文件源码 项目:selfbot 作者: Discord-ian 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def save_json(self, filename, data):
        """Save ``data`` as JSON via a verified temporary file.

        A ``<stem>-<4 random digits>.tmp`` file is written next to
        ``filename`` and read back.  A ``json.decoder.JSONDecodeError`` on
        read-back is logged and yields False with ``filename`` untouched;
        otherwise the temporary file replaces ``filename`` and True is
        returned.
        """
        tag = randint(1000, 9999)
        base_name, _unused_ext = os.path.splitext(filename)
        scratch = "{}-{}.tmp".format(base_name, tag)
        self._save_json(scratch, data)

        try:
            # Make sure what landed on disk can actually be read again.
            self._read_json(scratch)
        except json.decoder.JSONDecodeError:
            message = ("Attempted to write file {} but JSON "
                       "integrity check on tmp file has failed. "
                       "The original file is unaltered."
                       "").format(filename)
            self.logger.exception(message)
            return False

        os.replace(scratch, filename)
        return True
utils.py 文件源码 项目:hatch 作者: ofek 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def temp_move_path(path, d):
    """Generator that moves ``path`` into ``d`` and puts it back afterwards.

    If ``path`` exists it is moved with ``shutil.move`` and the new location
    is yielded; on resumption or close it is moved back, first with
    ``os.replace`` and, if that raises OSError, with ``shutil.move``.
    If ``path`` does not exist nothing is yielded and ``remove_path(path)``
    is called afterwards.

    NOTE(review): presumably wrapped with ``contextlib.contextmanager``
    outside this view -- confirm.
    """
    if not os.path.exists(path):
        try:
            yield
        finally:
            remove_path(path)
        return

    moved = shutil.move(path, d)
    try:
        yield moved
    finally:
        try:
            os.replace(moved, path)
        except OSError:  # no cov
            shutil.move(moved, path)
fileset.py 文件源码 项目:nimp 作者: dontnod 项目源码 文件源码 阅读 84 收藏 0 点赞 0 评论 0
def _run_fileset(self, env, file_mapper):
        """Move the fileset's files into the stash directory.

        Each mapped source that is an existing file and does not end in
        ``.stash`` is moved to ``<stash_dir>/<md5 of its path>``, and a
        ``"<md5> <path>"`` line is appended to the index file named after
        ``env.fileset``.  Always returns True.
        """
        stash_dir = env.format('{root_dir}/.nimp/stash')
        nimp.system.safe_makedirs(stash_dir)

        # Start from a fresh index file for this fileset.
        index_path = os.path.join(stash_dir, env.fileset)
        nimp.system.safe_delete(index_path)

        with open(index_path, 'w') as index:
            for mapped_src, _ in file_mapper():
                source = nimp.system.sanitize_path(mapped_src)
                if not os.path.isfile(source) or source.endswith('.stash'):
                    continue
                # The stashed name is derived from the path, not the content.
                digest = hashlib.md5(source.encode('utf8')).hexdigest()
                os.replace(source, os.path.join(stash_dir, digest))
                logging.info('Stashing %s as %s', source, digest)
                index.write('%s %s\n' % (digest, source))

        return True
fileset.py 文件源码 项目:nimp 作者: dontnod 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _run_fileset(self, env, file_mapper):
        """Move stashed files back to their original locations.

        Reads the index file named after ``env.fileset`` in the stash
        directory; each line has the form ``"<md5> <path>"``.  For every
        entry the destination is deleted and the stashed file is moved back.
        Failures are logged and do not stop the remaining entries.  The index
        file is deleted afterwards.

        Returns True if every entry was restored, False otherwise.
        """
        stash_dir = env.format('{root_dir}/.nimp/stash')
        stash_file = os.path.join(stash_dir, env.fileset)

        success = True
        with open(stash_file, 'r') as stash:
            for line in stash:
                try:
                    # Split only on the first space: the md5 never contains
                    # one, but the path may, and a plain split() made such
                    # entries fail to unpack and never get restored.
                    md5, dst = line.rstrip('\n').split(' ', 1)
                    src = os.path.join(stash_dir, md5)
                    logging.info('Unstashing %s as %s', md5, dst)
                    nimp.system.safe_delete(dst)
                    os.replace(src, dst)
                except Exception as ex: #pylint: disable=broad-except
                    logging.error(ex)
                    success = False
        nimp.system.safe_delete(stash_file)

        return success
atomic_write.py 文件源码 项目:gym 作者: openai 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def atomic_write(filepath, binary=False, fsync=False):
    """Yield a writable file whose contents replace ``filepath`` when done.

    Data is written to a sibling temporary file (``filepath`` plus as many
    ``~`` as needed to find an unused name), which is then moved over
    ``filepath`` with ``replace``.  The temporary file is always removed
    afterwards if it still exists.

    :param filepath: the file path to be opened
    :param binary: whether to open the file in a binary mode instead of textual
    :param fsync: whether to force write the file to disk
    """
    mode = 'wb' if binary else 'w'

    scratch = filepath + '~'
    while os.path.isfile(scratch):
        scratch += '~'

    try:
        with open(scratch, mode) as handle:
            yield handle
            if fsync:
                handle.flush()
                os.fsync(handle.fileno())
        replace(scratch, filepath)
    finally:
        # Best-effort cleanup; after a successful replace nothing is left.
        try:
            os.remove(scratch)
        except (IOError, OSError):
            pass
dataIO.py 文件源码 项目:goldmine 作者: Armored-Dragon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def save_json(self, filename, data):
        """Replace ``filename`` with ``data`` only after a read-back check.

        The data goes to ``<stem>-<4 random digits>.tmp`` first.  If reading
        that file back raises ``json.decoder.JSONDecodeError`` the problem is
        logged and False is returned without touching ``filename``; otherwise
        the temporary file is moved over ``filename`` and True is returned.
        """
        number = randint(1000, 9999)
        prefix, _ = os.path.splitext(filename)
        temp_name = "{}-{}.tmp".format(prefix, number)
        self._save_json(temp_name, data)

        verified = True
        try:
            self._read_json(temp_name)
        except json.decoder.JSONDecodeError:
            self.logger.exception("Attempted to write file {} but JSON "
                                  "integrity check on tmp file has failed. "
                                  "The original file is unaltered."
                                  "".format(filename))
            verified = False

        if verified:
            # Only a temp file that parsed cleanly is moved into place.
            os.replace(temp_name, filename)
        return verified
dataIO.py 文件源码 项目:Shallus-Bot 作者: cgropp 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def save_json(self, filename, data):
        """Save JSON through a temporary file that is verified before use.

        Writes ``data`` to ``<stem>-<4 random digits>.tmp`` and reads it
        back.  Returns False (after logging, leaving ``filename`` as it was)
        when the read-back raises ``json.decoder.JSONDecodeError``; otherwise
        moves the temporary file over ``filename`` and returns True.
        """
        rnd = randint(1000, 9999)
        root = os.path.splitext(filename)[0]
        candidate = "{}-{}.tmp".format(root, rnd)
        self._save_json(candidate, data)

        try:
            self._read_json(candidate)
        except json.decoder.JSONDecodeError:
            failure = ("Attempted to write file {} but JSON "
                       "integrity check on tmp file has failed. "
                       "The original file is unaltered."
                       "")
            self.logger.exception(failure.format(filename))
            return False

        # The candidate parsed cleanly, so it may take the target's place.
        os.replace(candidate, filename)
        return True
myonichi.py 文件源码 项目:showroom 作者: wlerin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def rename(srcpath, destpath, data):
    """Move every ``*.mp4`` in ``srcpath`` to ``destpath`` under a new name.

    The six-digit ``date`` group matched by ``file_re`` on the file's base
    name is expanded to ``20YY-MM-DD`` and used as a key into ``data``.  The
    new name combines the original date, the 1-based position of that key
    among the sorted keys of ``data``, and the entry's ``engName``.
    """
    import glob
    import os

    ordered_dates = sorted(data.keys())
    target_template = '{date} Showroom - AKB48 no Myonichi Yoroshiku! #{ep} ({name}).mp4'
    iso_template = '20{}-{}-{}'

    for source in glob.glob('{}/*.mp4'.format(srcpath)):
        found = file_re.match(os.path.basename(source))
        short_date = found.groupdict()['date']
        # Split YYMMDD into its three two-character parts.
        parts = [short_date[start:start+2] for start in (0, 2, 4)]
        iso_date = iso_template.format(*parts)
        episode_number = ordered_dates.index(iso_date)+1
        target_name = target_template.format(
            date=short_date,
            ep=episode_number,
            name=data[iso_date]['engName'],
        )
        os.replace(source, '{}/{}'.format(destpath, target_name))
prune.py 文件源码 项目:showroom 作者: wlerin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def prune_folder(folder, needed_list):
    """Move every ``*.mp4`` in ``folder`` not in ``needed_list`` to ``unneeded/``.

    The ``unneeded`` sub-directory is created inside ``folder`` if missing.
    File names are compared against ``needed_list`` as bare names, which is
    why the function works from inside ``folder``.

    The previous working directory is restored even when creating the
    sub-directory or moving a file raises.
    """
    oldcwd = os.getcwd()
    os.chdir(folder)
    try:
        os.makedirs('unneeded', exist_ok=True)
        for file in glob.glob('*.mp4'):
            if file not in needed_list:
                os.replace(file, 'unneeded/{}'.format(file))
    finally:
        # Without this an error above would leave the process in ``folder``.
        os.chdir(oldcwd)
core.py 文件源码 项目:showroom 作者: wlerin 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def update_streaming_url_web(self):
        """Updates streaming urls from the showroom website.

        Fetches the room's long url and searches the page text with
        ``hls_url_re1``.  If the request is not ok or nothing matches, the
        stored urls are left as they are.  Otherwise the hls, rtmps and rtmp
        urls are derived from the match and stored under the lock.
        """
        response = self._session.get(self._room.long_url)
        if not response.ok:
            return

        found = hls_url_re1.search(response.text)
        if not found:
            # No url in the page; keep whatever urls are currently stored.
            return

        captured = found.groups()
        hls_url = found.group(0)
        rtmps_url = captured[0].replace('https', 'rtmps')
        # Every group after the first fills the rtmp url template.
        rtmp_url = "rtmp://{}.{}.{}.{}:1935/liveedge/{}".format(*captured[1:])

        with self._lock:
            self._rtmp_url = rtmp_url
            self._hls_url = hls_url
            self._rtmps_url = rtmps_url
backports.py 文件源码 项目:mlens 作者: flennerhag 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def concurrency_safe_rename(src, dst):
        """Renames ``src`` into ``dst`` overwriting ``dst`` if it exists.

        On Windows os.replace (or for Python 2.7 its implementation
        through MoveFileExW) can yield permission errors if executed by
        two different processes.

        An exception whose ``winerror`` equals ``error_access_denied`` is
        retried with a doubling sleep (starting at 1 ms) until the total
        sleep time reaches about one second.  Any other exception is raised
        immediately.  When the retries run out, the last access-denied
        exception is raised.
        """
        max_sleep_time = 1
        total_sleep_time = 0
        sleep_time = 0.001
        last_exc = None
        while total_sleep_time < max_sleep_time:
            try:
                replace(src, dst)
                return
            except Exception as exc:
                if getattr(exc, 'winerror', None) != error_access_denied:
                    raise
                # Keep the error: Python 3 clears ``exc`` when the handler
                # ends, and a bare ``raise`` after the loop would fail with
                # "No active exception to reraise".
                last_exc = exc
                time.sleep(sleep_time)
                total_sleep_time += sleep_time
                sleep_time *= 2
        raise last_exc
io.py 文件源码 项目:pipenv 作者: pypa 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def replace(src, dst):
        # Argument names match the stdlib docs.
        #
        # A plain rename is attempted first because ReplaceFile fails when
        # the destination does not exist.  Only an EEXIST WindowsError falls
        # through to the ReplaceFile call; any other one is re-raised.
        try:
            os.rename(src, dst)
        except WindowsError as we:
            if we.errno != errno.EEXIST:
                raise
        else:
            return

        src = path_to_unicode(src)
        dst = path_to_unicode(dst)
        # ReplaceFile takes the file being replaced first, the new one second.
        if _ReplaceFile(c_wchar_p(dst), c_wchar_p(src), None, 0, None, None):
            return
        raise OSError('failed to replace %r with %r' % (dst, src))
atomic_write.py 文件源码 项目:AI-Fight-the-Landlord 作者: YoungGer 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def atomic_write(filepath, binary=False, fsync=False):
    """Yield a file object; its contents replace ``filepath`` once written.

    Writing goes to a temporary sibling named ``filepath`` followed by one
    or more ``~`` (extended until the name is unused).  After the caller is
    done the temporary file is moved over ``filepath`` with ``replace``, and
    any leftover temporary file is removed.

    :param filepath: the file path to be opened
    :param binary: whether to open the file in a binary mode instead of textual
    :param fsync: whether to force write the file to disk
    """
    temp_name = filepath + '~'
    while os.path.isfile(temp_name):
        temp_name += '~'

    open_mode = 'w'
    if binary:
        open_mode = 'wb'

    try:
        with open(temp_name, open_mode) as out:
            yield out
            if fsync:
                out.flush()
                os.fsync(out.fileno())
        replace(temp_name, filepath)
    finally:
        # Removal errors are ignored; the file is gone after a good replace.
        try:
            os.remove(temp_name)
        except (IOError, OSError):
            pass
utils.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def close(self):
            """Close the cache files and move the new cache into place.

            The input file, if set, is closed.  The output file, if set, is
            closed and then moved over the cache path with ``os.replace``.

            Raises BuildError if any of these steps fails with OSError.
            """
            try:
                source = self.__inFile
                if source:
                    source.close()
                target = self.__outFile
                if target:
                    target.close()
                    # Swap the freshly written cache in for the old one.
                    os.replace(target.name, self.__cachePath)
            except OSError as e:
                raise BuildError("Error closing hash cache: " + str(e))
input.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __generatePackages(self, nameFormatter, env, cacheKey, sandboxEnabled):
        """Return the root package, using an on-disk cache when it matches.

        The cache file starts with ``cacheKey`` followed by a pickled package
        tree.  If the file exists and its leading bytes equal ``cacheKey``
        the tree is unpickled and returned.  Otherwise the packages are
        calculated from the root recipe and the cache is rewritten through a
        ``.new`` file that is moved into place.  An OSError while saving is
        printed to stderr and does not affect the result.
        """
        # Sandbox and non-sandbox runs use separate cache files.
        cacheName = ".bob-packages-sb.pickle" if sandboxEnabled else ".bob-packages.pickle"

        # try to load the persisted packages
        states = {}
        for (n, s) in self.__states.items():
            states[n] = s()
        rootPkg = Package()
        rootPkg.construct("<root>", [], nameFormatter, None, [], [], states,
            {}, {}, None, None, [], {}, -1)
        try:
            with open(cacheName, "rb") as cacheFile:
                if cacheFile.read(len(cacheKey)) == cacheKey:
                    unpickler = PackageUnpickler(cacheFile, self.getRecipe,
                                                 self.__plugins, nameFormatter)
                    loaded = unpickler.load()
                    return loaded.toStep(nameFormatter, rootPkg).getPackage()
        except (EOFError, OSError, pickle.UnpicklingError):
            # Missing or unreadable cache: fall through and recalculate.
            pass

        # not cached -> calculate packages
        prepared = self.__rootRecipe.prepare(nameFormatter, env, sandboxEnabled,
                                             states)
        result = prepared[0]

        # save package tree for next invocation
        stepRef = CoreStepRef(rootPkg, result.getPackageStep())
        try:
            newCacheName = cacheName + ".new"
            with open(newCacheName, "wb") as cacheFile:
                cacheFile.write(cacheKey)
                PackagePickler(cacheFile, nameFormatter).dump(stepRef)
            os.replace(newCacheName, cacheName)
        except OSError as e:
            print("Error saving internal state:", str(e), file=sys.stderr)

        return result
_compat.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _make_text_stream(stream, encoding, errors):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A ``None`` encoding is replaced by ``get_best_encoding(stream)`` and a
    ``None`` error handler by ``'replace'``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    chosen_errors = 'replace' if errors is None else errors
    return _NonClosingTextIOWrapper(stream, chosen_encoding, chosen_errors,
                                    line_buffering=True)
_compat.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def filename_to_ui(value):
        # Bytes are decoded with the filesystem encoding, replacing anything
        # undecodable; every other value is returned unchanged.
        if not isinstance(value, bytes):
            return value
        return value.decode(get_filesystem_encoding(), 'replace')
_compat.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _force_correct_text_reader(text_reader, encoding, errors):
        # Returns a text reader with the requested encoding/errors, either the
        # given one or a new stream built on its underlying binary reader.
        binary_reader = text_reader
        if not _is_binary_reader(text_reader, False):
            # With no target encoding, a reader that is not misconfigured can
            # be used directly; so can one that is already compatible.
            if (encoding is None and not _stream_is_misconfigured(text_reader)) \
                    or _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # Otherwise try to reach the binary reader underneath.  If there
            # is none, the given reader is used as-is: garbled text is
            # preferred over raising.
            binary_reader = _find_binary_reader(text_reader)
            if binary_reader is None:
                return text_reader

        # Errors default to 'replace' rather than strict at this point.
        effective_errors = 'replace' if errors is None else errors
        return _make_text_stream(binary_reader, encoding, effective_errors)
_compat.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _force_correct_text_writer(text_writer, encoding, errors):
        # Returns a text writer with the requested encoding/errors, either the
        # given one or a new stream built on its underlying binary writer.
        binary_writer = text_writer
        if not _is_binary_writer(text_writer, False):
            # With no target encoding, a writer that is not misconfigured can
            # be used directly; so can one that is already compatible.
            if (encoding is None and not _stream_is_misconfigured(text_writer)) \
                    or _is_compatible_text_stream(text_writer, encoding, errors):
                return text_writer

            # Otherwise try to reach the binary writer underneath.  If there
            # is none, the given writer is used as-is: garbled text is
            # preferred over raising.
            binary_writer = _find_binary_writer(text_writer)
            if binary_writer is None:
                return text_writer

        # Errors default to 'replace' rather than strict at this point.
        effective_errors = 'replace' if errors is None else errors
        return _make_text_stream(binary_writer, encoding, effective_errors)
_compat.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def filename_to_ui(value):
        # Bytes are decoded with the filesystem encoding.  Text is round-
        # tripped through UTF-8 so that surrogate escapes come out as the
        # replacement character.
        if isinstance(value, bytes):
            return value.decode(get_filesystem_encoding(), 'replace')
        raw = value.encode('utf-8', 'surrogateescape')
        return raw.decode('utf-8', 'replace')
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _make_text_stream(stream, encoding, errors):
    """Build a line-buffered ``_NonClosingTextIOWrapper`` around ``stream``.

    When ``encoding`` is None it is taken from ``get_best_encoding(stream)``;
    when ``errors`` is None it becomes ``'replace'``.
    """
    if encoding is None:
        resolved_encoding = get_best_encoding(stream)
    else:
        resolved_encoding = encoding
    resolved_errors = errors if errors is not None else 'replace'
    return _NonClosingTextIOWrapper(stream, resolved_encoding, resolved_errors,
                                    line_buffering=True)
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def filename_to_ui(value):
        # Only bytes need work: they are decoded with the filesystem encoding
        # and undecodable bytes are replaced.  Other values pass through.
        needs_decoding = isinstance(value, bytes)
        return value.decode(get_filesystem_encoding(), 'replace') if needs_decoding else value
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _force_correct_text_reader(text_reader, encoding, errors):
        # Hands back a text reader honouring encoding/errors: the reader that
        # was passed in when it is usable, else a new stream over its binary
        # reader.
        if _is_binary_reader(text_reader, False):
            underlying = text_reader
        else:
            # No target encoding and no misconfiguration: nothing to fix.
            no_fix_needed = encoding is None and not _stream_is_misconfigured(text_reader)
            if no_fix_needed or _is_compatible_text_stream(text_reader, encoding, errors):
                return text_reader

            # Look for the binary reader beneath the text layer.  When none
            # can be found the original reader is returned unchanged, since
            # garbled text is preferred over an exception here.
            underlying = _find_binary_reader(text_reader)
            if underlying is None:
                return text_reader

        # Default to 'replace' instead of strict error handling.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(underlying, encoding, errors)
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _force_correct_text_writer(text_writer, encoding, errors):
        # Hands back a text writer honouring encoding/errors: the writer that
        # was passed in when it is usable, else a new stream over its binary
        # writer.
        if _is_binary_writer(text_writer, False):
            underlying = text_writer
        else:
            # No target encoding and no misconfiguration: nothing to fix.
            no_fix_needed = encoding is None and not _stream_is_misconfigured(text_writer)
            if no_fix_needed or _is_compatible_text_stream(text_writer, encoding, errors):
                return text_writer

            # Look for the binary writer beneath the text layer.  When none
            # can be found the original writer is returned unchanged, since
            # garbled text is preferred over an exception here.
            underlying = _find_binary_writer(text_writer)
            if underlying is None:
                return text_writer

        # Default to 'replace' instead of strict error handling.
        if errors is None:
            errors = 'replace'
        return _make_text_stream(underlying, encoding, errors)
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def filename_to_ui(value):
        # Bytes: decode with the filesystem encoding, replacing bad bytes.
        # Text: encode with surrogateescape and decode again so that escaped
        # surrogates turn into the replacement character.
        if not isinstance(value, bytes):
            encoded = value.encode('utf-8', 'surrogateescape')
            return encoded.decode('utf-8', 'replace')
        return value.decode(get_filesystem_encoding(), 'replace')
_compat.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _make_text_stream(stream, encoding, errors):
    """Return ``stream`` wrapped as a line-buffered text stream.

    Missing values are filled in first: ``encoding`` from
    ``get_best_encoding(stream)`` and ``errors`` with ``'replace'``.  The
    wrapper used is ``_NonClosingTextIOWrapper``.
    """
    wrapper_args = (
        stream,
        get_best_encoding(stream) if encoding is None else encoding,
        'replace' if errors is None else errors,
    )
    return _NonClosingTextIOWrapper(*wrapper_args, line_buffering=True)


问题


面经


文章

微信
公众号

扫码关注公众号