python类remove()的实例源码

checkpoint.py 文件源码 项目:deep-summarization 作者: harpribot 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def delete_previous_checkpoints(self, num_previous=5):
        """
        Deletes all previous checkpoints that are <num_previous> before the present checkpoint.
        This is done to prevent blowing out of memory due to too many checkpoints

        :param num_previous:
        :return:
        """
        self.present_checkpoints = glob.glob(self.get_checkpoint_location() + '/*.ckpt')
        if len(self.present_checkpoints) > num_previous:
            present_ids = [self.__get_id(ckpt) for ckpt in self.present_checkpoints]
            present_ids.sort()
            ids_2_delete = present_ids[0:len(present_ids) - num_previous]
            for ckpt_id in ids_2_delete:
                ckpt_file_nm = self.get_checkpoint_location() + '/model_' + str(ckpt_id) + '.ckpt'
                os.remove(ckpt_file_nm)
download.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 57 收藏 0 点赞 0 评论 0
def _copy_file(filename, location, link):
    copy = True
    download_location = os.path.join(location, link.filename)
    if os.path.exists(download_location):
        response = ask_path_exists(
            'The file %s exists. (i)gnore, (w)ipe, (b)ackup, (a)abort' %
            display_path(download_location), ('i', 'w', 'b', 'a'))
        if response == 'i':
            copy = False
        elif response == 'w':
            logger.warning('Deleting %s', display_path(download_location))
            os.remove(download_location)
        elif response == 'b':
            dest_file = backup_dir(download_location)
            logger.warning(
                'Backing up %s to %s',
                display_path(download_location),
                display_path(dest_file),
            )
            shutil.move(download_location, dest_file)
        elif response == 'a':
            sys.exit(-1)
    if copy:
        shutil.copy(filename, download_location)
        logger.info('Saved %s', display_path(download_location))
__init__.py 文件源码 项目:geo-pyprint 作者: ioda-net 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def mapprint(request):
    payload = request.json_body

    output_file_name = MapPrint(payload).print_pdf()

    response = FileResponse(
        output_file_name,
        request=request
    )
    response.headers['Content-Disposition'] = ('attachement; filename="{}"'
                                               .format(output_file_name + '.pdf'))
    response.headers['Content-Type'] = 'application/pdf'

    if os.path.exists(output_file_name):
        os.remove(output_file_name)

    return response
dispycos_client9_server.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setup_server(data_file, task=None):  # executed on remote server
    # variables declared as 'global' will be available in tasks for read/write
    # to all computations on a server.
    global hashlib, data, file_name
    import os, hashlib
    file_name = data_file
    print('%s processing %s' % (task.location, data_file))
    # note that files transferred to server are in the directory where
    # computations are executed (cf 'node_setup' in dispycos_client9_node.py)
    with open(data_file, 'rb') as fd:
        data = fd.read()
    os.remove(data_file)  # data_file is not needed anymore
    # generator functions must have at least one 'yield'
    yield 0 # indicate successful initialization with exit value 0

# 'compute' is executed at remote server process repeatedly to compute checksum
# of data in memory, initialized by 'setup_server'
dispycos_client9_node.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def node_setup(data_file):
    # 'node_setup' is executed on a node with the arguments returned by
    # 'node_available'. This task should return 0 to indicate successful
    # initialization.

    # variables declared as 'global' will be available (as read-only) in tasks.
    global os, hashlib, data, file_name
    import os, hashlib
    # note that files transferred to node are in parent directory of cwd where
    # each computation is run (in case such files need to be accessed in
    # computation).
    print('data_file: "%s"' % data_file)
    with open(data_file, 'rb') as fd:
        data = fd.read()
    os.remove(data_file)  # data_file is not needed anymore
    file_name = data_file
    yield 0  # task must have at least one 'yield' and 0 indicates success

# 'compute' is executed at remote server process repeatedly to compute checksum
# of data in memory, initialized by 'node_setup'
dispycos_client9_node.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 58 收藏 0 点赞 0 评论 0
def node_setup(data_file):
    # 'node_setup' is executed on a node with the arguments returned by
    # 'node_available'. This task should return 0 to indicate successful
    # initialization.

    # variables declared as 'global' will be available (as read-only) in tasks.
    global os, hashlib, data, file_name
    import os, hashlib
    # note that files transferred to node are in parent directory of cwd where
    # each computation is run (in case such files need to be accessed in
    # computation).
    print('data_file: "%s"' % data_file)
    with open(data_file, 'rb') as fd:
        data = fd.read()
    os.remove(data_file)  # data_file is not needed anymore
    file_name = data_file
    yield 0  # task must have at least one 'yield' and 0 indicates success

