def make_find_check_resolve_submit(finder: Finder, notSubmittedCheck: Checker, resolver: Resolver, submitter: Submitter) -> Processor:
    def inner(text: str) -> Generator:
        for found in finder(text):
            print(found)
            if notSubmittedCheck(found):
                resolved = resolver(found)
                yield submitter(found, resolved)
    return inner
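# Hedged usage sketch (not from the original source): the Finder / Checker / Resolver /
# Submitter aliases below are assumptions standing in for whatever callables the real
# project injects; any iterable-returning finder and boolean checker will do.
from typing import Callable, Generator, Iterable

Finder = Callable[[str], Iterable[str]]
Checker = Callable[[str], bool]
Resolver = Callable[[str], str]
Submitter = Callable[[str, str], str]
Processor = Callable[[str], Generator]

def _finder(text: str) -> Iterable[str]:
    return text.split()                      # pretend every word is a "finding"

def _not_submitted(found: str) -> bool:
    return not found.startswith('done:')     # skip items already submitted

def _resolver(found: str) -> str:
    return found.upper()

def _submitter(found: str, resolved: str) -> str:
    return f'{found} -> {resolved}'

process = make_find_check_resolve_submit(_finder, _not_submitted, _resolver, _submitter)
print(list(process('alpha done:beta gamma')))    # ['alpha -> ALPHA', 'gamma -> GAMMA']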
def _process_args(self, func_ast, code_lines, args, kwargs) -> Generator[DebugArgument, None, None]:  # noqa: C901
    arg_offsets = list(self._get_offsets(func_ast))
    for arg, ast_node, i in zip(args, func_ast.args, range(1000)):
        if isinstance(ast_node, ast.Name):
            yield self.output_class.arg_class(arg, name=ast_node.id)
        elif isinstance(ast_node, self.complex_nodes):
            # TODO replace this hack with astor when it gets round to a new release
            start_line, start_col = arg_offsets[i]
            if i + 1 < len(arg_offsets):
                end_line, end_col = arg_offsets[i + 1]
            else:
                end_line, end_col = len(code_lines) - 1, None
            name_lines = []
            for l_ in range(start_line, end_line + 1):
                start_ = start_col if l_ == start_line else 0
                end_ = end_col if l_ == end_line else None
                name_lines.append(
                    code_lines[l_][start_:end_].strip(' ')
                )
            yield self.output_class.arg_class(arg, name=' '.join(name_lines).strip(' ,'))
        else:
            yield self.output_class.arg_class(arg)
    kw_arg_names = {}
    for kw in func_ast.keywords:
        if isinstance(kw.value, ast.Name):
            kw_arg_names[kw.arg] = kw.value.id
    for name, value in kwargs.items():
        yield self.output_class.arg_class(value, name=name, variable=kw_arg_names.get(name))
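# Hedged standalone sketch of the same idea: recover the source names of positional
# arguments by pairing runtime values with the ast.Name nodes of the call. Illustration
# only; the implementation above also handles complex expressions via source offsets.
import ast

call = ast.parse("debug(foo, bar + 1)", mode="eval").body   # an ast.Call node
values = [42, 10]
for value, node in zip(values, call.args):
    name = node.id if isinstance(node, ast.Name) else None
    print(value, name)    # 42 foo / 10 None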
def _format(self, value: Any, indent_current: int, indent_first: bool):
    if indent_first:
        self._stream.write(indent_current * self._c)
    value_repr = repr(value)
    if len(value_repr) <= self._simple_cutoff and not isinstance(value, collections.abc.Generator):
        self._stream.write(value_repr)
    else:
        indent_new = indent_current + self._indent_step
        for t, func in self._type_lookup:
            if isinstance(value, t):
                func(value, value_repr, indent_current, indent_new)
                return
        self._format_raw(value, value_repr, indent_current, indent_new)
def _format_generators(self, value: Generator, value_repr: str, indent_current: int, indent_new: int):
    if self._repr_generators:
        self._stream.write(value_repr)
    else:
        self._stream.write('(\n')
        for v in value:
            self._format(v, indent_new, True)
            self._stream.write(',\n')
        self._stream.write(indent_current * self._c + ')')
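# Hedged sketch of why the dispatch above treats generators specially: they have no
# useful repr and are consumed by iteration, so a formatter must either print the bare
# repr or walk (and exhaust) the values. collections.abc.Generator is the isinstance target.
from collections import abc

gen = (n * n for n in range(3))
assert isinstance(gen, abc.Generator)
print(repr(gen))     # '<generator object ...>' -- what _repr_generators=True would print
print(list(gen))     # [0, 1, 4] -- what walking the values (and exhausting gen) yields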
def get_meta(self) -> t.Generator[t.Dict, None, None]:
    """
    :return: generator
        list(metadata_obj.get_meta()) -> [abstract_field_obj1, abstract_field_obj2, ...]
    """
    fields_by_name = OrderedDict()
    for field_data in self.fields:
        fields_by_name[field_data['name']] = field_data
    for k, v_callable in self.__class__.__dict__.items():
        # `get_field_<NAME>` methods are used for updates later, below
        if k.startswith('get_field_'):
            continue
        if not k.startswith('get_'):
            continue
        # check dynamic get_%s fields
        # method get_%s must return {'name': '<NAME>'}, where <NAME> is a real field name
        res = v_callable(self, self.request)
        fields_by_name[res['name']] = res
    fields_order = self.order or fields_by_name.keys()
    for field_name in fields_order:
        field_value = fields_by_name[field_name]
        # method should update field with returned dict
        method = getattr(self, 'get_field_%s' % field_name, None)
        if callable(method):
            field_value.update(method(field_name, self.request))
        yield field_value
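# Hedged sketch of the protocol get_meta() expects. `Metadata` is a placeholder name for
# the class above (assumed to store self.request); the field names are hypothetical.
# It shows the two hook shapes: get_<name>() returns a field dict keyed by 'name', and
# get_field_<name>() returns a dict of updates merged just before the field is yielded.
class CityMetadata(Metadata):
    fields = [{'name': 'country', 'label': 'Country'}]
    order = ['country', 'city']

    def get_city(self, request):                        # dynamic field, found by the get_ scan
        return {'name': 'city', 'label': 'City'}

    def get_field_country(self, field_name, request):   # late update hook, applied on yield
        return {'required': True}

# list(CityMetadata(request).get_meta()) would yield the 'country' field (with
# required=True merged in) followed by the dynamic 'city' field.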
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def restapiplugin_target() -> Generator:
    """Simulate the endpoints triggered by RESTAPIPlugin."""
    fauxmo_device = Process(target=httpbin.core.app.run,
                            kwargs={"host": "127.0.0.1", "port": 8000},
                            daemon=True)
    fauxmo_device.start()
    time.sleep(1)
    yield
    fauxmo_device.terminate()
    fauxmo_device.join()
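# Hedged sketch: in the original test suite this generator is presumably registered as a
# pytest fixture; the stdlib-only example below shows the same setup/yield/teardown shape,
# with a dummy target standing in for httpbin.core.app.run.
from multiprocessing import Process
import time

def _fake_server():
    time.sleep(60)                          # stand-in for a blocking server loop

def server_target():
    proc = Process(target=_fake_server, daemon=True)
    proc.start()                            # setup: launch the background process
    time.sleep(1)
    yield proc                              # hand control to the test body
    proc.terminate()                        # teardown: runs when the generator resumes
    proc.join()

if __name__ == "__main__":
    gen = server_target()
    proc = next(gen)                        # setup has run; proc.is_alive() is True here
    next(gen, None)                         # resume past the yield -> teardown runs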
def srand(seed=0):
    # type: (KeyType) -> typing.Generator[int, None, None]
    if isinstance(seed, six.string_types) or isinstance(seed, bytes):
        if isinstance(seed, six.text_type):
            seed = seed.encode('utf-8')
        seed_int = int(hashlib.sha512(seed).hexdigest(), 16)
        seed = typing.cast(int, seed_int)
    rng = random.Random(seed)
    while True:
        yield rng.randint(0, sys.maxsize)
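# Hedged usage sketch: srand() is an infinite generator, so take a finite slice.
# The same seed (int, str, or bytes) always reproduces the same stream.
import itertools

first = list(itertools.islice(srand("my-seed"), 3))
second = list(itertools.islice(srand("my-seed"), 3))
assert first == second               # deterministic for equal seeds
print(first)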
def _get_many_generator(self, result: Iterable[S], context: PipelineContext = None) -> Generator[T, None, None]:
    for item in result:
        LOGGER.info("Sending item \"{item}\" to sinks before converting".format(item=item))
        for sink in self._before_transform:
            sink.put(item, context)
        LOGGER.info("Converting item \"{item}\" to request type".format(item=item))
        item = self._transform(data=item, context=context)
        LOGGER.info("Sending item \"{item}\" to sinks after converting".format(item=item))
        for sink in self._after_transform:
            sink.put(item, context)
        yield item
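# Hedged standalone sketch of the pattern above: stream items through a transform while
# "tapping" them into sinks before and after conversion. The callables below are
# illustrative stand-ins, not the project's Sink/transformer classes.
from typing import Callable, Generator, Iterable, List

def tap_transform(items: Iterable[int],
                  transform: Callable[[int], str],
                  before: List[Callable[[int], None]],
                  after: List[Callable[[str], None]]) -> Generator[str, None, None]:
    for item in items:
        for sink in before:
            sink(item)                 # sinks see the raw item
        converted = transform(item)
        for sink in after:
            sink(converted)            # sinks see the converted item
        yield converted                # caller still gets a lazy stream

out = list(tap_transform([1, 2], str, before=[print], after=[print]))
print(out)                             # ['1', '2']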
def get_many_int(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[int, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        value = int(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"int\"")
    return (value for _ in range(count))
def get_many_float(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[float, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        value = float(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"float\"")
    if value not in self.items:
        raise NotFoundError("Query value wasn't in store!")
    return (value for _ in range(count))
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Generator[T, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        # noinspection PyCallingNonCallable
        value = type(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"{type}\"".format(type=type))
    return (value for _ in range(count))
def get_many_int(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[int, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        value = int(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"int\"")
    return (value for _ in range(count))
def get_many_str(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[str, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        value = str(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"str\"")
    return (value for _ in range(count))
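# Hedged usage sketch for the get_many_* family above: `source` stands for an instance
# of whichever data-source class defines these methods; VALUE_KEY and COUNT_KEY are the
# module's query-key constants. Because the methods use `return (... for _ in range(count))`
# rather than `yield`, the cast (and any NotFoundError) happens at call time, while the
# values themselves stream lazily.
query = {VALUE_KEY: "7", COUNT_KEY: 3}
assert list(source.get_many_int(query)) == [7, 7, 7]

try:
    source.get_many_int({VALUE_KEY: "not-a-number", COUNT_KEY: 3})
except NotFoundError:
    pass                               # raised immediately, before any iteration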
########################
# Unsupported Function #
########################
def __call__(self,
             dataset: Dataset,
             num_epochs: int = None,
             shuffle: bool = True,
             cuda_device: int = -1,
             for_training: bool = True) -> Generator[Dict[str, Union[numpy.ndarray,
                                                                     Dict[str, numpy.ndarray]]],
                                                     None, None]:
    """
    Returns a generator that yields batches over the given dataset, forever.

    Parameters
    ----------
    dataset : ``Dataset``
    num_epochs : ``int``, optional (default=``None``)
        How many times should we iterate over this dataset?  If ``None``, we will iterate over it
        forever.
    shuffle : ``bool``, optional (default=``True``)
        If ``True``, we will shuffle the instances in ``dataset`` before constructing batches
        and iterating over the data.
    cuda_device : ``int``
        If cuda_device >= 0, GPUs are available, and Pytorch was compiled with CUDA support, the
        tensor will be copied to the cuda_device specified.
    for_training : ``bool``, optional (default=``True``)
        If ``False``, we will pass the ``volatile=True`` flag when constructing variables,
        which disables gradient computations in the graph.  This makes inference more efficient
        (particularly in memory usage), but is incompatible with training models.
    """
    if num_epochs is None:
        while True:
            yield from self._yield_one_epoch(dataset, shuffle, cuda_device, for_training)
    else:
        for _ in range(num_epochs):
            yield from self._yield_one_epoch(dataset, shuffle, cuda_device, for_training)
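# Hedged standalone sketch of the epoch-looping shape used by __call__ above: either loop
# forever or exactly num_epochs times, re-yielding one epoch's worth of batches each pass.
# The toy _one_epoch below replaces the real batching logic.
import itertools
from typing import Generator, List, Optional

def epoch_batches(data: List[int], num_epochs: Optional[int] = None) -> Generator[List[int], None, None]:
    def _one_epoch():
        yield from (data[i:i + 2] for i in range(0, len(data), 2))   # toy "batches" of 2
    if num_epochs is None:
        while True:
            yield from _one_epoch()
    else:
        for _ in range(num_epochs):
            yield from _one_epoch()

print(list(epoch_batches([1, 2, 3, 4], num_epochs=2)))        # [[1, 2], [3, 4], [1, 2], [3, 4]]
print(list(itertools.islice(epoch_batches([1, 2, 3, 4]), 3)))  # infinite stream, take 3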
def get_clients_groups(self) -> typing.Generator['FlyingUnit', None, None]:
    for group in self.groups:
        assert isinstance(group, Group)
        if group.group_is_client_group:
            yield group
def farps(self) -> typing.Generator['Static', None, None]:
    for coa in [self.blue_coa, self.red_coa]:
        for farp in coa.farps:
            yield farp
# noinspection PyProtectedMember
def countries(self) -> typing.Generator['Country', None, None]:
    for k in self._section_country:
        if k not in self._countries.keys():
            country = Country(self.d, self.l10n, self.coa_color, k)
            self._countries[k] = country
            self._countries_by_id[country.country_id] = country
            self._countries_by_name[country.country_name] = country
        yield self._countries[k]
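# Hedged standalone sketch of the cache-as-you-yield pattern used by countries(): the
# first full iteration builds the lookup cache, later iterations reuse it. The dict-backed
# Registry below is illustrative, not the mission parser's classes.
class Registry:
    def __init__(self, raw_ids):
        self._raw_ids = raw_ids
        self._cache = {}

    def items(self):
        for key in self._raw_ids:
            if key not in self._cache:
                self._cache[key] = {'id': key, 'name': f'country-{key}'}   # build once
            yield self._cache[key]          # yield the cached object either way

reg = Registry([1, 2, 1])
assert [c['id'] for c in reg.items()] == [1, 2, 1]
assert len(reg._cache) == 2                 # duplicates reuse the cached entry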
def groups(self) -> typing.Generator['Group', None, None]:
    for country in self.countries:
        assert isinstance(country, Country)
        for group in country.groups:
            assert isinstance(group, Group)
            yield group
def statics(self) -> typing.Generator['Static', None, None]:
    for country in self.countries:
        assert isinstance(country, Country)
        for static in country.statics:
            assert isinstance(static, Static)
            yield static
def get_groups_from_category(self, category) -> typing.Generator['Group', None, None]:
    Mission.validator_group_category.validate(category, 'get_groups_from_category')
    for group in self.groups:
        assert isinstance(group, Group)
        if group.group_category == category:
            yield group
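# Hedged usage sketch: `mission` stands for whichever object exposes the generators above;
# the 'helicopter' category value is an assumption, and in the original library several of
# these generators appear to be properties (accessed without parentheses), so the exact
# call style may differ. Everything stays lazy until consumed.
helicopter_groups = list(mission.get_groups_from_category('helicopter'))
for group in mission.get_clients_groups():
    print(group)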