def random_partition_iter(iterable, n_splits, random_state=1234):
    """Randomly partition a generator (should maintain the class imbalance).

    The input is duplicated with ``tee`` so that one copy can be measured
    with ``iterator_size`` while the other is replayed once per partition.

    Args:
        iterable: Items to partition; only read through ``tee`` copies.
        n_splits: Forwarded to ``random_partition`` as ``n_splits``.
        random_state: Forwarded to ``random_partition`` as ``random_state``.

    Returns:
        A list holding one ``selection_iterator`` result for every entry
        produced by ``random_partition``.
    """
    remaining, counting_copy = tee(iterable)
    n_items = iterator_size(counting_copy)
    index_groups = random_partition(
        n_items, n_splits=n_splits, random_state=random_state)

    partitions = []
    for indices in index_groups:
        # Fork a fresh copy for this partition and keep the other branch so
        # the next partition can replay the stream from the start.
        remaining, branch = tee(remaining)
        partitions.append(selection_iterator(branch, indices))
    return partitions
# Example source code for Python's tee()
def balanced_split(sequences, bin_sites, n_splits,
                   random_state=1234):
    """Balanced split over binding/non-binding sequences.

    Positions of binding and non-binding sequences are collected separately,
    shuffled, and each list is cut into ``n_splits`` chunks at evenly spaced
    fractions, so every part receives a share of both classes.

    Args:
        sequences: Iterable of ``(attr, seq)`` pairs; ``attr['tr_name']`` is
            looked up in ``bin_sites``.
        bin_sites: Mapping queried with ``.get(tr_name, False)``; a truthy
            value marks the sequence as binding.
        n_splits: Number of parts to produce.
        random_state: Seed passed to ``random.seed`` (this reseeds the
            module-level generator).

    Returns:
        A list of ``selection_iterator`` results, one per part.
    """
    # First pass: record the position of every positive and negative item.
    sequences, scan = tee(sequences)
    binding, non_binding = [], []
    for index, (attr, _) in enumerate(scan):
        if bin_sites.get(attr['tr_name'], False):
            binding.append(index)
        else:
            non_binding.append(index)

    random.seed(random_state)
    random.shuffle(binding)
    random.shuffle(non_binding)

    def cut_points(total):
        # Interior boundaries that divide ``total`` items into n_splits chunks.
        return [int(total * (float(k) / n_splits))
                for k in range(1, n_splits)]

    binding_chunks = np.split(binding, cut_points(len(binding)))
    non_binding_chunks = np.split(non_binding, cut_points(len(non_binding)))

    folds = []
    for pos_chunk, neg_chunk in izip(binding_chunks, non_binding_chunks):
        # Each part replays the stream through its own tee branch.
        sequences, branch = tee(sequences)
        chosen = np.concatenate([pos_chunk, neg_chunk])
        folds.append(selection_iterator(branch, chosen))
    return folds
def balanced_fraction(sequences, bin_sites, opt_fraction=1.0,
                      random_state=1234):
    """Balanced sample of sequences (over binding/non-binding).

    Transcript names are grouped into binding and non-binding, a fraction of
    each group is sampled, and only sequences whose name was sampled are
    yielded, in their original order.

    Args:
        sequences: Iterable of ``(attr, seq)`` pairs; ``attr['tr_name']`` is
            looked up in ``bin_sites``.
        bin_sites: Mapping queried with ``.get(tr_name, False)``; a truthy
            value marks the sequence as binding.
        opt_fraction: Fraction of each group to keep. At least one name is
            kept from every non-empty group, and the count is capped at the
            group size.
        random_state: Seed passed to ``random.seed`` (this reseeds the
            module-level generator).

    Yields:
        The ``(attr, seq)`` pairs whose transcript name was selected.
    """
    # First pass: find the transcript names of positives and negatives.
    sequences, sequences_ = tee(sequences)
    pos_names = list()
    neg_names = list()
    for attr, _ in sequences_:
        tr_name = attr['tr_name']
        if bin_sites.get(tr_name, False):
            pos_names.append(tr_name)
        else:
            neg_names.append(tr_name)

    # Sample from positives, then negatives (same draw order as before).
    # A set gives O(1) membership tests in the filtering pass below.
    selected = set()
    random.seed(random_state)
    for names in (pos_names, neg_names):
        # Clamp to the group size: random.sample raises ValueError when asked
        # for more items than exist, which previously happened for an empty
        # group (k forced to 1) or for opt_fraction > 1.
        k = min(len(names), max(1, int(opt_fraction * len(names))))
        selected.update(random.sample(names, k))

    # Second pass: yield only sequences in selected.
    for attr, s in sequences:
        if attr['tr_name'] in selected:
            yield attr, s