# 'compute' is executed at remote server process repeatedly to compute checksum
# of data in memory, initialized by 'node_setup'
dispycos_client9_server.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def setup_server(data_file, task=None):  # executed on remote server
    # variables declared as 'global' will be available in tasks for read/write
    # to all computations on a server.
    global hashlib, data, file_name
    import os, hashlib
    file_name = data_file
    print('%s processing %s' % (task.location, data_file))
    # note that files transferred to server are in the directory where
    # computations are executed (cf 'node_setup' in dispycos_client9_node.py)
    with open(data_file, 'rb') as fd:
        data = fd.read()
    os.remove(data_file)  # data_file is not needed anymore
    # generator functions must have at least one 'yield'
    yield 0 # indicate successful initialization with exit value 0

# 'compute' is executed at remote server process repeatedly to compute checksum
# of data in memory, initialized by 'setup_server'
ceph.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def remove_cache_tier(self, cache_pool):
        """
        Removes a cache tier from Ceph.  Flushes all dirty objects from writeback pools and waits for that to complete.
        :param cache_pool: six.string_types.  The cache tier pool name to remove.
        :return: None
        """
        # read-only is easy, writeback is much harder
        mode = get_cache_mode(self.service, cache_pool)
        version = ceph_version()
        if mode == 'readonly':
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])

        elif mode == 'writeback':
            pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier',
                                'cache-mode', cache_pool, 'forward']
            if version >= '10.1':
                # Jewel added a mandatory flag
                pool_forward_cmd.append('--yes-i-really-mean-it')

            check_call(pool_forward_cmd)
            # Flush the cache and wait for it to return
            check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
            check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
workflow.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def unregister(self, name):
        """Remove registered serializer with ``name``.

        Raises a :class:`ValueError` if there is no such registered
        serializer.

        :param name: Name of serializer to remove
        :type name: ``unicode`` or ``str``
        :returns: serializer object

        """
        if name not in self._serializers:
            raise ValueError('No such serializer registered : {0}'.format(
                             name))

        serializer = self._serializers[name]
        del self._serializers[name]

        return serializer
workflow.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def atomic_writer(file_path, mode):
    """Atomic file writer.

    :param file_path: path of file to write to.
    :type file_path: ``unicode``
    :param mode: sames as for `func:open`
    :type mode: string

    .. versionadded:: 1.12

    Context manager that ensures the file is only written if the write
    succeeds. The data is first written to a temporary file.

    """
    temp_suffix = '.aw.temp'
    temp_file_path = file_path + temp_suffix
    with open(temp_file_path, mode) as file_obj:
        try:
            yield file_obj
            os.rename(temp_file_path, file_path)
        finally:
            try:
                os.remove(temp_file_path)
            except (OSError, IOError):
                pass
tools.py 文件源码 项目:txt2evernote 作者: Xunius 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def saveFile(abpath_out,text,overwrite=True,verbose=True):

    abpath_out=expandUser(abpath_out)
    if os.path.isfile(abpath_out):
        if overwrite:
            os.remove(abpath_out)
        else:
            abpath_out=autoRename(abpath_out)

    if verbose:
        print('\n# <saveFile>: Saving result to:')
        print(abpath_out)

    with open(abpath_out, mode='a') as fout:
        fout.write(enu(text))

    return




#------------------Expand user home "~" in file names------------------
gnsync.py 文件源码 项目:txt2evernote 作者: Xunius 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def reset_logpath(logpath):
    """
    Reset logpath to path from command line
    """
    global logger

    if not logpath:
        return

    # remove temporary log file if it's empty
    if os.path.isfile(def_logpath):
        if os.path.getsize(def_logpath) == 0:
            os.remove(def_logpath)

    # save previous handlers
    handlers = logger.handlers

    # remove old handlers
    for handler in handlers:
        logger.removeHandler(handler)

    # try to set new file handler
    handler = logging.FileHandler(logpath)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
