Example source code for the Python Generator class

prettier.py (project: python-devtools, author: samuelcolvin)
def __init__(self,
                 indent_step=4,
                 indent_char=' ',
                 repr_strings=False,
                 simple_cutoff=10,
                 width=120,
                 yield_from_generators=True):
        self._indent_step = indent_step
        self._c = indent_char
        self._repr_strings = repr_strings
        self._repr_generators = not yield_from_generators
        self._simple_cutoff = simple_cutoff
        self._width = width
        self._type_lookup = [
            (dict, self._format_dict),
            (str, self._format_str),
            (bytes, self._format_bytes),
            (tuple, self._format_tuples),
            ((list, set, frozenset), self._format_list_like),
            (collections.Generator, self._format_generators),
        ]
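The `_type_lookup` list pairs a type (or tuple of types) with a formatter method, and the pretty-printer presumably walks it and dispatches with `isinstance`; note that `collections.Generator` has since moved to `collections.abc.Generator` (the old alias was removed in Python 3.10). A minimal standalone sketch of that dispatch pattern, with illustrative formatter names rather than the python-devtools ones:

from collections.abc import Generator


def _format_list_like(value):
    return "[" + ", ".join(repr(v) for v in value) + "]"


def _format_generator(value):
    return "gen(" + ", ".join(repr(v) for v in value) + ")"


_TYPE_LOOKUP = [
    ((list, set, frozenset), _format_list_like),
    (Generator, _format_generator),
]


def format_value(value):
    # Walk the lookup table and dispatch on the first matching entry.
    for types, formatter in _TYPE_LOOKUP:
        if isinstance(value, types):
            return formatter(value)
    return repr(value)


print(format_value([1, 2, 3]))                 # [1, 2, 3]
print(format_value(i * i for i in range(3)))   # gen(0, 1, 4)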
meta.py (project: drf-metadata, author: night-crawler)
def get_meta(self) -> t.Generator[t.Dict, None, None]:
        all_fields = self.model._meta.get_fields(
            include_parents=self.include_parents, include_hidden=self.include_hidden
        )
        for f in all_fields:
            if f.name in self.exclude:
                continue
            if self.fields and f.name not in self.fields:
                continue

            if f.name not in self.fields:
                if f.concrete not in self.concrete_in:
                    continue
                if f.auto_created not in self.auto_created_in:
                    continue
                if f.editable not in self.editable_in:
                    continue

            yield self.get_field_meta(f)
container.py (project: merakicommons, author: meraki-analytics)
def enumerate(self, item: Any, reverse: bool = False) -> Generator[Tuple[int, Any], None, None]:
        items = self
        if reverse:
            max = len(items) - 1
            items = reversed(items)
        for index, x in enumerate(items):
            if x == item:
                yield max - index if reverse else index, x
                continue

            try:
                if item in x:
                    yield max - index if reverse else index, x
            except TypeError:
                # x doesn't define __contains__
                pass
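The generator yields the positions whose element either equals `item` or contains it, swallowing the `TypeError` raised by elements that do not define `__contains__`. A standalone sketch of the same pattern over a plain list (not the merakicommons container class itself):

from typing import Any, Generator, Tuple


def enumerate_matches(items, item) -> Generator[Tuple[int, Any], None, None]:
    for index, x in enumerate(items):
        if x == item:
            yield index, x
            continue
        try:
            # Also match containers that hold the item.
            if item in x:
                yield index, x
        except TypeError:
            # x doesn't define __contains__
            pass


print(list(enumerate_matches([1, (1, 2), 3, [4]], 1)))  # [(0, 1), (1, (1, 2))]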
container.py (project: merakicommons, author: meraki-analytics)
def enumerate(self, item: Any) -> Generator[Tuple[Any, Any], None, None]:
        for key, value in self.items():
            if key == item:
                yield key, value
                continue

            try:
                if item in key:
                    yield key, value
                    continue
            except TypeError:
                # key doesn't define __contains__
                pass

            if value == item:
                yield key, value
                continue

            try:
                if item in value:
                    yield key, value
                    continue
            except TypeError:
                # value doesn't define __contains__
                pass
ddlsession.py (project: asyncqlio, author: SunDwarf)
async def get_columns(self, table_name: str = None) \
            -> 'typing.Generator[md_column.Column, None, None]':
        """
        Yields a :class:`.md_column.Column` for each column in the specified table,
        or for each column in the schema if no table is specified.

        These columns don't point to a :class:`.md_table.Table` since there
        might not be one, but accessing __name__ and __tablename__ of the column's
        table will still work as expected.

        :param table_name: The table to get columns from, or all tables if omitted
        """
        params = {"table_name": table_name}
        emitter = self.bind.emit_param

        sql = self.bind.dialect.get_column_sql(table_name, emitter=emitter)
        cur = await self.transaction.cursor(sql, params)
        records = await cur.flatten()
        await cur.close()

        return self.bind.dialect.transform_rows_to_columns(*records, table_name=table_name)
ddlsession.py (project: asyncqlio, author: SunDwarf)
async def get_indexes(self, table_name: str = None) \
            -> 'typing.Generator[md_index.Index, None, None]':
        """
        Yields a :class:`.md_index.Index` for each index in the specified table,
        or for each index in the schema if no table is specified.

        These indexes don't point to a :class:`.md_table.Table` since there
        might not be one, but they have a table_name attribute.

        :param table_name: The table to get indexes from, or all tables if omitted
        """
        params = {"table_name": table_name}
        emitter = self.bind.emit_param

        sql = self.bind.dialect.get_index_sql(table_name, emitter=emitter)
        cur = await self.transaction.cursor(sql, params)
        records = await cur.flatten()
        await cur.close()

        return self.bind.dialect.transform_rows_to_indexes(*records, table_name=table_name)
autodetect.py (project: rcli, author: contains-io)
def _get_module_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by the python module.

    Module commands consist of a docopt-style module docstring and a callable
    Command class.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    cls = next((n for n in module.body
                if isinstance(n, ast.ClassDef) and n.name == 'Command'), None)
    if not cls:
        return
    methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
    if '__call__' not in methods:
        return
    docstring = ast.get_docstring(module)
    for commands, _ in usage.parse_commands(docstring):
        yield _EntryPoint(commands[0], next(iter(commands[1:]), None), None)
autodetect.py (project: rcli, author: contains-io)
def _get_class_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by python classes in the module.

    Class commands are detected by inspecting all callable classes in the
    module for docopt-style docstrings.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    nodes = (n for n in module.body if isinstance(n, ast.ClassDef))
    for cls in nodes:
        methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
        if '__call__' in methods:
            docstring = ast.get_docstring(cls)
            for commands, _ in usage.parse_commands(docstring):
                yield _EntryPoint(commands[0], next(iter(commands[1:]), None),
                                  cls.name)
autodetect.py (project: rcli, author: contains-io)
def _get_function_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by python functions in the module.

    Function commands consist of all top-level functions that contain
    docopt-style docstrings.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    nodes = (n for n in module.body if isinstance(n, ast.FunctionDef))
    for func in nodes:
        docstring = ast.get_docstring(func)
        for commands, _ in usage.parse_commands(docstring):
            yield _EntryPoint(commands[0], next(iter(commands[1:]), None),
                              func.name)
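All three autodetect helpers share one skeleton: walk `module.body`, filter nodes by type, and read docstrings with `ast.get_docstring`. A self-contained sketch of that scanning pattern, with an inline module source for illustration:

import ast

SOURCE = '''
class Command:
    """usage: tool run"""
    def __call__(self):
        pass

def helper():
    """usage: tool helper"""
'''

module = ast.parse(SOURCE)

# Classes whose method list includes __call__ (i.e. callable classes).
for cls in (n for n in module.body if isinstance(n, ast.ClassDef)):
    methods = {n.name for n in cls.body if isinstance(n, ast.FunctionDef)}
    if '__call__' in methods:
        print(cls.name, '->', ast.get_docstring(cls))

# Top-level functions and their docstrings.
for func in (n for n in module.body if isinstance(n, ast.FunctionDef)):
    print(func.name, '->', ast.get_docstring(func))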
linter.py (project: flake8-sql, author: pgjones)
def _check_query_words(
            self, query: ast.Str, parser: Parser,
    ) -> Generator[Tuple[int, int, str, type], Any, None]:
        for token in parser:
            word = token.value
            if token.is_keyword or token.is_function_name:
                if not word.isupper() and word.upper() not in self.excepted_names:
                    yield (
                        query.lineno, query.col_offset,
                        "Q440 keyword {} is not uppercase".format(word),
                        type(self),
                    )
                if word.upper() in ABBREVIATED_KEYWORDS:
                    yield (
                        query.lineno, query.col_offset,
                        "Q442 avoid abbreviated keywords, {}".format(word),
                        type(self),
                    )
            elif token.is_name and (not word.islower() or word.endswith('_')):
                yield (
                    query.lineno, query.col_offset,
                    "Q441 name {} is not valid, must be snake_case, and cannot "
                    "end with `_`".format(word),
                    type(self),
                )
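Each yielded tuple follows the flake8 plugin convention of (line, column, message, checker type). A minimal standalone generator in the same shape, checking a plain word list instead of the sqlparse tokens the real linter consumes (the names below are illustrative):

from typing import Any, Generator, Tuple

KEYWORDS = {"SELECT", "FROM", "WHERE"}


class KeywordChecker:
    """Stand-in for the linter class; the fourth tuple element is the checker type."""


def check_keywords(words, lineno=1, col=0) -> Generator[Tuple[int, int, str, type], Any, None]:
    for word in words:
        if word.upper() in KEYWORDS and not word.isupper():
            yield (
                lineno, col,
                "Q440 keyword {} is not uppercase".format(word),
                KeywordChecker,
            )


print(list(check_keywords(["select", "name", "FROM", "users"])))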
lexicon.py (project: sockeye, author: awslabs)
def lexicon_iterator(path: str,
                     vocab_source: Dict[str, int],
                     vocab_target: Dict[str, int]) -> Generator[Tuple[int, int, float], None, None]:
    """
    Yields lines from a translation table of format: src, trg, logprob.

    :param path: Path to lexicon file.
    :param vocab_source: Source vocabulary.
    :param vocab_target: Target vocabulary.
    :return: Generator returning tuples (src_id, trg_id, prob).
    """
    assert C.UNK_SYMBOL in vocab_source
    assert C.UNK_SYMBOL in vocab_target
    src_unk_id = vocab_source[C.UNK_SYMBOL]
    trg_unk_id = vocab_target[C.UNK_SYMBOL]
    with smart_open(path) as fin:
        for line in fin:
            src, trg, logprob = line.rstrip("\n").split("\t")
            prob = np.exp(float(logprob))
            src_id = vocab_source.get(src, src_unk_id)
            trg_id = vocab_target.get(trg, trg_unk_id)
            yield src_id, trg_id, prob
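The same read, split, convert, yield pattern works for any tab-separated source. A standalone sketch using an in-memory file and `math.exp` in place of `np.exp`; the vocabularies and UNK handling mirror the function above but are illustrative:

import io
import math
from typing import Dict, Generator, Tuple

UNK = "<unk>"


def lexicon_iterator(lines,
                     vocab_source: Dict[str, int],
                     vocab_target: Dict[str, int]) -> Generator[Tuple[int, int, float], None, None]:
    src_unk_id = vocab_source[UNK]
    trg_unk_id = vocab_target[UNK]
    for line in lines:
        src, trg, logprob = line.rstrip("\n").split("\t")
        yield (vocab_source.get(src, src_unk_id),
               vocab_target.get(trg, trg_unk_id),
               math.exp(float(logprob)))


data = io.StringIO("hund\tdog\t-0.1\nkatze\tcat\t-0.2\n")
vocab_src = {UNK: 0, "hund": 1, "katze": 2}
vocab_trg = {UNK: 0, "dog": 1, "cat": 2}
print(list(lexicon_iterator(data, vocab_src, vocab_trg)))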
main.py (project: imap_tools, author: ikvk)
def _uid_str(uid_list: str or [str] or Generator) -> str:
        """
        Prepare list of uid for use in commands: delete/copy/move/seen
        uid_list can be: str, list, tuple, set, fetch generator
        """
        if not uid_list:
            raise MailBox.MailBoxUidParamError('uid_list should not be empty')
        if type(uid_list) is str:
            uid_list = uid_list.split(',')
        if inspect.isgenerator(uid_list):
            uid_list = [msg.uid for msg in uid_list if msg.uid]
        if type(uid_list) not in (list, tuple, set):
            raise MailBox.MailBoxUidParamError('Wrong uid_list type: {}'.format(type(uid_list)))
        for uid in uid_list:
            if type(uid) is not str:
                raise MailBox.MailBoxUidParamError('uid {} is not string'.format(str(uid)))
            if not uid.strip().isdigit():
                raise MailBox.MailBoxUidParamError('Wrong uid: {}'.format(uid))
        return ','.join((i.strip() for i in uid_list))
main.py (project: imap_tools, author: ikvk)
def get_attachments(self) -> Generator:
        """
        Attachments of the mail message (generator)
        :return: generator of tuple(filename: str, payload: bytes)
        """
        for part in self.obj.walk():
            # multipart/* are just containers
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                continue
            filename = part.get_filename()
            if not part.get_filename():
                continue  # this is what happens when Content-Disposition = inline
            filename = self._decode_value(*decode_header(filename)[0])
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            yield filename, payload
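A standalone version of the same walk over a stdlib `email.message.EmailMessage`, with the message built inline so the example runs on its own (it skips the RFC 2047 filename decoding that `_decode_value` performs above):

from email.message import EmailMessage


def iter_attachments(msg):
    """Yield (filename, payload) for each part that has a filename and a payload."""
    for part in msg.walk():
        if part.get_content_maintype() == 'multipart':
            continue  # multipart/* are just containers
        if part.get('Content-Disposition') is None:
            continue
        filename = part.get_filename()
        if not filename:
            continue  # inline parts carry no filename
        payload = part.get_payload(decode=True)
        if payload:
            yield filename, payload


msg = EmailMessage()
msg['Subject'] = 'report'
msg.set_content('see attachment')
msg.add_attachment(b'col1,col2\n1,2\n', maintype='application',
                   subtype='octet-stream', filename='data.csv')

for name, data in iter_attachments(msg):
    print(name, len(data), 'bytes')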
addresses.py (project: iota.lib.py, author: iotaledger)
def create_iterator(self, start=0, step=1):
    # type: (int, int) -> Generator[Address]
    """
    Creates an iterator that can be used to progressively generate new
    addresses.

    :param start:
      Starting index.

      Warning: This method may take a while to reset if ``start``
      is a large number!

    :param step:
      Number of indexes to advance after each address.

      Warning: The generator may take a while to advance between
      iterations if ``step`` is a large number!
    """
    key_iterator = (
      KeyGenerator(self.seed)
        .create_iterator(start, step, self.security_level)
    )

    while True:
      yield self._generate_address(key_iterator)
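Because the loop is `while True`, the generator never terminates on its own; the caller decides how many addresses to take, typically with `itertools.islice`. A generic sketch of consuming such an infinite generator (`addresses` below is a stand-in, not the iota.lib.py API):

import itertools
from typing import Generator


def addresses(start: int = 0, step: int = 1) -> Generator[str, None, None]:
    """Stand-in for an address iterator: yields forever."""
    index = start
    while True:
        yield "addr-{}".format(index)
        index += step


# Take only the first three addresses from the infinite generator.
print(list(itertools.islice(addresses(start=5, step=2), 3)))
# ['addr-5', 'addr-7', 'addr-9']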
utils.py (project: iota.lib.py, author: iotaledger)
def iter_used_addresses(adapter, seed, start):
  # type: (BaseAdapter, Seed, int) -> Generator[Tuple[Address, List[TransactionHash]]]
  """
  Scans the Tangle for used addresses.

  This is basically the opposite of invoking ``getNewAddresses`` with
  ``stop=None``.
  """
  ft_command = FindTransactionsCommand(adapter)

  for addy in AddressGenerator(seed).create_iterator(start):
    ft_response = ft_command(addresses=[addy])

    if ft_response['hashes']:
      yield addy, ft_response['hashes']
    else:
      break

    # Reset the command so that we can call it again.
    ft_command.reset()
pautilstest.py (project: pypeerassets, author: PeerAssets)
def test_find_deck_spawns(prov):

    if prov == "holy":
        provider = Holy(network="peercoin-testnet")

    if prov == "mintr":
        provider = Mintr(network="peercoin")

    if prov == "cryptoid":
        provider = Cryptoid(network="peercoin")

    try:
        if prov == "rpc":
            provider = RpcNode(testnet=True)
    except:
        print("No RpcNode avaliable.")

    assert isinstance(find_deck_spawns(provider), Generator)
rest_framework.py (project: django_stored_procedures, author: derfenix)
def _generate_conditions(self,
                             filters: Generator[Tuple[str, RawSQLFilter], None, None]) -> Generator[str, None, None]:
        """
        Returns generator, yields raw-sql conditions strings

        E.g. `field_name >= %s`

        :param filters: Generator with filter's name and `RawSQLFilter` instance
        """
        for name, filter_ in filters:
            conds_and_values = self._request_filters.get(name)
            if conds_and_values:
                for condition, value in conds_and_values:
                    try:
                        sql = filter_.filter(name, condition, value)
                    except ValidationError as e:
                        raise ValidationError('Exception raised for {}: {}'.format(name, e))
                    yield sql
            elif filter_.default is not None:
                self.params = filter_.default
                yield "{} = %s".format(name)
cli.py (project: mccurse, author: khardix)
def modpack_file(path: Path) -> Generator[ModPack, None, None]:
    """Context manager for manipulation of existing mod-pack.

    Keyword arguments:
        path: Path to the existing ModPack file, which should be provided.

    Yields:
        ModPack loaded from path. If no exception occurs, the provided modpack
        is written (with changes) back to the file on context exit.
    """

    with path.open(encoding='utf-8', mode='r') as istream:
        mp = ModPack.load(istream)

    yield mp

    with path.open(encoding='utf-8', mode='w') as ostream:
        mp.dump(ostream)
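For this load, yield, save flow to work in a `with` statement, the function is presumably wrapped with `contextlib.contextmanager` (the decorator is not visible in the excerpt). A self-contained sketch of the same pattern, with a JSON file standing in for the ModPack format:

import json
from contextlib import contextmanager
from pathlib import Path
from typing import Generator


@contextmanager
def json_file(path: Path) -> Generator[dict, None, None]:
    """Load a JSON document, yield it for mutation, write it back on exit."""
    with path.open(encoding='utf-8') as istream:
        data = json.load(istream)

    yield data

    with path.open(encoding='utf-8', mode='w') as ostream:
        json.dump(data, ostream)


path = Path('pack.json')
path.write_text('{"mods": []}', encoding='utf-8')
with json_file(path) as pack:
    pack['mods'].append('example-mod')
print(path.read_text(encoding='utf-8'))  # {"mods": ["example-mod"]}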
pack.py (project: mccurse, author: khardix)
def filter_obsoletes(
        self: 'ModPack',
        files: Iterable[File]
    ) -> Generator[File, None, None]:
        """Filter obsolete files.

        Obsolete files are defined as being already installed, or being
        an older version of already installed files.

        Keyword arguments:
            files: Iterable of mod :class:`File`s to filter.

        Yields:
            Original files without the obsoletes.
        """

        for file in files:
            current = self.installed.get(file.mod.id, None)

            if current is None or current.date < file.date:
                yield file
            else:
                continue
pack.py (project: mccurse, author: khardix)
def orphans(self: 'ModPack', mods: Mapping[int, Mod]=None) -> Generator[File, None, None]:
        """Finds all no longer needed dependencies.

        Keyword arguments:
            mods: Optional mapping of installed mods [default: self.mods].
                The purpose of this parameter is to be able to override
                really installed mods without changing the property directly.

        Yields:
            Orphaned files.
        """

        if mods is None:
            mods = self.mods

        needed = {}
        for file in mods.values():
            needed.update(resolve(file, pool=self.installed))

        # Filter unneeded dependencies
        yield from (
            file for m_id, file in self.dependencies.items()
            if m_id not in needed
        )
models.py (project: aiohttp_json_api, author: vovanbo)
def populate(comments: Sequence['Comment'], authors: Sequence['People'],
                 count=100) -> Generator['Article', None, None]:
        import mimesis

        aid = mimesis.Numbers()
        article = mimesis.Text()
        answers = list(comments)

        def get_random_answers(max):
            counter = 0
            while answers and counter < max:
                yield answers.pop(random.randint(0, len(answers) - 1))
                counter += 1

        return (
            Article(
                id=aid.between(1, count),
                title=article.title(),
                author=random.choice(authors),
                comments=[c for c in get_random_answers(random.randint(1, 10))]
            )
            for _ in range(count)
        )
ohneio.py (project: ohneio, author: acatton)
def peek(nbytes=0) -> typing.Generator[_Action, Buffer, bytes]:
    """Read output without consuming it.

    Read but **does not** consume data from the protocol input.

    This is a *non-blocking* primitive, if less data than requested is available,
    less data is returned. It is meant to be used in combination with :func:`~ohneio.wait`, but
    edge cases and smart people can (and most likely will) prove me wrong.

    Args:
        nbytes (:obj:`int`, optional): amount of bytes to read *at most*. ``0`` meaning all bytes.

    Returns:
        bytes: data read from the buffer
    """
    input_ = yield _get_input
    return input_.peek(nbytes)
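Here the generator acts as a coroutine: `yield _get_input` hands a request to the driving loop, which answers via `send()` with the input buffer, and the final `return` becomes the value carried by `StopIteration`. A minimal standalone sketch of that request/reply protocol (`Buffer` and the sentinel are simplified stand-ins for the ohneio internals):

import typing

_GET_INPUT = object()  # sentinel the driver recognises as "give me the buffer"


class Buffer:
    def __init__(self, data: bytes) -> None:
        self._data = data

    def peek(self, nbytes: int = 0) -> bytes:
        return self._data if nbytes == 0 else self._data[:nbytes]


def peek(nbytes: int = 0) -> typing.Generator[object, Buffer, bytes]:
    input_ = yield _GET_INPUT   # driver replies with the current Buffer
    return input_.peek(nbytes)


def drive(gen, buffer: Buffer) -> bytes:
    """Tiny driver: answer the single _GET_INPUT request, then collect the return value."""
    request = gen.send(None)
    assert request is _GET_INPUT
    try:
        gen.send(buffer)
    except StopIteration as stop:
        return stop.value
    raise RuntimeError("coroutine asked for more than one input")


print(drive(peek(4), Buffer(b"hello world")))  # b'hell'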
helpers.py (project: libiocage, author: iocage)
def exec_iter(
    command: typing.List[str],
    logger: typing.Optional[iocage.lib.Logger.Logger]=None
) -> typing.Generator[str, None, None]:

    process = exec_raw(
        command,
        logger=logger,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True
    )

    for stdout_line in iter(process.stdout.readline, ""):
        yield stdout_line

    process.stdout.close()

    return_code = process.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, command)
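The same streaming pattern can be written directly against `subprocess.Popen`; a standalone sketch without the iocage `exec_raw`/logger wrappers, run here against an illustrative Python one-liner:

import subprocess
import sys
import typing


def exec_iter(command: typing.List[str]) -> typing.Generator[str, None, None]:
    """Yield stdout lines as the child process produces them."""
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    for stdout_line in iter(process.stdout.readline, ""):
        yield stdout_line
    process.stdout.close()
    return_code = process.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, command)


for line in exec_iter([sys.executable, "-c", "print('one'); print('two')"]):
    print(line, end="")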
Resource.py (project: libiocage, author: iocage)
def __iter__(
        self
    ) -> typing.Generator[Resource, None, None]:

        for child_dataset in self.dataset.children:

            name = self._get_asset_name_from_dataset(child_dataset)
            if self._filters is not None and \
               self._filters.match_key("name", name) is not True:
                # Skip all jails that do not even match the name
                continue

            # ToDo: Do not load jail if filters do not require to
            resource = self._get_resource_from_dataset(child_dataset)
            if self._filters is not None:
                if self._filters.match_resource(resource):
                    yield resource
list.py (project: libiocage, author: iocage)
def _print_list(
    resources: typing.Generator[
        iocage.lib.Jails.JailsGenerator,
        None,
        None
    ],
    columns: list,
    show_header: bool,
    separator: str=";"
) -> None:

    if show_header is True:
        print(separator.join(columns).upper())

    for resource in resources:
        print(separator.join(_lookup_resource_values(resource, columns)))
list.py (project: libiocage, author: iocage)
def _print_json(
    resources: typing.Generator[
        iocage.lib.Jails.JailsGenerator,
        None,
        None
    ],
    columns: list,
    **json_dumps_args
):

    if "indent" not in json_dumps_args.keys():
        json_dumps_args["indent"] = 2

    if "sort_keys" not in json_dumps_args.keys():
        json_dumps_args["sort_keys"] = True

    output = []

    for resource in resources:
        output.append(dict(zip(
            columns,
            _lookup_resource_values(resource, columns)
        )))

    print(json.dumps(output, **json_dumps_args))
file_hash.py (project: binaryalert, author: airbnb)
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2*MB) -> Generator[bytes, None, None]:
    """Read a file in fixed-size chunks (to minimize memory usage for large files).

    Args:
        file_object: An opened file-like object supporting read().
        chunk_size: Max size (in bytes) of each file chunk.

    Yields:
        File chunks, each of size at most chunk_size.
    """
    while True:
        chunk = file_object.read(chunk_size)
        if chunk:
            yield chunk
        else:
            return  # End of file.
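A typical consumer feeds the chunks into a `hashlib` object, which is presumably how file_hash.py uses them. A self-contained sketch over an in-memory file; the `MB` constant is an assumed definition, since the excerpt does not show it:

import hashlib
import io
from typing import IO, Generator

MB = 1024 * 1024  # assumed definition of the module constant


def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2 * MB) -> Generator[bytes, None, None]:
    while True:
        chunk = file_object.read(chunk_size)
        if chunk:
            yield chunk
        else:
            return  # End of file.


sha = hashlib.sha256()
for chunk in _read_in_chunks(io.BytesIO(b"x" * (5 * MB)), chunk_size=MB):
    sha.update(chunk)
print(sha.hexdigest())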
replicas.py (project: indy-plenum, author: hyperledger)
def get_output(self, limit: int = None) -> Generator:
        if limit is None:
            per_replica = None
        else:
            per_replica = round(limit / self.num_replicas)
            if per_replica == 0:
                logger.debug("{} forcibly setting replica "
                             "message limit to {}"
                             .format(self._node.name,
                                     per_replica))
                per_replica = 1
        for replica in self._replicas:
            num = 0
            while replica.outBox:
                yield replica.outBox.popleft()
                num += 1
                if per_replica and num >= per_replica:
                    break
do.py (project: amino, author: tek)
def untyped_do(f: Callable[..., Generator[G, B, None]]) -> Callable[..., G]:
    @functools.wraps(f)
    def do_loop(*a: Any, **kw: Any) -> F[B]:
        itr = f(*a, **kw)
        if not isinstance(itr, GeneratorType):
            raise Exception(f'function `{f.__qualname__}` decorated with `do` does not produce a generator')
        init = itr.send(None)
        m = Monad.fatal_for(init)
        @functools.wraps(f)
        def loop(val: B) -> F[B]:
            try:
                return m.flat_map(itr.send(val), loop)
            except StopIteration:
                return m.pure(val)
        return m.flat_map(init, loop)
    return do_loop
lazy_list.py (project: amino, author: tek)
def _drain_find(self, abort: Callable[[A], bool]) -> Maybe[A]:
        culprit = Empty()
        def gen() -> Generator:
            nonlocal culprit
            while True:
                try:
                    el = next(self.source)
                    yield el
                    if abort(el):
                        culprit = Just(el)
                        break
                except StopIteration:
                    break
        drained = List.wrap(list(gen()))
        self.strict = self.strict + drained
        return culprit