def cross_vote(self, sequences, bin_sites, fit_batch_size=500,
               pre_batch_size=200, max_splits=100000,
               active_learning=False, random_state=1234, n_jobs=-1):
    """2-fold cross fit and vote.

    The sequences are divided into two parts with ``balanced_split``. The
    model is fitted on one part and votes on the other, then the roles are
    swapped; the votes of both folds are merged into one dictionary.

    Returns:
        dict: Union of the results of the two ``self.vote`` calls (entries
        from the second fold overwrite equal keys from the first).
    """
    half_a, half_b = balanced_split(
        sequences, bin_sites, n_splits=2, random_state=random_state)
    # Every half is read twice (train in one fold, test in the other),
    # so keep a replayable copy of each.
    half_a, half_a_replay = tee(half_a)
    half_b, half_b_replay = tee(half_b)

    folds = (
        ("Fold 1", half_a, half_b),
        ("Fold 2", half_b_replay, half_a_replay),
    )

    votes = {}
    for label, train, test in folds:
        logger.debug(label)
        self._fit(train, bin_sites, fit_batch_size, max_splits,
                  active_learning, random_state, n_jobs)
        fold_votes = self.vote(
            test, pre_batch_size, max_splits, random_state, n_jobs)
        votes.update(fold_votes)
    return votes
def parse(self, file, boundary, content_length):
    """Split the parsed parts into form values and file values.

    The stream returned by ``self.parse_parts`` is duplicated with ``tee``;
    one copy keeps the payload (index 1) of parts tagged ``'form'``, the
    other the payload of parts tagged ``'file'``.

    Returns:
        A 2-tuple ``(self.cls(form_values), self.cls(file_values))``.
    """
    parts = self.parse_parts(file, boundary, content_length)
    form_parts, file_parts = tee(parts, 2)
    form_values = (part[1] for part in form_parts if part[0] == 'form')
    file_values = (part[1] for part in file_parts if part[0] == 'file')
    return self.cls(form_values), self.cls(file_values)
def pairwise(iterable):
    """Pair each item with its successor.

    s -> (s0,s1), (s1,s2), (s2, s3), ...

    Adapted from the recipe in the itertools module documentation.
    """
    current, following = tee(iterable)
    # Shift the second copy forward by one item; the None default keeps an
    # empty input from raising StopIteration here.
    next(following, None)
    return izip(current, following)
def __init__(self, iterable):
    """Set up two ``tee`` views of ``iterable`` and pre-read the first item.

    ``_a`` and ``_b`` are independent copies of the input stream. ``_b`` is
    advanced once so that ``_peeked`` holds the first item while ``_a`` still
    starts at the beginning; ``_previous`` starts out as ``None``.

    Raises:
        StopIteration: If ``iterable`` is empty (same as before this change).
    """
    self._a, self._b = tee(iter(iterable), 2)
    self._previous = None
    # Use the next() builtin instead of the Python 2-only ``.next()`` method
    # so this also works on Python 3.
    self._peeked = next(self._b)
def pairwise(iterable):
    """Return an iterator over consecutive overlapping pairs of ``iterable``.

    An input with fewer than two items produces no pairs.
    """
    first, second = tee(iterable)
    # Discard one item from the second copy so it runs one step ahead.
    next(second, None)
    pairs = izip(first, second)
    return pairs
