def __init__(self,
indent_step=4,
indent_char=' ',
repr_strings=False,
simple_cutoff=10,
width=120,
yield_from_generators=True):
self._indent_step = indent_step
self._c = indent_char
self._repr_strings = repr_strings
self._repr_generators = not yield_from_generators
self._simple_cutoff = simple_cutoff
self._width = width
self._type_lookup = [
(dict, self._format_dict),
(str, self._format_str),
(bytes, self._format_bytes),
(tuple, self._format_tuples),
((list, set, frozenset), self._format_list_like),
(collections.Generator, self._format_generators),
]
python类Generator()的实例源码
def get_meta(self) -> t.Generator[t.Dict, None, None]:
all_fields = self.model._meta.get_fields(
include_parents=self.include_parents, include_hidden=self.include_hidden
)
for f in all_fields:
if f.name in self.exclude:
continue
if self.fields and f.name not in self.fields:
continue
if f.name not in self.fields:
if f.concrete not in self.concrete_in:
continue
if f.auto_created not in self.auto_created_in:
continue
if f.editable not in self.editable_in:
continue
yield self.get_field_meta(f)
def enumerate(self, item: Any, reverse: bool = False) -> Generator[Tuple[int, Any], None, None]:
items = self
if reverse:
max = len(items) - 1
items = reversed(items)
for index, x in enumerate(items):
if x == item:
yield max - index if reverse else index, x
continue
try:
if item in x:
yield max - index if reverse else index, x
except TypeError:
# x doesn't define __contains__
pass
def enumerate(self, item: Any) -> Generator[Tuple[Any, Any], None, None]:
for key, value in self.items():
if key == item:
yield key, value
continue
try:
if item in key:
yield key, value
continue
except TypeError:
# key doesn't define __contains__
pass
if value == item:
yield key, value
continue
try:
if item in value:
yield key, value
continue
except TypeError:
# value doesn't define __contains__
pass
def get_columns(self, table_name: str = None) \
-> 'typing.Generator[md_column.Column, None, None]':
"""
Yields a :class:`.md_column.Column` for each column in the specified table,
or for each column in the schema if no table is specified.
These columns don't point to a :class:`.md_table.Table` since there
might not be one, but accessing __name__ and __tablename__ of the column's
table will still work as expected.
:param table_name: The table to get indexes from, or all tables if omitted
"""
params = {"table_name": table_name}
emitter = self.bind.emit_param
sql = self.bind.dialect.get_column_sql(table_name, emitter=emitter)
cur = await self.transaction.cursor(sql, params)
records = await cur.flatten()
await cur.close()
return self.bind.dialect.transform_rows_to_columns(*records, table_name=table_name)
def get_indexes(self, table_name: str = None) \
-> 'typing.Generator[md_index.Index, None, None]':
"""
Yields a :class:`.md_index.Index` for each index in the specified table,
or for each index in the schema if no table is specified.
These indexes don't point to a :class:`.md_table.Table` since there
might not be one, but they have a table_name attribute.
:param table_name: The table to get indexes from, or all tables if omitted
"""
params = {"table_name": table_name}
emitter = self.bind.emit_param
sql = self.bind.dialect.get_index_sql(table_name, emitter=emitter)
cur = await self.transaction.cursor(sql, params)
records = await cur.flatten()
await cur.close()
return self.bind.dialect.transform_rows_to_indexes(*records, table_name=table_name)
def _get_module_commands(module):
# type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
"""Yield all Command objects represented by the python module.
Module commands consist of a docopt-style module docstring and a callable
Command class.
Args:
module: An ast.Module object used to retrieve docopt-style commands.
Yields:
Command objects that represent entry points to append to setup.py.
"""
cls = next((n for n in module.body
if isinstance(n, ast.ClassDef) and n.name == 'Command'), None)
if not cls:
return
methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
if '__call__' not in methods:
return
docstring = ast.get_docstring(module)
for commands, _ in usage.parse_commands(docstring):
yield _EntryPoint(commands[0], next(iter(commands[1:]), None), None)
def _get_class_commands(module):
# type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
"""Yield all Command objects represented by python classes in the module.
Class commands are detected by inspecting all callable classes in the
module for docopt-style docstrings.
Args:
module: An ast.Module object used to retrieve docopt-style commands.
Yields:
Command objects that represent entry points to append to setup.py.
"""
nodes = (n for n in module.body if isinstance(n, ast.ClassDef))
for cls in nodes:
methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
if '__call__' in methods:
docstring = ast.get_docstring(cls)
for commands, _ in usage.parse_commands(docstring):
yield _EntryPoint(commands[0], next(iter(commands[1:]), None),
cls.name)
def _get_function_commands(module):
# type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
"""Yield all Command objects represented by python functions in the module.
Function commands consist of all top-level functions that contain
docopt-style docstrings.
Args:
module: An ast.Module object used to retrieve docopt-style commands.
Yields:
Command objects that represent entry points to append to setup.py.
"""
nodes = (n for n in module.body if isinstance(n, ast.FunctionDef))
for func in nodes:
docstring = ast.get_docstring(func)
for commands, _ in usage.parse_commands(docstring):
yield _EntryPoint(commands[0], next(iter(commands[1:]), None),
func.name)
def _check_query_words(
self, query: ast.Str, parser: Parser,
) -> Generator[Tuple[int, int, str, type], Any, None]:
for token in parser:
word = token.value
if token.is_keyword or token.is_function_name:
if not word.isupper() and word.upper() not in self.excepted_names:
yield(
query.lineno, query.col_offset,
"Q440 keyword {} is not uppercase".format(word),
type(self),
)
if word.upper() in ABBREVIATED_KEYWORDS:
yield(
query.lineno, query.col_offset,
"Q442 avoid abbreviated keywords, {}".format(word),
type(self),
)
elif token.is_name and (not word.islower() or word.endswith('_')):
yield(
query.lineno, query.col_offset,
"Q441 name {} is not valid, must be snake_case, and cannot "
"end with `_`".format(word),
type(self),
)
def lexicon_iterator(path: str,
vocab_source: Dict[str, int],
vocab_target: Dict[str, int]) -> Generator[Tuple[int, int, float], None, None]:
"""
Yields lines from a translation table of format: src, trg, logprob.
:param path: Path to lexicon file.
:param vocab_source: Source vocabulary.
:param vocab_target: Target vocabulary.
:return: Generator returning tuples (src_id, trg_id, prob).
"""
assert C.UNK_SYMBOL in vocab_source
assert C.UNK_SYMBOL in vocab_target
src_unk_id = vocab_source[C.UNK_SYMBOL]
trg_unk_id = vocab_target[C.UNK_SYMBOL]
with smart_open(path) as fin:
for line in fin:
src, trg, logprob = line.rstrip("\n").split("\t")
prob = np.exp(float(logprob))
src_id = vocab_source.get(src, src_unk_id)
trg_id = vocab_target.get(trg, trg_unk_id)
yield src_id, trg_id, prob
def _uid_str(uid_list: str or [str] or Generator) -> str:
"""
Prepare list of uid for use in commands: delete/copy/move/seen
uid_list can be: str, list, tuple, set, fetch generator
"""
if not uid_list:
raise MailBox.MailBoxUidParamError('uid_list should not be empty')
if type(uid_list) is str:
uid_list = uid_list.split(',')
if inspect.isgenerator(uid_list):
uid_list = [msg.uid for msg in uid_list if msg.uid]
if type(uid_list) not in (list, tuple, set):
raise MailBox.MailBoxUidParamError('Wrong uid_list type: {}'.format(type(uid_list)))
for uid in uid_list:
if type(uid) is not str:
raise MailBox.MailBoxUidParamError('uid {} is not string'.format(str(uid)))
if not uid.strip().isdigit():
raise MailBox.MailBoxUidParamError('Wrong uid: {}'.format(uid))
return ','.join((i.strip() for i in uid_list))
def get_attachments(self) -> Generator:
"""
Attachments of the mail message (generator)
:return: generator of tuple(filename: str, payload: bytes)
"""
for part in self.obj.walk():
# multipart/* are just containers
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
filename = part.get_filename()
if not part.get_filename():
continue # this is what happens when Content-Disposition = inline
filename = self._decode_value(*decode_header(filename)[0])
payload = part.get_payload(decode=True)
if not payload:
continue
yield filename, payload
def create_iterator(self, start=0, step=1):
# type: (int, int) -> Generator[Address]
"""
Creates an iterator that can be used to progressively generate new
addresses.
:param start:
Starting index.
Warning: This method may take awhile to reset if ``start``
is a large number!
:param step:
Number of indexes to advance after each address.
Warning: The generator may take awhile to advance between
iterations if ``step`` is a large number!
"""
key_iterator = (
KeyGenerator(self.seed)
.create_iterator(start, step, self.security_level)
)
while True:
yield self._generate_address(key_iterator)
def iter_used_addresses(adapter, seed, start):
# type: (BaseAdapter, Seed, int) -> Generator[Tuple[Address, List[TransactionHash]]]
"""
Scans the Tangle for used addresses.
This is basically the opposite of invoking ``getNewAddresses`` with
``stop=None``.
"""
ft_command = FindTransactionsCommand(adapter)
for addy in AddressGenerator(seed).create_iterator(start):
ft_response = ft_command(addresses=[addy])
if ft_response['hashes']:
yield addy, ft_response['hashes']
else:
break
# Reset the command so that we can call it again.
ft_command.reset()
def test_find_deck_spawns(prov):
if prov == "holy":
provider = Holy(network="peercoin-testnet")
if prov == "mintr":
provider = Mintr(network="peercoin")
if prov == "cryptoid":
provider = Cryptoid(network="peercoin")
try:
if prov == "rpc":
provider = RpcNode(testnet=True)
except:
print("No RpcNode avaliable.")
assert isinstance(find_deck_spawns(provider), Generator)
def _generate_conditions(self,
filters: Generator[Tuple[str, RawSQLFilter], None, None]) -> Generator[str, None, None]:
"""
Returns generator, yields raw-sql conditions strings
E.g. 'field_name >= %s`
:param filters: Generator with filter's name and `RawSQLFilter` instance
"""
for name, filter_ in filters:
conds_and_values = self._request_filters.get(name)
if conds_and_values:
for condition, value in conds_and_values:
try:
sql = filter_.filter(name, condition, value)
except ValidationError as e:
raise ValidationError('Exception raised for {}: {}'.format(name, e))
yield sql
elif filter_.default is not None:
self.params = filter_.default
yield "{} = %s".format(name)
def modpack_file(path: Path) -> Generator[ModPack, None, None]:
"""Context manager for manipulation of existing mod-pack.
Keyword arguments:
path: Path to the existing ModPack file, which should be provided.
Yields:
ModPack loaded from path. If no exception occurs, the provided modpack
is written (with changes) back to the file on context exit.
"""
with path.open(encoding='utf-8', mode='r') as istream:
mp = ModPack.load(istream)
yield mp
with path.open(encoding='utf-8', mode='w') as ostream:
mp.dump(ostream)
def filter_obsoletes(
self: 'ModPack',
files: Iterable[File]
) -> Generator[File, None, None]:
"""Filter obsolete files.
Obsolete files are defined as being already installed, or being
an older version of already installed files.
Keyword arguments:
files: Iterable of mod :class:`File`s to filter.
Yields:
Original files without the obsoletes.
"""
for file in files:
current = self.installed.get(file.mod.id, None)
if current is None or current.date < file.date:
yield file
else:
continue
def orphans(self: 'ModPack', mods: Mapping[int, Mod]=None) -> Generator[File, None, None]:
"""Finds all no longer needed dependencies.
Keyword arguments:
mods: Optional mapping of installed mods [default: self.mods].
The purpose of this parameter is to be able to override
really installed mods without changing the property directly.
Yields:
Orphaned files.
"""
if mods is None:
mods = self.mods
needed = {}
for file in mods.values():
needed.update(resolve(file, pool=self.installed))
# Filter unneeded dependencies
yield from (
file for m_id, file in self.dependencies.items()
if m_id not in needed
)
def populate(comments: Sequence['Comment'], authors: Sequence['People'],
count=100) -> Generator['Article', None, None]:
import mimesis
aid = mimesis.Numbers()
article = mimesis.Text()
answers = list(comments)
def get_random_answers(max):
counter = 0
while answers and counter < max:
yield answers.pop(random.randint(0, len(answers) - 1))
counter += 1
return (
Article(
id=aid.between(1, count),
title=article.title(),
author=random.choice(authors),
comments=[c for c in get_random_answers(random.randint(1, 10))]
)
for _ in range(count)
)
def peek(nbytes=0) -> typing.Generator[_Action, Buffer, bytes]:
"""Read output without consuming it.
Read but **does not** consume data from the protocol input.
This is a *non-blocking* primitive, if less data than requested is available,
less data is returned. It is meant to be used in combination with :func:`~ohneio.wait`, but
edge cases and smart people can (and most likely will) prove me wrong.
Args:
nbytes (:obj:`int`, optional): amount of bytes to read *at most*. ``0`` meaning all bytes.
Returns:
bytes: data read from the buffer
"""
input_ = yield _get_input
return input_.peek(nbytes)
def exec_iter(
command: typing.List[str],
logger: typing.Optional[iocage.lib.Logger.Logger]=None
) -> typing.Generator[str, None, None]:
process = exec_raw(
command,
logger=logger,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True
)
for stdout_line in iter(process.stdout.readline, ""):
yield stdout_line
process.stdout.close()
return_code = process.wait()
if return_code:
raise subprocess.CalledProcessError(return_code, command)
def __iter__(
self
) -> typing.Generator[Resource, None, None]:
for child_dataset in self.dataset.children:
name = self._get_asset_name_from_dataset(child_dataset)
if self._filters is not None and \
self._filters.match_key("name", name) is not True:
# Skip all jails that do not even match the name
continue
# ToDo: Do not load jail if filters do not require to
resource = self._get_resource_from_dataset(child_dataset)
if self._filters is not None:
if self._filters.match_resource(resource):
yield resource
def _print_list(
resources: typing.Generator[
iocage.lib.Jails.JailsGenerator,
None,
None
],
columns: list,
show_header: bool,
separator: str=";"
) -> None:
if show_header is True:
print(separator.join(columns).upper())
for resource in resources:
print(separator.join(_lookup_resource_values(resource, columns)))
def _print_json(
resources: typing.Generator[
iocage.lib.Jails.JailsGenerator,
None,
None
],
columns: list,
**json_dumps_args
):
if "indent" not in json_dumps_args.keys():
json_dumps_args["indent"] = 2
if "sort_keys" not in json_dumps_args.keys():
json_dumps_args["sort_keys"] = True
output = []
for resource in resources:
output.append(dict(zip(
columns,
_lookup_resource_values(resource, columns)
)))
print(json.dumps(output, **json_dumps_args))
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2*MB) -> Generator[bytes, None, None]:
"""Read a file in fixed-size chunks (to minimize memory usage for large files).
Args:
file_object: An opened file-like object supporting read().
chunk_size: Max size (in bytes) of each file chunk.
Yields:
File chunks, each of size at most chunk_size.
"""
while True:
chunk = file_object.read(chunk_size)
if chunk:
yield chunk
else:
return # End of file.
def get_output(self, limit: int = None) -> Generator:
if limit is None:
per_replica = None
else:
per_replica = round(limit / self.num_replicas)
if per_replica == 0:
logger.debug("{} forcibly setting replica "
"message limit to {}"
.format(self._node.name,
per_replica))
per_replica = 1
for replica in self._replicas:
num = 0
while replica.outBox:
yield replica.outBox.popleft()
num += 1
if per_replica and num >= per_replica:
break
def untyped_do(f: Callable[..., Generator[G, B, None]]) -> Callable[..., G]:
@functools.wraps(f)
def do_loop(*a: Any, **kw: Any) -> F[B]:
itr = f(*a, **kw)
if not isinstance(itr, GeneratorType):
raise Exception(f'function `{f.__qualname__}` decorated with `do` does not produce a generator')
init = itr.send(None)
m = Monad.fatal_for(init)
@functools.wraps(f)
def loop(val: B) -> F[B]:
try:
return m.flat_map(itr.send(val), loop)
except StopIteration:
return m.pure(val)
return m.flat_map(init, loop)
return do_loop
def _drain_find(self, abort: Callable[[A], bool]) -> Maybe[A]:
culprit = Empty()
def gen() -> Generator:
nonlocal culprit
while True:
try:
el = next(self.source)
yield el
if abort(el):
culprit = Just(el)
break
except StopIteration:
break
drained = List.wrap(list(gen()))
self.strict = self.strict + drained
return culprit