python类Iterator()的实例源码

many_to_one.py 文件源码 项目:matchpy 作者: HPAC 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _match_sequence_variables(
            self,
            subjects: MultisetOfExpression,
            pattern_vars: Sequence[VariableWithCount],
            substitution: Substitution,
    ) -> Iterator[Substitution]:
        only_counts = [info for info, _ in pattern_vars]
        wrapped_vars = [name for (name, _, _, _), wrap in pattern_vars if wrap and name]
        for variable_substitution in commutative_sequence_variable_partition_iter(subjects, only_counts):
            for var in wrapped_vars:
                operands = variable_substitution[var]
                if isinstance(operands, (tuple, list, Multiset)):
                    if len(operands) > 1:
                        variable_substitution[var] = self.associative(*operands)
                    else:
                        variable_substitution[var] = next(iter(operands))
            try:
                result_substitution = substitution.union(variable_substitution)
            except ValueError:
                continue
            yield result_substitution
test_python_driver.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _extract_docs(inbuffer: InBuffer) -> Iterator[Response]:
        """
        This generator will read the inbuffer yielding the JSON
        docs when it finds the ending mark
        """
        line: str
        for line in inbuffer.readlines():
            yield json.loads(line)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def iter_parallel_report(func,  # type: Callable[..., Any]
                         args_lists,  # type: Sequence[CallArgs]
                         ccmode=CC_PROCESSES):
    # type: (...) -> Iterator[Union[ExeResult, ExcInfo]]
    if ccmode == CC_OFF or len(args_lists) <= 1 or not multiprocessing:
        for args, kwargs in args_lists:
            yield func(*args, **kwargs)
        return

    processes = min(len(args_lists), multiprocessing.cpu_count())
    if ccmode == CC_THREADS:
        pool = multiprocessing.pool.ThreadPool(processes=processes)
    else:
        pool = multiprocessing.Pool(processes=processes, initializer=per_process_init)
    try:
        async_results = [pool.apply_async(func, args=args, kwds=kwargs)
                         for args, kwargs in args_lists]
        pool.close()
        while async_results:
            try:
                asyncres = async_results.pop(0)
                yield asyncres.get()
            except (KeyboardInterrupt, GeneratorExit):
                raise
            except Exception as e:
                t, v, tb = sys.exc_info()
                try:
                    # Report the textual traceback of the subprocess rather
                    # than this local exception which was triggered
                    # by the other side.
                    tb = e.traceback  # type: ignore
                except AttributeError:
                    pass
                yield ExcInfo((t, v, tb))
    except GeneratorExit:
        pool.terminate()
    except KeyboardInterrupt:
        pool.terminate()
        raise
    finally:
        pool.join()
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def iter_parallel(func,        # type: Callable
                  args_lists,  # type: Sequence[CallArgs]
                  ccmode=CC_PROCESSES):
    # type: (...) -> Iterator[Any]
    if not args_lists:
        return
    if ccmode != CC_OFF:
        args_lists = [((func, args, kwargs), {}) for args, kwargs in args_lists]
        wrappedfunc = tracebackwrapper
    else:
        wrappedfunc = func

    for result in iter_parallel_report(wrappedfunc, args_lists, ccmode=ccmode):
        if ccmode == CC_OFF:
            yield result
        else:
            tbtext = None
            try:
                if isinstance(result, ExcInfo):
                    t, v, tb = result.exc_info
                    if not isinstance(tb, types.TracebackType):
                        tbtext = tb
                        tb = None
                    reraise(t, v, tb)
                else:
                    yield result
            except Exception:
                if tbtext is not None:
                    raise Exception(tbtext)
                else:
                    traceback.print_exc()
                    raise

# ----------------------------------------------------------------------
# The data types option and style.
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def iter_options(self, style):
        # type: (Style) -> Iterator[TextPair]
        dump = self.style_dump(style)
        for optname, optvalue in parse_keyvalue_pairs(dump):
            optname = optname.lower()
            optvalue = optvalue.lower()
            yield optname, optvalue
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def iter_stylecombos(formatter, ignoreopts=()):
    # type: (CodeFormatter, Sequence[str]) -> Iterator[FormatOption]
    for option in styledef_options(formatter.styledefinition):
        if option_name(option) in ignoreopts:
            continue
        stylecombo = formatter.variants_for(option)
        if stylecombo:
            yield FormatOption(stylecombo)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def iter_tbodies(table):
    # type: (str) -> Iterator[Tuple[str, str, str]]
    fragments = re.split(r'</?tbody>', table, flags=re.MULTILINE)
    if len(fragments) <= 1:
        return
    tbodies = fragments[1:-1:2]
    tablestart, tableend = fragments[0], fragments[-1]
    for tbody in tbodies:
        yield tablestart, '<tbody>%s</tbody>\n        ' % tbody, tableend
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def format_with_styles(formatter,           # type: CodeFormatter
                       styles,              # type: List[Style]
                       filenames,           # type: List[str]
                       reporterrors=True,   # type: bool
                       cache=None,          # type: Optional[Cache]
                       ccmode=CC_PROCESSES  # type: str
                       ):
    # type: (...) -> Iterator[Tuple[ExeCall, ExeResult]]
    """Reformat all files with all styles and yield pairs
    (job, jobresult) of all reformat operations.
    """
    jobs = []
    sourcecodes = []
    for style, filename in itertools.product(styles, filenames):
        cmdargs = formatter.cmdargs_for_style(style, filename)
        sourcedata = get_cached_file(filename)
        jobs.append(make_execall(formatter.exe, cmdargs, sourcedata, depfiles=[filename]))
        sourcecodes.append(sourcedata)
    jobresults = run_executables(jobs, cache, ccmode=ccmode)
    for srcdata, job, jobres in izip(sourcecodes, jobs, jobresults):
        if reporterrors:
            formatter.reporterrors(job, jobres)
        # A formatter reporting a valid result for non-empty input while returning empty
        # output indicates that the effective result is the unchanged input.
        if not jobres.stdout and srcdata and formatter.valid_job_result(job, jobres):
            jobres = jobres._replace(stdout=srcdata)
        yield job, jobres
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def distances_from_diffs_avglen(difftool,            # type: Tuple[str, str, List[str]]
                                diffargs,            # type: List[Tuple[str, bytes]]
                                cache=None,          # type: Optional[Cache]
                                ccmode=CC_PROCESSES  # type: str
                                ):
    # type: (...) -> Iterator[Tuple[Sequence[int], Iterable[int]]]
    """Returns pairs of (m, l) where m is the diff metric and l is the average line length
    difference. This improves the metric just a tiny bit.
    """
    metrics = distances_from_diffs(difftool, diffargs, cache=cache, ccmode=ccmode)
    lldiffs = avg_linelength_diffs(diffargs)
    return izip(metrics, lldiffs)
