python类Generator()的实例源码

rrid.py 文件源码 项目:scibot 作者: SciCrunch 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def make_find_check_resolve_submit(finder: Finder, notSubmittedCheck: Checker, resolver: Resolver, submitter: Submitter) -> Processor:
    """Chain the four callables into a single text processor.

    The returned function takes a text, iterates over ``finder(text)`` and, for
    every item (each one is printed), asks ``notSubmittedCheck`` whether to go
    on.  Accepted items are passed to ``resolver`` and the result of
    ``submitter(item, resolved)`` is yielded.
    """
    def inner(text: str) -> Generator:
        for candidate in finder(text):
            print(candidate)
            if not notSubmittedCheck(candidate):
                continue
            yield submitter(candidate, resolver(candidate))

    return inner
debug.py 文件源码 项目:python-devtools 作者: samuelcolvin 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _process_args(self, func_ast, code_lines, args, kwargs) -> Generator[DebugArgument, None, None]:  # noqa: C901
        """
        Yield one ``self.output_class.arg_class`` object per positional and keyword argument.

        Positional arguments are paired with the nodes in ``func_ast.args``:

        * a plain ``ast.Name`` node gives the argument the variable's name;
        * a node matching ``self.complex_nodes`` gets a name cut out of ``code_lines``,
          from this argument's offset up to the next offset (or to the end of the last
          line when there is no next offset);
        * anything else is yielded without a name.

        Keyword arguments are yielded afterwards with ``name`` set to the keyword and
        ``variable`` set to the identifier passed as its value, if it was a plain name.

        :param func_ast: call node; its ``args`` and ``keywords`` are read
        :param code_lines: source lines, indexed by the line numbers from ``self._get_offsets``
        :param args: positional argument values
        :param kwargs: keyword argument values
        """
        arg_offsets = list(self._get_offsets(func_ast))
        # enumerate() replaces the former ``zip(..., range(1000))`` counter, which silently
        # dropped every positional argument after the 1000th.
        for i, (arg, ast_node) in enumerate(zip(args, func_ast.args)):
            if isinstance(ast_node, ast.Name):
                yield self.output_class.arg_class(arg, name=ast_node.id)
            elif isinstance(ast_node, self.complex_nodes):
                # TODO replace this hack with astor when it gets round to a new release
                start_line, start_col = arg_offsets[i]

                if i + 1 < len(arg_offsets):
                    end_line, end_col = arg_offsets[i + 1]
                else:
                    # last offset: take everything up to the end of the final code line
                    end_line, end_col = len(code_lines) - 1, None

                name_lines = []
                for l_ in range(start_line, end_line + 1):
                    # only the first/last line of the span is cut at a column
                    start_ = start_col if l_ == start_line else 0
                    end_ = end_col if l_ == end_line else None
                    name_lines.append(
                        code_lines[l_][start_:end_].strip(' ')
                    )
                yield self.output_class.arg_class(arg, name=' '.join(name_lines).strip(' ,'))
            else:
                yield self.output_class.arg_class(arg)

        # keyword -> identifier passed as its value (only when the value is a bare name)
        kw_arg_names = {
            kw.arg: kw.value.id
            for kw in func_ast.keywords
            if isinstance(kw.value, ast.Name)
        }
        for name, value in kwargs.items():
            yield self.output_class.arg_class(value, name=name, variable=kw_arg_names.get(name))
prettier.py 文件源码 项目:python-devtools 作者: samuelcolvin 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _format(self, value: Any, indent_current: int, indent_first: bool):
        """
        Write a formatted representation of ``value`` to ``self._stream``.

        Values whose ``repr`` is at most ``self._simple_cutoff`` characters long are written
        as that ``repr``; generators never take this shortcut.  Everything else is handed to
        the first handler in ``self._type_lookup`` whose type matches, falling back to
        ``self._format_raw``.

        :param value: object to format
        :param indent_current: indent (in units of ``self._c``) of the current level
        :param indent_first: whether to write the current indent before the value
        """
        # ``collections.Generator`` was removed in Python 3.10; the ABC lives in
        # ``collections.abc``.  Imported here so no module-level import has to change.
        from collections.abc import Generator as GeneratorABC

        if indent_first:
            self._stream.write(indent_current * self._c)

        value_repr = repr(value)
        if len(value_repr) <= self._simple_cutoff and not isinstance(value, GeneratorABC):
            self._stream.write(value_repr)
            return

        indent_new = indent_current + self._indent_step
        for t, func in self._type_lookup:
            if isinstance(value, t):
                func(value, value_repr, indent_current, indent_new)
                return
        self._format_raw(value, value_repr, indent_current, indent_new)