sensor21-server.py 文件源码 项目:sensor21 作者: 21dotco 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run(daemon):
        if daemon:
            pid_file = './sensor21.pid'
            if os.path.isfile(pid_file):
                pid = int(open(pid_file).read())
                os.remove(pid_file)
                try:
                    p = psutil.Process(pid)
                    p.terminate()
                except:
                    pass
            try:
                p = subprocess.Popen(['python3', 'sensor21-server.py'])
                open(pid_file, 'w').write(str(p.pid))
            except subprocess.CalledProcessError:
                raise ValueError("error starting sensor21-server.py daemon")
        else:
            print("Server running...")
            app.run(host='::', port=5002)
youtube.py 文件源码 项目:Zoom2Youtube 作者: Welltory 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def upload_from_dir(self, video_dir: str):
        assert os.path.isdir(video_dir), "Not found directory"
        files = self._get_files_from_dir(video_dir, 'mp4')
        for fname in files:
            fpath = os.path.join(video_dir, fname)
            if not os.path.exists(fpath):
                continue
            title = os.path.splitext(os.path.basename(fname))[0]
            options = dict(
                file=fpath,
                title=title,
                privacyStatus='unlisted',
            )
            video_id = self.upload_video(options)
            if not video_id:
                continue

            video_url = 'https://www.youtube.com/watch?v={}'.format(video_id)
            print('File uploaded: {}'.format(video_url))
            message = '{} - {}'.format(title, video_url)
            self.notify(message)
            os.remove(fpath)
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_resources_dests(resources_root, rules):
    """Find destinations for resources files"""

    def get_rel_path(base, path):
        # normalizes and returns a lstripped-/-separated path
        base = base.replace(os.path.sep, '/')
        path = path.replace(os.path.sep, '/')
        assert path.startswith(base)
        return path[len(base):].lstrip('/')


    destinations = {}
    for base, suffix, dest in rules:
        prefix = os.path.join(resources_root, base)
        for abs_base in iglob(prefix):
            abs_glob = os.path.join(abs_base, suffix)
            for abs_path in iglob(abs_glob):
                resource_file = get_rel_path(resources_root, abs_path)
                if dest is None:  # remove the entry if it was here
                    destinations.pop(resource_file, None)
                else:
                    rel_path = get_rel_path(abs_base, abs_path)
                    rel_dest = dest.replace(os.path.sep, '/').rstrip('/')
                    destinations[resource_file] = rel_dest + '/' + rel_path
    return destinations
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def convert_path(pathname):
    """Return 'pathname' as a name that will work on the native filesystem.

    The path is split on '/' and put back together again using the current
    directory separator.  Needed because filenames in the setup script are
    always supplied in Unix style, and have to be converted to the local
    convention before we can actually use them in the filesystem.  Raises
    ValueError on non-Unix-ish systems if 'pathname' either starts or
    ends with a slash.
    """
    if os.sep == '/':
        return pathname
    if not pathname:
        return pathname
    if pathname[0] == '/':
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if pathname[-1] == '/':
        raise ValueError("path '%s' cannot end with '/'" % pathname)

    paths = pathname.split('/')
    while os.curdir in paths:
        paths.remove(os.curdir)
    if not paths:
        return os.curdir
    return os.path.join(*paths)
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 58 收藏 0 点赞 0 评论 0
def rollback(self):
        if not self.dry_run:
            for f in list(self.files_written):
                if os.path.exists(f):
                    os.remove(f)
            # dirs should all be empty now, except perhaps for
            # __pycache__ subdirs
            # reverse so that subdirs appear before their parents
            dirs = sorted(self.dirs_created, reverse=True)
            for d in dirs:
                flist = os.listdir(d)
                if flist:
                    assert flist == ['__pycache__']
                    sd = os.path.join(d, flist[0])
                    os.rmdir(sd)
                os.rmdir(d)     # should fail if non-empty
        self._init_record()
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def get_extras(requested, available):
    result = set()
    requested = set(requested or [])
    available = set(available or [])
    if '*' in requested:
        requested.remove('*')
        result |= available
    for r in requested:
        if r == '-':
            result.add(r)
        elif r.startswith('-'):
            unwanted = r[1:]
            if unwanted not in available:
                logger.warning('undeclared extra: %s' % unwanted)
            if unwanted in result:
                result.remove(unwanted)
        else:
            if r not in available:
                logger.warning('undeclared extra: %s' % r)
            result.add(r)
    return result