def write(self, bucket, doc_type, rows, primary_key, update=False, as_generator=False):
    """Send ``rows`` to ``streaming_bulk`` as index or update actions.

    Because the body contains ``yield``, this function is a generator:
    nothing (including the ``primary_key`` check) runs until the returned
    generator is iterated, whatever the value of ``as_generator``.

    Args:
        bucket: Used as the ``_index`` of every action and flushed at the end
            of the non-generator path.
        doc_type: Used as the ``_type`` of every action.
        rows: Iterable of rows; duplicated with ``itertools.tee`` so each row
            can be paired with its bulk result.
        primary_key: Passed to ``self.generate_doc_id`` together with the row.
        update: When truthy, emit ``update`` actions with ``doc_as_upsert``;
            otherwise emit ``index`` actions.
        as_generator: When truthy, yield each row as its result arrives;
            otherwise drain everything and flush the index.

    Raises:
        ValueError: If ``primary_key`` is ``None`` or has length zero.
    """
    if primary_key is None or len(primary_key) == 0:
        raise ValueError('primary_key cannot be an empty list')

    def build_actions(source_rows):
        # One bulk action per row; only the op type and the source payload
        # depend on ``update``.
        for record in source_rows:
            yield {
                '_op_type': 'update' if update else 'index',
                '_index': bucket,
                '_type': doc_type,
                '_id': self.generate_doc_id(record, primary_key),
                '_source': (
                    {'doc': record, 'doc_as_upsert': True}
                    if update else record
                ),
            }

    action_rows, echoed_rows = itertools.tee(rows)
    results = streaming_bulk(self.__es, actions=build_actions(action_rows))
    # Pair each bulk result with the row that produced it.
    paired = zip(results, echoed_rows)

    if as_generator:
        for _result, row in paired:
            yield row
    else:
        # Drain the stream without keeping any of it.
        collections.deque(paired, maxlen=0)
        self.__es.indices.flush(bucket)
def parse(self, file, boundary, content_length):
    """Return form and file payloads from ``self.parse_parts`` as two containers.

    Each part is indexed as ``part[0]`` (tag) and ``part[1]`` (payload).
    Payloads tagged ``'form'`` and ``'file'`` are each passed to ``self.cls``;
    parts with any other tag are ignored.
    """
    def payloads(stream, wanted):
        # Lazily keep the payload of every part carrying the wanted tag.
        for part in stream:
            if part[0] == wanted:
                yield part[1]

    # Both consumers need the full stream, so duplicate it.
    form_stream, file_stream = tee(
        self.parse_parts(file, boundary, content_length), 2)
    return (self.cls(payloads(form_stream, 'form')),
            self.cls(payloads(file_stream, 'file')))
def pairwise(iterable):
    """Iterate over adjacent pairs.

    s -> (s0,s1), (s1,s2), (s2, s3), ...
    """
    lagging, leading = tee(iterable)
    # Move the leading copy one item ahead (no error if there is none).
    next(leading, None)
    return izip(lagging, leading)
def _pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]:
    """Return consecutive overlapping pairs ``(x0, x1), (x1, x2), ...``.

    Inputs with fewer than two items produce no pairs.
    """
    head, tail = tee(iterable)
    # Skip one item on the second copy so it stays one step ahead.
    next(tail, None)
    pairs = zip(head, tail)
    return pairs
def _usage_for_periods(periods):
    """
    Generate a sequence of dictionaries of usage data corresponding to periods,
    each of which should be a tuple of (start, end) datetimes, where start is
    inclusive and end is exclusive.

    For every period, ``registered_users_as_of(end)`` is called first and its
    result unpacked into (registered, activated); then
    ``count_active_users(start, end)`` supplies the active count.

    Each dictionary in the generated sequence has this form:

        {
            period: {
                start: datetime,
                end: datetime,
            },
            usage: {
                registered_users: int,
                activated_users: int,
                active_users: int,
            }
        }
    """
    for period in periods:
        start, end = period
        # Same call order per period as a lockstep zip over the two lookups.
        registered, activated = registered_users_as_of(end)
        active = count_active_users(*period)
        yield {
            'period': {
                'start': start,
                'end': end,
            },
            'usage': {
                'registered_users': registered,
                'activated_users': activated,
                'active_users': active,
            },
        }
def parse(self, file, boundary, content_length):
    """Build the form and file containers from the parsed parts.

    ``self.parse_parts`` is called once and its output is duplicated with
    ``tee`` so both containers can be filled from the same stream. Element 0
    of each part selects the container (``'form'`` or ``'file'``); element 1
    is the value stored.

    Returns:
        ``(form, files)``, both produced by ``self.cls``.
    """
    form_stream, file_stream = tee(
        self.parse_parts(file, boundary, content_length), 2)
    form = self.cls(
        item[1] for item in form_stream if item[0] == 'form')
    files = self.cls(
        item[1] for item in file_stream if item[0] == 'file')
    return form, files