prettier.py 文件源码 项目:python-devtools 作者: samuelcolvin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _format_generators(self, value: Generator, value_repr: str, indent_current: int, indent_new: int):
        """
        Write a generator to ``self._stream``.

        When ``self._repr_generators`` is truthy only ``value_repr`` is written and the
        generator is left untouched.  Otherwise the generator is consumed: every item is
        formatted on its own line at ``indent_new`` followed by a comma, all wrapped in
        parentheses, with the closing one at ``indent_current``.
        """
        if self._repr_generators:
            self._stream.write(value_repr)
            return

        self._stream.write('(\n')
        for item in value:
            self._format(item, indent_new, True)
            self._stream.write(',\n')
        closing = indent_current * self._c + ')'
        self._stream.write(closing)
meta.py 文件源码 项目:drf-metadata 作者: night-crawler 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_meta(self) -> t.Generator[t.Dict, None, None]:
        """
        Yield the metadata dict of every field, one at a time.

        Entries from ``self.fields`` are collected first, keyed by their ``'name'``.  Then
        every ``get_<...>`` attribute found in this class' own ``__dict__`` (except the
        ``get_field_<NAME>`` ones) is called with ``(self, self.request)`` and its result is
        stored under ``result['name']``.  Finally, following ``self.order`` (or the collection
        order when ``self.order`` is falsy), each dict is updated with the return value of
        ``get_field_<NAME>`` when such a callable attribute exists, and yielded.

        :return: generator
        list(metadata_obj.get_meta()) -> [abstract_field_obj1, abstract_field_obj2, ...]
        """
        fields_by_name = OrderedDict((entry['name'], entry) for entry in self.fields)

        for attr_name, attr in self.__class__.__dict__.items():
            # `get_field_<NAME>` methods are updaters applied in the final loop below;
            # attributes that do not start with `get_` are not dynamic fields at all
            if attr_name.startswith('get_field_') or not attr_name.startswith('get_'):
                continue
            # a dynamic `get_<...>` method must return {'name': '<NAME>'},
            # where <NAME> is a real field name
            dynamic_field = attr(self, self.request)
            fields_by_name[dynamic_field['name']] = dynamic_field

        for field_name in self.order or fields_by_name.keys():
            field_value = fields_by_name[field_name]
            # the updater's returned dict is merged into the field
            updater = getattr(self, 'get_field_%s' % field_name, None)
            if callable(updater):
                field_value.update(updater(field_name, self.request))

            yield field_value

    # noinspection PyMethodMayBeStatic,PyUnusedLocal
test_restapiplugin.py 文件源码 项目:fauxmo-plugins 作者: n8henrie 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def restapiplugin_target() -> Generator:
    """Simulate the endpoints triggered by RESTAPIPlugin.

    Runs ``httpbin.core.app.run`` on 127.0.0.1:8000 in a daemon process, yields
    once while it is running, then terminates the process and waits for it.
    """
    server_options = {"host": "127.0.0.1", "port": 8000}
    fauxmo_device = Process(
        target=httpbin.core.app.run,
        kwargs=server_options,
        daemon=True,
    )
    fauxmo_device.start()
    # fixed one-second pause after starting the process, before handing control back
    time.sleep(1)

    yield

    fauxmo_device.terminate()
    fauxmo_device.join()