#
# Extended metadata functionality
#
util.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def get_steps(self, final):
        if not self.is_step(final):
            raise ValueError('Unknown: %r' % final)
        result = []
        todo = []
        seen = set()
        todo.append(final)
        while todo:
            step = todo.pop(0)
            if step in seen:
                # if a step was already seen,
                # move it to the end (so it will appear earlier
                # when reversed on return) ... but not for the
                # final step, as that would be confusing for
                # users
                if step != final:
                    result.remove(step)
                    result.append(step)
            else:
                seen.add(step)
                result.append(step)
                preds = self._preds.get(step, ())
                todo.extend(preds)
        return reversed(result)
elasticsearch_driver.py 文件源码 项目:facerecognition 作者: guoxiaolu 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def delete_duplicates(self, path):
        """Delete all but one entries in elasticsearch whose `path` value is equivalent to that of path.
           need to modify!!!
        Args:
            path (string): path value to compare to those in the elastic search
        """
        result = self.es.search(body={'query':
                                 {'match':
                                      {'path': path}
                                  }
                             },
                       index=self.index)['hits']['hits']

        matching_paths = []
        matching_thumbnail = []
        for item in result:
            if item['_source']['path'] == path:
                matching_paths.append(item['_id'])
                matching_thumbnail.append(item['_source']['thumbnail'])

        if len(matching_paths) > 0:
            for i, id_tag in enumerate(matching_paths[1:]):
                self.es.delete(index=self.index, doc_type=self.doc_type, id=id_tag)
                if os.path.isfile(matching_thumbnail[i]):
                    os.remove(matching_thumbnail[i])
crystal.py 文件源码 项目:lammps-data-file 作者: kbsezginel 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def export(self, export_dir, file_format='xyz'):
        """
        Export MOF atom coordinates and names in .xyz format.

        Example usage:
         >>> mof.export(export_dir, file_format='xyz')
        """
        if file_format == 'xyz':
            xyz_path = os.path.join(export_dir, self.name + '.xyz')
            if os.path.exists(xyz_path):
                os.remove(xyz_path)

            with open(xyz_path, 'w') as xyz_file:
                xyz_file.write(str(len(self.atom_coors)) + '\n')
                xyz_file.write(self.name + '\n')
                for atom, coor in zip(self.atom_names, self.atom_coors):
                    xyz_file.write(atom + ' ' + str(coor[0]) + ' ' + str(coor[1]) + ' ' + str(coor[2]) + '\n')

        else:
            file_path = os.path.join(export_dir, self.name + '.' + file_format)
            ase.write(file_path, self.ase_atoms, file_format=file_format)
metadata.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _atomic_write(filename):
    path = os.path.dirname(filename)
    try:
        file = tempfile.NamedTemporaryFile(delete=False, dir=path, mode="w+")
        yield file
        file.flush()
        os.fsync(file.fileno())
        os.rename(file.name, filename)
    finally:
        try:
            os.remove(file.name)
        except OSError as e:
            if e.errno == 2:
                pass
            else:
                raise e
power.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _set_boot_device(conn, domain, device):
    """Set the boot device.

    :param conn: active libvirt connection.
    :param domain: libvirt domain object.
    :raises: LibvirtError if failed update domain xml.
    """

    parsed = ET.fromstring(domain.XMLDesc())
    os = parsed.find('os')
    boot_list = os.findall('boot')

    # Clear boot list
    for boot_el in boot_list:
        os.remove(boot_el)

    boot_el = ET.SubElement(os, 'boot')
    boot_el.set('dev', device)

    try:
        conn.defineXML(ET.tostring(parsed))
    except libvirt.libvirtError as e:
        raise isd_exc.LibvirtError(err=e)
test_deploy_integration.py 文件源码 项目:ardy 作者: avara1986 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_run_with_alias(self, create_artefact_mock, copytree_mock, pip_install_mock):
        zip_file = MockZipFile.create_zip("test")
        create_artefact_mock.return_value = zip_file

        self.deploy = Deploy(path=os.path.dirname(os.path.abspath(__file__)), filename="config_with_alias.json")

        # TODO: Search why moto rise errors
        try:
            # Create lambdas
            self.deploy.run("myexamplelambdaproject")

            self.assertTrue(pip_install_mock.called)
            self.assertTrue(copytree_mock.called)
            self.assertTrue(create_artefact_mock.called)

            # Update lambdas
            self.deploy.run("myexamplelambdaproject")

        except ConnectionError as e:
            print(e)

        os.remove(zip_file)
