def list_voices(access_key, secret_key, voice_language, voice_gender):
    """List available Ivona voices"""
    try:
        ivona_api = IvonaAPI(access_key, secret_key)
    except (ValueError, IvonaAPIException) as e:
        raise click.ClickException("Something went wrong: {}".format(repr(e)))

    click.echo("Listing available voices...")
    voices_list = ivona_api.get_available_voices(
        language=voice_language,
        gender=voice_gender,
    )

    # Group voices by language
    voices_dict = dict()
    data = sorted(voices_list, key=lambda x: x['Language'])
    for k, g in groupby(data, key=lambda x: x['Language']):
        voices_dict[k] = list(g)

    for ln, voices in voices_dict.items():
        voice_names = [v['Name'] for v in voices]
        click.echo("{}: {}".format(ln, ', '.join(voice_names)))

    click.secho("All done", fg='green')
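# A minimal sketch of the grouping idiom above, with made-up voice dicts
# standing in for live IvonaAPI results. Note groupby() only merges
# *consecutive* equal keys, so the input must be sorted by the same key first.
from itertools import groupby

voices = [
    {'Name': 'Salli', 'Language': 'en-US'},
    {'Name': 'Maja', 'Language': 'pl-PL'},
    {'Name': 'Joey', 'Language': 'en-US'},
]
by_language = {
    lang: [v['Name'] for v in group]
    for lang, group in groupby(sorted(voices, key=lambda v: v['Language']),
                               key=lambda v: v['Language'])
}
print(by_language)  # {'en-US': ['Salli', 'Joey'], 'pl-PL': ['Maja']}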
def get_forums(query_result, user):
    """Returns a tuple which contains the category and the forums as list.
    This is the counterpart for get_categories_and_forums and is especially
    useful when you just need the forums for one category.

    For example::
        (<Category 2>,
          [(<Forum 3>, None),
           (<Forum 4>, None)])

    :param query_result: A tuple (KeyedTuple) with all categories and forums
    :param user: The user object is needed because a signed out user does not
                 have the ForumsRead relation joined.
    """
    it = itertools.groupby(query_result, operator.itemgetter(0))

    if user.is_authenticated:
        for key, value in it:
            forums = key, [(item[1], item[2]) for item in value]
    else:
        for key, value in it:
            forums = key, [(item[1], None) for item in value]

    return forums
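# Hedged sketch of the itemgetter(0) grouping used above, with plain tuples
# standing in for the (category, forum, forumsread) rows of the real query:
import itertools
import operator

rows = [('cat1', 'forum1', None), ('cat1', 'forum2', None)]
for category, group in itertools.groupby(rows, operator.itemgetter(0)):
    print(category, [(item[1], item[2]) for item in group])
# cat1 [('forum1', None), ('forum2', None)]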
def group_battles(self, cache):
    matches_sorted = sorted(cache.values(), key=get_match_id)
    grouped = groupby(matches_sorted, lambda x: x['match_id'])
    groups = []
    for key, group in grouped:
        matches = []
        battle = {}
        first = True
        for item in group:
            if first:
                first = False
                battle['guild'] = item['op_guild']
                battle['type'] = item['type']
                battle['match_id'] = item['match_id']
            matches.append(item)
        battle['matches'] = matches
        groups.append(battle)
    return groups
def group_articles_by_status(articles):
    """
    Group articles by publish status

    :param articles: Iterable of Article objects
    :returns: Iterable like itertools.groupby with a key as the publish_status
              and a list of articles for that status
    """
    def status_key(a):
        if a.publish_status == PUBLISHED:
            cnt = 1
        elif a.publish_status == IN_REVIEW:
            cnt = 2
        elif a.publish_status == DRAFT:
            cnt = 3
        else:
            cnt = 4
        return cnt

    sorted_by_status = sorted(articles, key=status_key)
    return itertools.groupby(sorted_by_status, key=lambda a: a.publish_status)
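# Hypothetical usage sketch for the function above: Article, PUBLISHED,
# IN_REVIEW and DRAFT belong to the surrounding module, so they are stubbed
# here purely so the example can run on its own.
import itertools
from collections import namedtuple

Article = namedtuple('Article', ['title', 'publish_status'])
PUBLISHED, IN_REVIEW, DRAFT = 'published', 'in_review', 'draft'

articles = [Article('a', DRAFT), Article('b', PUBLISHED), Article('c', DRAFT)]
for status, group in group_articles_by_status(articles):
    print(status, [a.title for a in group])
# published ['b']
# draft ['a', 'c']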
def get_entry_objects(entry_list):
    """
    Retrieve a sequence of (entry, data_obj, author_objs) triples.
    """
    # Grouping these together like this just to minimize the number of calls
    # to get_database_table.
    author_table = dh.get_database_table('authors')
    for table_name, group in it.groupby(entry_list, key=lambda x: x.table):
        table = dh.get_database_table(table_name)

        for entry_obj in group:
            data_obj = table[entry_obj.data_id]

            # Retrieve the author objects as well
            author_objs = [author_table[author_id]
                           for author_id in data_obj.author_ids]

            yield (entry_obj, data_obj, author_objs)
def natural_sort_key(cls, value):
    """
    This is a sort key to do a "natural" lexicographic sort: the string is
    broken up into segments of strings and numbers, so that, e.g. `'Str 2'`
    will be sorted before `'Str 15'`.

    :param value:
        The book name as it will be sorted.
    :return:
        Returns a book name tokenized such that it can be sorted.
    """
    o = itertools.groupby(value, key=str.isdigit)
    o = ((k, ''.join(g)) for k, g in o)
    o = ((int(v) if k else v) for k, v in o)
    return tuple(o)
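# Standalone sketch of the natural sort key above (the original is written as
# a classmethod, so the ``cls`` argument is dropped here):
import itertools

def natural_key(value):
    o = itertools.groupby(value, key=str.isdigit)      # runs of digits/non-digits
    o = ((k, ''.join(g)) for k, g in o)                # rejoin each run
    o = ((int(v) if k else v) for k, v in o)           # digit runs become ints
    return tuple(o)

print(sorted(['Str 15', 'Str 2'], key=natural_key))
# ['Str 2', 'Str 15']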
def listenSsl(self, site, ports, ignore=()):
    # Read the PEM-encoded key and certificate from disk.
    with open(self.options.sslPrivateKey, "r") as privateKeyFile:
        privateKey = privateKeyFile.read()
    with open(self.options.sslCertificate) as certificateFile:
        certificate = certificateFile.read()

    import twisted.internet.ssl as ssl
    cert = ssl.PrivateCertificate.loadPEM(privateKey + certificate)
    contextFactory = cert.options()

    import itertools
    # sorted() + groupby() deduplicates the requested ports.
    listenPorts = [k for k, _ in itertools.groupby(sorted(ports))]
    for port in listenPorts:
        if port not in ignore:
            reactor.listenSSL(port, site, contextFactory)
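# The sorted/groupby line above is a compact way to deduplicate a list; a
# minimal illustration with plain ints:
import itertools

ports = [443, 8443, 443]
listen_ports = [k for k, _ in itertools.groupby(sorted(ports))]
print(listen_ports)  # [443, 8443]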
def _generate_endpoints(self, targets):
    def _target_to_port(item):
        _, (listen_port, target_port) = item
        return {'port': target_port, 'name': str(listen_port)}

    port_with_addrs = [
        (p, [e[0] for e in grp])
        for p, grp in itertools.groupby(
            sorted(targets.items()), _target_to_port)]
    return {
        'subsets': [
            {
                'addresses': [
                    {
                        'ip': ip,
                        'targetRef': {
                            'kind': k_const.K8S_OBJ_POD,
                            'name': ip,
                            'namespace': 'default'
                        }
                    }
                    for ip in addrs
                ],
                'ports': [port]
            }
            for port, addrs in port_with_addrs
        ]
    }
def __str__(self):
    lines = []
    self.errors.sort(key=lambda e: e.order)
    for cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
        lines.append(cls.head)
        lines.extend(e.body() for e in errors_of_cls)
    # __str__ must always return a string; joining an empty list yields ''.
    return '\n'.join(lines)
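# Hedged sketch of grouping error objects by class, as in __str__ above; the
# two exception classes and their ``head`` attributes are invented here:
from itertools import groupby

class MissingField(Exception):
    head = 'Missing fields:'

class BadValue(Exception):
    head = 'Bad values:'

errors = [MissingField('name'), MissingField('age'), BadValue('x')]
for cls, errs in groupby(errors, lambda e: e.__class__):
    print(cls.head, [str(e) for e in errs])
# Missing fields: ['name', 'age']
# Bad values: ['x']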
def validate_msdos(module, partitions):
    """Validate limitations of MSDOS partition table"""
    p_types = [p['type'] for p in partitions]
    # NOTE(pas-ha) no more than 4 primary
    if p_types.count('primary') > 4:
        module.fail_json("Can not create more than 4 primary partitions "
                         "on a MSDOS partition table.")
    if 'extended' in p_types:
        # NOTE(pas-ha) only a single extended partition is allowed
        if p_types.count('extended') > 1:
            module.fail_json("Can not create more than single extended "
                             "partition on a MSDOS partition table.")
        allowed = ['primary', 'extended']
        if 'logical' in p_types:
            allowed.append('logical')
        # NOTE(pas-ha) this produces a list with subsequent duplicates
        # removed
        if [k for k, g in itertools.groupby(p_types)] != allowed:
            module.fail_json("Incorrect partitions order: for MSDOS, "
                             "all primary, single extended, all logical")
    elif 'logical' in p_types:
        # NOTE(pas-ha) logical makes sense only with extended
        module.fail_json("Logical partition w/o extended one on MSDOS "
                         "partition table")

# TODO(pas-ha) add more validation, e.g.
# - add idempotency: first check the already existing partitions
#   and do not run anything unless really needed, and only what's needed
#   - if only changing tags - use specific command
# - allow fuzziness in partition sizes when alignment is 'optimal'
# - estimate and validate available space
# - support more units
# - support negative units?
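# The order check above relies on groupby() collapsing runs of equal values;
# a quick illustration with hand-made partition type lists:
import itertools

ok = ['primary', 'primary', 'extended', 'logical', 'logical']
bad = ['primary', 'logical', 'primary', 'extended']
print([k for k, g in itertools.groupby(ok)])   # ['primary', 'extended', 'logical']
print([k for k, g in itertools.groupby(bad)])  # ['primary', 'logical', 'primary', 'extended']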
def render_template(docs, template):
    # Group the docs by source file, sorting each file's docs by name.
    grouped_docs = {f: sorted(dl, key=lambda d: d['name'])
                    for f, dl in groupby(docs, lambda d: d['file'])}
    template = JINJA_ENV.get_template(template)
    rendered = template.render(documentation=grouped_docs,
                               trim_blocks=True,
                               lstrip_blocks=True)
    return rendered
def isPermutation(self, values):
    from itertools import groupby
    # groupby() only merges consecutive equal values, so sort first to get
    # true frequencies; every value must occur exactly once.
    frequencies = [len(list(group)) for key, group in groupby(sorted(values))]
    for frequency in frequencies:
        if frequency != 1:
            return False
    return True
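# Why the sort matters: groupby() only counts *consecutive* duplicates, so
# without sorting, a repeated value separated by others would be missed.
from itertools import groupby

values = [1, 2, 1]
print([len(list(g)) for _, g in groupby(values)])          # [1, 1, 1] -- looks distinct
print([len(list(g)) for _, g in groupby(sorted(values))])  # [2, 1] -- duplicate found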
def create_structure_ers_from_relations(relations):
    """This function builds structured entity relationships.

    Args:
        relations (list): List of (:class:`FieldPath`, :class:`FieldPath`)

    Returns:
        Structured ER dict. For example:
        {'database_name': {'table_name': {'field_name': ['foreign_database_table_field']}}}

    Example usage:
        >>> print(create_structure_ers_from_relations([(FieldPath('db', 'ac', 'id'), FieldPath('db', 'bc', 'id'))]))
        {'db': {'ac': {'id': ['db.bc.id']}, 'bc': {'id': ['db.ac.id']}}}
    """
    relations.extend([_[::-1] for _ in relations])  # add reverse relations
    relations = sorted(list(set([tuple(_) for _ in relations])), key=lambda _: _[0].db)  # remove duplicates
    dbs = {}
    for db_key, tb_grp in groupby(relations, key=lambda _: _[0].db):  # group by db name
        if db_key == '':
            continue
        tbs = {}
        for tb_key, fd_grp in groupby(sorted(list(tb_grp), key=lambda _: _[0].tb), key=lambda _: _[0].tb):
            fds = {}
            for fd_key, foreign_grp in groupby(sorted(list(fd_grp), key=lambda _: _[0].fd), key=lambda _: _[0].fd):
                fds[fd_key] = sorted([str(_[1]) for _ in list(foreign_grp)])
            tbs[tb_key] = fds
        dbs[db_key] = tbs
    return dbs
def compute_largest_specs(history_specs):
    """
    Maps a Frequency to the largest HistorySpec at that frequency from an
    iterable of HistorySpecs.
    """
    return {key: max(group, key=lambda f: f.bar_count)
            for key, group in groupby(
                sorted(history_specs, key=freq_str_and_bar_count),
                key=lambda spec: spec.frequency)}
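# The max-per-group pattern above, sketched with plain tuples instead of
# HistorySpec objects; (frequency, bar_count) stands in for a spec:
from itertools import groupby

specs = [('1d', 5), ('1d', 20), ('1m', 390)]
largest = {freq: max(group, key=lambda s: s[1])
           for freq, group in groupby(sorted(specs), key=lambda s: s[0])}
print(largest)  # {'1d': ('1d', 20), '1m': ('1m', 390)}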
def _to_html(row_col_dict, **kwargs):
    '''
    Args:
        row_col_dict: dict mapping offset tuples (e.g. (start_row, start_col,
            end_row, end_col), so (0, 0, 0, 0) is a single top-left cell) to
            (Value, Style) pairs
    '''
    def wrap_tr(offsets):
        s = []
        nesting_level = row_col_dict[offsets[0]].nesting_level
        for offset in offsets:
            row_span = offset.end_row - offset.start_row + 1
            col_span = offset.end_col - offset.start_col + 1
            value = row_col_dict[offset].value
            style = row_col_dict[offset].style_wrapper.user_style
            style = HTMLWriter.style_to_str(style)
            td_attr = dict(rowspan=row_span,
                           colspan=col_span,
                           style=style)
            if nesting_level > row_col_dict[offset].nesting_level:
                # we have encountered a nested table
                inner_html = HTMLWriter._to_html(value)
            else:
                inner_html = value
            td = HTMLWriter._wrap_table_element('td', td_attr, inner_html)
            s.extend(td)
        tr = HTMLWriter._wrap_table_element('tr', {}, ''.join(s))
        return tr

    trs = []
    # Group offsets by start row, so each group becomes one <tr>.
    for _, offsets in groupby(sorted(row_col_dict), key=lambda x: x[0]):
        trs.append(wrap_tr(list(offsets)))

    table_attrs = kwargs or dict()
    return HTMLWriter._wrap_table_element(
        'table',
        table_attrs,
        ''.join(trs))
def get_unannotated_intervals(self):
    """ Return a list of Annotation objects corresponding to unannotated regions on the contig """
    unannotated_intervals = []
    annotation_indicator = np.zeros(len(self.sequence))

    for annotation in self.annotations:
        annotation_indicator[annotation.contig_match_start:annotation.contig_match_end] = 1

    interval_start = 0
    for annotated, region_iter in itertools.groupby(annotation_indicator, lambda x: x == 1):
        region = list(region_iter)
        if not annotated:
            feature = vdj_reference.create_dummy_feature(display_name='UNANNOTATED',
                                                         region_type='UNANNOTATED',
                                                         sequence=None)
            unannotated_intervals.append(Annotation(feature=feature,
                                                    cigar=None,
                                                    score=0,
                                                    annotation_length=len(region),
                                                    annotation_match_start=0,
                                                    annotation_match_end=len(region),
                                                    contig_match_start=interval_start,
                                                    contig_match_end=interval_start + len(region),
                                                    mismatches=[],
                                                    ))
        interval_start += len(region)

    return unannotated_intervals
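# The run-length idiom above, shown on a plain 0/1 indicator list: groupby()
# splits it into maximal runs, and len(run) gives each interval's length.
import itertools

indicator = [0, 0, 1, 1, 1, 0]
start = 0
for annotated, run_iter in itertools.groupby(indicator, lambda x: x == 1):
    run = list(run_iter)
    print(annotated, (start, start + len(run)))
    start += len(run)
# False (0, 2)
# True (2, 5)
# False (5, 6)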
def load_cell_contigs_from_json(json_file, reference_path, group_key, require_high_conf=True):
    """Returns a list of CellContigs objects based on annotations in a json.

    The json is assumed to contain a list of AnnotatedContigs (in dict form).
    The contigs are sorted and grouped by group_key and each such group is put
    into a CellContigs object.

    group_key must be 'barcode' or 'clonotype'
    """
    assert group_key in set(['barcode', 'clonotype'])
    annotations = load_contig_list_from_json(open(json_file), reference_path)

    cell_contigs = []

    key_func = lambda x: getattr(x, group_key)
    anno_iter = itertools.groupby(sorted(annotations, key=key_func), key=key_func)

    for clonotype_name, contig_annotations in anno_iter:
        contigs = []
        for new_contig in contig_annotations:
            # Note: for consensus contigs is_cell is None
            if new_contig.is_cell is not False \
               and (new_contig.high_confidence or not require_high_conf):
                contigs.append(new_contig)

        if len(contigs) > 0:
            cell_contigs.append(CellContigs(clonotype_name, contigs))

    return cell_contigs
def iter_tables(all_stylediff_pairs,  # type: List[StyleDiffSourcePairs]
                enc='utf-8',          # type: str
                numhunks=1,           # type: int
                numlines=2,           # type: int
                wrapcolumn=0,         # type: int
                ccmode=CC_PROCESSES   # type: str
                ):
    # type: (...) -> Iterator[Tuple[List[str], int, int]]
    def left_diff(sdp):
        # type: (StyleDiffSourcePairs) -> str
        return '\n'.join(set([sdtexts[1] for sdtexts in sdp.keys()]))

    def sdkeys(item):
        # type: (StyleDiffSourcePairs) -> List[bytes]
        return list(item.keys())

    idx = 0
    grouped_sdpairs = itertools.groupby(all_stylediff_pairs, left_diff)
    groups = []  # type: List[CallArgs]
    grouped_sdp = sorted([(key, list(pairs)) for key, pairs in grouped_sdpairs])
    for sdleft, stylediff_pairs in grouped_sdp:
        args_lists = []
        for sdpairs in sorted(stylediff_pairs, key=sdkeys):
            for from_to_texts, pairs in sorted(sdpairs.items()):
                args_lists.append((from_to_texts, pairs, numhunks, numlines,
                                   wrapcolumn, idx, enc))
                idx += 1
        grouparg = (args_lists, ), {}  # type: CallArgs
        groups.append(grouparg)

    for tidx, tables in enumerate(iter_parallel(calc_diff_groups, groups, ccmode=ccmode)):
        yield tables, tidx, len(groups)
def condense_option_values(formatter, styles, condensed):
    # type: (CodeFormatter, Iterable[Style], bool) -> List[Style]
    # Use idx to prevent sorted() from comparing unorderable dicts.
    triplets = [(keypaths(style), idx, style) for idx, style in enumerate(styles)]
    triplets = sorted(triplets)
    pairs = [(kp, style) for kp, idx, style in triplets]

    if condensed:
        equivalents = []
        for kpaths, kp_styles in itertools.groupby(pairs, operator.itemgetter(0)):
            styles = [kps[1] for kps in kp_styles]
            for style in group_consecutive(formatter, styles, condensed):
                equivalents.append(style)
    else:
        equivalents = [style for _, style in pairs]

    return equivalents