rendezvous.py 文件源码 项目:proxenos 作者: darvid 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def srand(seed=0):
    # type: (KeyType) -> typing.Generator[int, None, None]
    """Yield an endless stream of random integers in ``[0, sys.maxsize]``.

    Text and bytes seeds are first reduced to an integer: text is encoded as
    UTF-8, then the SHA-512 hex digest of the bytes is parsed as a base-16
    number.  Any other seed is passed to ``random.Random`` unchanged.
    """
    if isinstance(seed, (six.string_types, bytes)):
        raw = seed.encode('utf-8') if isinstance(seed, six.text_type) else seed
        digest = hashlib.sha512(raw).hexdigest()
        seed = typing.cast(int, int(digest, 16))
    rng = random.Random(seed)
    while True:
        yield rng.randint(0, sys.maxsize)
pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_many_generator(self, result: Iterable[S], context: PipelineContext = None) -> Generator[T, None, None]:
        """
        Lazily convert every item of ``result`` and yield the converted items.

        Each item is put into every sink in ``self._before_transform``, converted with
        ``self._transform`` and the converted item is put into every sink in
        ``self._after_transform`` before being yielded.  Every step is logged at INFO level.
        """
        def broadcast(sinks, payload):
            # hand the payload to each sink in order, with the caller's context
            for sink in sinks:
                sink.put(payload, context)

        for raw in result:
            LOGGER.info("Sending item \"{item}\" to sinks before converting".format(item=raw))
            broadcast(self._before_transform, raw)

            LOGGER.info("Converting item \"{item}\" to request type".format(item=raw))
            converted = self._transform(data=raw, context=context)

            LOGGER.info("Sending item \"{item}\" to sinks after converting".format(item=converted))
            broadcast(self._after_transform, converted)

            yield converted