def get_next(iterable):
    """Pair every item with the one that follows it.

    The last item is paired with ``zip_longest``'s fill value, so each input
    item appears exactly once in first position.
    """
    current, lookahead = itertools.tee(iterable, 2)
    # Lazily drop the first item of the second copy.
    shifted = itertools.islice(lookahead, 1, None)
    return zip_longest(current, shifted)
def nsmallest(n, iterable, key=None):
    """Find the n smallest elements in a dataset.

    Equivalent to:  sorted(iterable, key=key)[:n]
    """
    # n == 1: a single min() pass is enough once we know there is an item.
    if n == 1:
        stream = iter(iterable)
        first = list(islice(stream, 1))
        if not first:
            return []
        candidates = chain(first, stream)
        if key is None:
            return [min(candidates)]
        return [min(candidates, key=key)]

    # When the size is known and n covers all of it, just sort.
    try:
        total = len(iterable)
    except (TypeError, AttributeError):
        total = None
    if total is not None and n >= total:
        return sorted(iterable, key=key)[:n]

    # No key: decorate each item with a running counter only.
    if key is None:
        decorated = izip(iterable, count())
        return map(itemgetter(0), _nsmallest(n, decorated))

    # General case: decorate as (key(item), counter, item), then strip the
    # decoration from the winners.
    keyed, originals = tee(iterable)
    decorated = izip(imap(key, keyed), count(), originals)
    return map(itemgetter(2), _nsmallest(n, decorated))
def nlargest(n, iterable, key=None):
    """Find the n largest elements in a dataset.

    Equivalent to:  sorted(iterable, key=key, reverse=True)[:n]
    """
    # Special case n == 1: use max() as long as the input is non-empty.
    if n == 1:
        source = iter(iterable)
        lead = list(islice(source, 1))
        if lead:
            everything = chain(lead, source)
            if key is None:
                return [max(everything)]
            return [max(everything, key=key)]
        return []

    # If the input has a length and n reaches it, sorting is the fast path.
    try:
        length = len(iterable)
    except (TypeError, AttributeError):
        pass
    else:
        if n >= length:
            ranked = sorted(iterable, key=key, reverse=True)
            return ranked[:n]

    if key is not None:
        # General case: decorate as (key(item), counter, item); the counter
        # counts downwards from 0.
        keyed, originals = tee(iterable)
        decorated = izip(imap(key, keyed), count(0, -1), originals)
        return map(itemgetter(2), _nlargest(n, decorated))

    # No key function: a (item, counter) decoration is enough.
    decorated = izip(iterable, count(0, -1))
    return map(itemgetter(0), _nlargest(n, decorated))
def pairwise(it):
    """Return a lazy iterator of overlapping pairs: (s0, s1), (s1, s2), ...

    An input with fewer than two items produces no pairs.
    """
    a, b = itertools.tee(it)
    # Advance the second copy by one; the default avoids StopIteration on an
    # empty input.
    next(b, None)
    # itertools.izip exists only on Python 2; on Python 3 the built-in zip
    # is already lazy, so fall back to it there.
    lazy_zip = getattr(itertools, "izip", zip)
    return lazy_zip(a, b)
def pairwise(iterable):
    """Pair each item with its successor, padding the last pair with None.

    s -> (s0, s1), (s1, s2), ..., (s_last, None)

    An empty input produces no pairs (it previously raised StopIteration).
    """
    a, b = itertools.tee(iterable)
    # next() with a default replaces the Python 2-only ``b.next()`` call and
    # does not raise when the input is empty.
    next(b, None)
    # izip_longest was renamed to zip_longest in Python 3; use whichever
    # this interpreter provides.
    longest = getattr(itertools, "izip_longest", None) or itertools.zip_longest
    return longest(a, b)
def filters(iterable, *predicates):
    """Filter the iterable on each given predicate.

    The iterable is duplicated once per predicate with ``tee``, and the
    result is a tuple with one filtered view per predicate, in the order
    the predicates were given.

    >>> div_by_two = lambda x: not x % 2
    >>> div_by_three = lambda x: not x % 3
    >>> twos, threes = filters(range(10), div_by_two, div_by_three)
    >>> list(twos)
    [0, 2, 4, 6, 8]
    >>> list(threes)
    [0, 3, 6, 9]
    """
    copies = tee(iterable, len(predicates))
    filtered = []
    for predicate, copy in zip(predicates, copies):
        filtered.append(filter(predicate, copy))
    return tuple(filtered)