test_deploy_integration.py 文件源码 项目:ardy 作者: avara1986 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_run_with_trigger_s3(self, create_artefact_mock, copytree_mock, pip_install_mock):
        zip_file = MockZipFile.create_zip("test")
        create_artefact_mock.return_value = zip_file

        self.deploy = Deploy(path=os.path.dirname(os.path.abspath(__file__)), filename="config_with_triggers.json",
                             lambdas_to_deploy=["LambdaExample_S3_7", ])
        # TODO: Search why moto rise errors
        try:
            # Create lambdas
            self.deploy.run("myexamplelambdaproject")

            self.assertTrue(pip_install_mock.called)
            self.assertTrue(copytree_mock.called)
            self.assertTrue(create_artefact_mock.called)

            # Update lambdas
            self.deploy.run("myexamplelambdaproject")

        except ConnectionError as e:
            print(e)

        os.remove(zip_file)
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def get_resources_dests(resources_root, rules):
    """Find destinations for resources files"""

    def get_rel_path(base, path):
        # normalizes and returns a lstripped-/-separated path
        base = base.replace(os.path.sep, '/')
        path = path.replace(os.path.sep, '/')
        assert path.startswith(base)
        return path[len(base):].lstrip('/')


    destinations = {}
    for base, suffix, dest in rules:
        prefix = os.path.join(resources_root, base)
        for abs_base in iglob(prefix):
            abs_glob = os.path.join(abs_base, suffix)
            for abs_path in iglob(abs_glob):
                resource_file = get_rel_path(resources_root, abs_path)
                if dest is None:  # remove the entry if it was here
                    destinations.pop(resource_file, None)
                else:
                    rel_path = get_rel_path(abs_base, abs_path)
                    rel_dest = dest.replace(os.path.sep, '/').rstrip('/')
                    destinations[resource_file] = rel_dest + '/' + rel_path
    return destinations
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def convert_path(pathname):
    """Return 'pathname' as a name that will work on the native filesystem.

    The path is split on '/' and put back together again using the current
    directory separator.  Needed because filenames in the setup script are
    always supplied in Unix style, and have to be converted to the local
    convention before we can actually use them in the filesystem.  Raises
    ValueError on non-Unix-ish systems if 'pathname' either starts or
    ends with a slash.
    """
    if os.sep == '/':
        return pathname
    if not pathname:
        return pathname
    if pathname[0] == '/':
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if pathname[-1] == '/':
        raise ValueError("path '%s' cannot end with '/'" % pathname)

    paths = pathname.split('/')
    while os.curdir in paths:
        paths.remove(os.curdir)
    if not paths:
        return os.curdir
    return os.path.join(*paths)
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def rollback(self):
        if not self.dry_run:
            for f in list(self.files_written):
                if os.path.exists(f):
                    os.remove(f)
            # dirs should all be empty now, except perhaps for
            # __pycache__ subdirs
            # reverse so that subdirs appear before their parents
            dirs = sorted(self.dirs_created, reverse=True)
            for d in dirs:
                flist = os.listdir(d)
                if flist:
                    assert flist == ['__pycache__']
                    sd = os.path.join(d, flist[0])
                    os.rmdir(sd)
                os.rmdir(d)     # should fail if non-empty
        self._init_record()
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_extras(requested, available):
    result = set()
    requested = set(requested or [])
    available = set(available or [])
    if '*' in requested:
        requested.remove('*')
        result |= available
    for r in requested:
        if r == '-':
            result.add(r)
        elif r.startswith('-'):
            unwanted = r[1:]
            if unwanted not in available:
                logger.warning('undeclared extra: %s' % unwanted)
            if unwanted in result:
                result.remove(unwanted)
        else:
            if r not in available:
                logger.warning('undeclared extra: %s' % r)
            result.add(r)
    return result
#
# Extended metadata functionality
#
util.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def get_steps(self, final):
        if not self.is_step(final):
            raise ValueError('Unknown: %r' % final)
        result = []
        todo = []
        seen = set()
        todo.append(final)
        while todo:
            step = todo.pop(0)
            if step in seen:
                # if a step was already seen,
                # move it to the end (so it will appear earlier
                # when reversed on return) ... but not for the
                # final step, as that would be confusing for
                # users
                if step != final:
                    result.remove(step)
                    result.append(step)
            else:
                seen.add(step)
                result.append(step)
                preds = self._preds.get(step, ())
                todo.extend(preds)
        return reversed(result)


问题


面经


文章

微信
公众号

扫码关注公众号