flask_app.py 文件源码 项目:Lebanese-Channels 作者: ChadiEM 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __filter_locations(channel_list: List[Channel], location: str) -> Iterator[Channel]:
    return filter(lambda current_channel: current_channel.available_in(location), channel_list)
active.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def iter_monitors(self) -> Iterator['ActiveMonitor']:
        """List all monitors that use this monitor def."""
        for monitor in self.manager.monitors.values():
            if monitor.monitor_def.id == self.id:
                yield monitor
input_provider.py 文件源码 项目:Main 作者: N-BodyPhysicsSimulator 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_generator(self) -> Iterator[BodyState]:
        """ Method to receive bodies. Returns a generator. """
        pass # codecov ignore
csv_input_provider.py 文件源码 项目:Main 作者: N-BodyPhysicsSimulator 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_generator(self) -> Iterator[BodyState]:
        path = self.args.get('csv_input_path')
        separator = self.args.get('separator')

        bodies = self.__get_bodies_from_path(path, separator)

        yield BodyState.from_dict({
            'bodies': bodies,
            'ticks': 0,
            'time': 0,
            'delta_time': self.args.get('delta_time')
        })
json_input_provider.py 文件源码 项目:Main 作者: N-BodyPhysicsSimulator 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_generator(self) -> Iterator[BodyState]:
        path = self.args.get('json_input_path')

        with open(path) as f:
            for line in f.readlines():
                yield BodyState.from_dict(
                    json.loads(line)
                )
ontonotes.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def dataset_iterator(self, file_path) -> Iterator[OntonotesSentence]:
        """
        An iterator over the entire dataset, yielding all sentences processed.
        """
        for conll_file in self.dataset_path_iterator(file_path):
            yield from self.sentence_iterator(conll_file)
ontonotes.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def dataset_path_iterator(file_path: str) -> Iterator[str]:
        """
        An iterator returning file_paths in a directory
        containing CONLL-formatted files.
        """
        logger.info("Reading CONLL sentences from dataset files at: %s", file_path)
        for root, _, files in tqdm.tqdm(list(os.walk(file_path))):
            for data_file in files:
                # These are a relic of the dataset pre-processing. Every
                # file will be duplicated - one file called filename.gold_skel
                # and one generated from the preprocessing called filename.gold_conll.
                if not data_file.endswith("gold_conll"):
                    continue

                yield os.path.join(root, data_file)
tests.py 文件源码 项目:greenstalk 作者: mayhewj 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def assert_seconds(n: int) -> Iterator[None]:
    start = datetime.now()
    yield
    duration = datetime.now() - start
    assert duration >= timedelta(seconds=n)
    assert duration <= timedelta(seconds=n, milliseconds=50)
extract_tokens.py 文件源码 项目:python-miio 作者: rytilahti 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_apple(self) -> Iterator[DeviceConfig]:
        """Read Apple-specific database file."""
        _LOGGER.info("Reading tokens from Apple DB")
        c = self.conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';")
        for dev in c.fetchall():
            if self.dump_raw:
                BackupDatabaseReader.dump_raw(dev)
            ip = dev['ZLOCALIP']
            mac = dev['ZMAC']
            model = dev['ZMODEL']
            name = dev['ZNAME']
            token = BackupDatabaseReader.decrypt_ztoken(dev['ZTOKEN'])

            config = DeviceConfig(name=name, mac=mac, ip=ip, model=model, token=token)
            yield config
logging.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def filter_records(
        minimum_level=logging.NOTSET,
        msg_filter: str or None = None,
        module_filter: str or None = None,
        thread_filter: str or None = None,
    ) -> typing.Iterator[logging.LogRecord]:
        records = Records(persistent_logging_handler.records)
        records \
            .filter_by_level(minimum_level) \
            .filter_by_message(msg_filter) \
            .filter_by_module(module_filter) \
            .filter_by_thread(thread_filter)
        for rec in records:
            yield rec
av_build.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def successful_only(self) -> typing.Iterator[AVBuild]:
        for x in self:
            if x.status == 'success':
                yield x


问题


面经


文章

微信
公众号

扫码关注公众号