test_pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_many_int(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[int, None, None]:
        """
        Return a generator yielding the query value, cast to ``int``, ``count`` times.

        The value and the count are read from ``query`` under ``VALUE_KEY`` and ``COUNT_KEY``.

        :raises NotFoundError: when ``int()`` rejects the value with a ``ValueError``
        """
        raw_value, repeats = query.get(VALUE_KEY), query.get(COUNT_KEY)

        try:
            number = int(raw_value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"int\"")

        return (number for _ in range(repeats))
test_pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_many_float(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[float, None, None]:
        """
        Return a generator yielding the query value, cast to ``float``, ``count`` times.

        The value and the count are read from ``query`` under ``VALUE_KEY`` and ``COUNT_KEY``.
        The cast value must be present in ``self.items``.

        :raises NotFoundError: when ``float()`` rejects the value with a ``ValueError``, or
            when the cast value is not in ``self.items``
        """
        raw_value, repeats = query.get(VALUE_KEY), query.get(COUNT_KEY)

        try:
            number = float(raw_value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"float\"")

        if number in self.items:
            return (number for _ in range(repeats))
        raise NotFoundError("Query value wasn't in store!")
test_sources.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Generator[T, None, None]:
        """
        Return a generator yielding the query value, cast with ``type``, ``count`` times.

        The value and the count are read from ``query`` under ``VALUE_KEY`` and ``COUNT_KEY``.

        :raises NotFoundError: when calling ``type`` on the value raises ``ValueError``
        """
        raw_value, repeats = query.get(VALUE_KEY), query.get(COUNT_KEY)

        try:
            # noinspection PyCallingNonCallable
            converted = type(raw_value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"{type}\"".format(type=type))

        return (converted for _ in range(repeats))
test_sources.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_many_int(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[int, None, None]:
        """
        Return a generator yielding the query value, cast to ``int``, ``count`` times.

        The value and the count are read from ``query`` under ``VALUE_KEY`` and ``COUNT_KEY``.

        :raises NotFoundError: when ``int()`` rejects the value with a ``ValueError``
        """
        raw_value, repeats = query.get(VALUE_KEY), query.get(COUNT_KEY)

        try:
            as_int = int(raw_value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"int\"")

        return (as_int for _ in range(repeats))
test_sources.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_many_str(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[str, None, None]:
        """
        Return a generator yielding the query value, cast to ``str``, ``count`` times.

        The value and the count are read from ``query`` under ``VALUE_KEY`` and ``COUNT_KEY``.

        :raises NotFoundError: when ``str()`` rejects the value with a ``ValueError``
        """
        raw_value, repeats = query.get(VALUE_KEY), query.get(COUNT_KEY)

        try:
            text = str(raw_value)
        except ValueError:
            raise NotFoundError("Couldn't cast the query value to \"str\"")

        return (text for _ in range(repeats))


########################
# Unsupported Function #
########################
data_iterator.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __call__(self,
                 dataset: Dataset,
                 num_epochs: int = None,
                 shuffle: bool = True,
                 cuda_device: int = -1,
                 for_training: bool = True) -> Generator[Dict[str, Union[numpy.ndarray,
                                                                         Dict[str, numpy.ndarray]]],
                                                         None, None]:
        """
        Returns a generator that yields whatever ``self._yield_one_epoch`` produces for the
        given dataset, epoch after epoch: ``num_epochs`` times, or forever when ``num_epochs``
        is ``None``.

        Parameters
        ----------
        dataset : ``Dataset``
        num_epochs : ``int``, optional (default=``None``)
            How many times should we iterate over this dataset?  If ``None``, we will iterate
            over it forever.
        shuffle : ``bool``, optional (default=``True``)
            Forwarded unchanged to ``self._yield_one_epoch``.
        cuda_device : ``int``
            Forwarded unchanged to ``self._yield_one_epoch``.
        for_training : ``bool``, optional (default=``True``)
            Forwarded unchanged to ``self._yield_one_epoch``.
        """
        def one_epoch():
            # a fresh pass over the dataset with the caller's settings
            return self._yield_one_epoch(dataset, shuffle, cuda_device, for_training)

        if num_epochs is not None:
            for _ in range(num_epochs):
                yield from one_epoch()
            return

        while True:
            yield from one_epoch()
mission.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_clients_groups(self) -> typing.Generator['FlyingUnit', None, None]:
        """Yield every group in ``self.groups`` whose ``group_is_client_group`` is truthy."""
        for candidate in self.groups:
            assert isinstance(candidate, Group)
            if not candidate.group_is_client_group:
                continue
            yield candidate
mission.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def farps(self) -> typing.Generator['Static', None, None]:
        """Yield the items of ``blue_coa.farps`` followed by those of ``red_coa.farps``."""
        for coalition in (self.blue_coa, self.red_coa):
            yield from coalition.farps


# noinspection PyProtectedMember
mission.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def countries(self) -> typing.Generator['Country', None, None]:
        """
        Yield one ``Country`` per key of ``self._section_country``.

        A country missing from ``self._countries`` is built on first use and also registered
        in ``self._countries_by_id`` and ``self._countries_by_name``; later passes yield the
        stored object.
        """
        for key in self._section_country:
            if key not in self._countries:
                created = Country(self.d, self.l10n, self.coa_color, key)
                self._countries[key] = created
                self._countries_by_id[created.country_id] = created
                self._countries_by_name[created.country_name] = created
            yield self._countries[key]
mission.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def groups(self) -> typing.Generator['Group', None, None]:
        """Yield every group of every country in ``self.countries``, country by country."""
        for nation in self.countries:
            assert isinstance(nation, Country)
            for unit_group in nation.groups:
                assert isinstance(unit_group, Group)
                yield unit_group
mission.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def statics(self) -> typing.Generator['Static', None, None]:
        """Yield every static of every country in ``self.countries``, country by country."""
        for nation in self.countries:
            assert isinstance(nation, Country)
            for static_object in nation.statics:
                assert isinstance(static_object, Static)
                yield static_object
mission.py 文件源码 项目:EMFT 作者: 132nd-etcher 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_groups_from_category(self, category) -> typing.Generator['Group', None, None]:
        """
        Yield the groups in ``self.groups`` whose ``group_category`` equals ``category``.

        ``category`` is first checked with ``Mission.validator_group_category``; because this
        is a generator, that check runs when iteration starts, not when the method is called.
        """
        Mission.validator_group_category.validate(category, 'get_groups_from_category')
        for candidate in self.groups:
            assert isinstance(candidate, Group)
            if candidate.group_category != category:
                continue
            yield candidate


问题


面经


文章

微信
公众号

扫码关注公众号