python类tee()的实例源码

__init__.py 文件源码 项目:ProtScan 作者: gianlucacorrado 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def random_partition_iter(iterable, n_splits, random_state=1234):
    """Randomly partition a generator into a list of iterators.

    The input is duplicated with ``tee`` so it can be counted once, then
    duplicated again for every group of indices returned by
    ``random_partition``. The original author notes the partition should
    maintain the unbalance of the data.

    Returns a list with one ``selection_iterator`` per index group.
    """
    iterable, counting_copy = tee(iterable)
    n_items = iterator_size(counting_copy)
    index_groups = random_partition(
        n_items, n_splits=n_splits, random_state=random_state)

    partitions = []
    for indices in index_groups:
        # Keep one copy for the next round and hand the other to the selector.
        iterable, branch = tee(iterable)
        partitions.append(selection_iterator(branch, indices))
    return partitions
__init__.py 文件源码 项目:ProtScan 作者: gianlucacorrado 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def balanced_split(sequences, bin_sites, n_splits,
                   random_state=1234):
    """Split sequences into parts, balancing binding and non-binding ones.

    ``sequences`` yields ``(attr, seq)`` pairs where ``attr['tr_name']`` is
    looked up in ``bin_sites``; a truthy value marks the sequence as binding.
    Positive and negative positions are shuffled separately, each cut into
    ``n_splits`` chunks, and the matching chunks are concatenated.

    Note: this reseeds the module-level ``random`` generator.

    Returns a list with one ``selection_iterator`` per part.
    """
    # First pass over a private copy: record the position of each class.
    sequences, scan_copy = tee(sequences)
    pos_ids = []
    neg_ids = []
    for idx, (attr, _) in enumerate(scan_copy):
        bucket = pos_ids if bin_sites.get(attr['tr_name'], False) else neg_ids
        bucket.append(idx)

    random.seed(random_state)
    random.shuffle(pos_ids)
    random.shuffle(neg_ids)

    # Interior cut points at 1/n, 2/n, ... of each class.
    fractions = [float(i) / n_splits for i in range(1, n_splits)]
    pos_cuts = [int(len(pos_ids) * frac) for frac in fractions]
    neg_cuts = [int(len(neg_ids) * frac) for frac in fractions]

    pos_chunks = np.split(pos_ids, pos_cuts)
    neg_chunks = np.split(neg_ids, neg_cuts)

    parts = []
    for pos_chunk, neg_chunk in izip(pos_chunks, neg_chunks):
        sequences, branch = tee(sequences)
        chosen = np.concatenate([pos_chunk, neg_chunk])
        parts.append(selection_iterator(branch, chosen))
    return parts
__init__.py 文件源码 项目:ProtScan 作者: gianlucacorrado 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def balanced_fraction(sequences, bin_sites, opt_fraction=1.0,
                      random_state=1234):
    """Yield a balanced sample of sequences (over binding/non-binding).

    ``sequences`` yields ``(attr, seq)`` pairs; ``attr['tr_name']`` is looked
    up in ``bin_sites`` and a truthy value marks the transcript as binding.
    A fraction ``opt_fraction`` of the names in each class is sampled (at
    least one name when the class is non-empty, never more than the class
    holds), and only sequences whose name was sampled are yielded, in their
    original order.

    Note: this reseeds the module-level ``random`` generator.
    """
    # First pass over a private copy: collect the names of each class.
    sequences, sequences_ = tee(sequences)
    pos_names = list()
    neg_names = list()
    for attr, _ in sequences_:
        tr_name = attr['tr_name']
        if bin_sites.get(tr_name, False):
            pos_names.append(tr_name)
        else:
            neg_names.append(tr_name)

    # Sample positives first, then negatives, so the random stream is
    # consumed in a fixed order. A set gives O(1) membership tests below.
    selected = set()
    random.seed(random_state)
    for names in (pos_names, neg_names):
        # Clamp to the class size: random.sample raises ValueError when asked
        # for more items than exist (empty class, or opt_fraction > 1).
        k = min(len(names), max(1, int(opt_fraction * len(names))))
        selected.update(random.sample(names, k))

    # Second pass: yield only the sequences whose name was selected.
    for attr, s in sequences:
        if attr['tr_name'] in selected:
            yield attr, s
model.py 文件源码 项目:ProtScan 作者: gianlucacorrado 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cross_vote(self, sequences, bin_sites, fit_batch_size=500,
                   pre_batch_size=200, max_splits=100000,
                   active_learning=False, random_state=1234, n_jobs=-1):
        """2-fold cross fit and vote.

        The sequences are divided in two with ``balanced_split``. The model
        is fitted on one half and votes on the other, then the roles are
        swapped. Votes from both folds are merged into one dict, which is
        returned.
        """
        part1, part2 = balanced_split(sequences, bin_sites, n_splits=2,
                                      random_state=random_state)

        # Each half is read twice (train in one fold, test in the other),
        # so every half needs two independent iterators.
        part1, part1_ = tee(part1)
        part2, part2_ = tee(part2)

        folds = (
            ("Fold 1", part1, part2),
            ("Fold 2", part2_, part1_),
        )

        votes = dict()
        for label, tr, te in folds:
            logger.debug(label)
            self._fit(tr, bin_sites, fit_batch_size, max_splits,
                      active_learning, random_state, n_jobs)
            votes.update(self.vote(
                te, pre_batch_size, max_splits, random_state, n_jobs))
        return votes
formparser.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def parse(self, file, boundary, content_length):
        """Parse a stream into a ``(form, files)`` pair.

        The parts produced by ``self.parse_parts`` are duplicated with
        ``tee``. One copy is filtered for parts tagged ``'form'`` and the
        other for parts tagged ``'file'``. Each filtered stream of payloads
        is passed to ``self.cls``.
        """
        parts = self.parse_parts(file, boundary, content_length)
        formstream, filestream = tee(parts, 2)

        def payloads(stream, tag):
            # Keep only the payload (index 1) of parts whose tag (index 0) matches.
            return (part[1] for part in stream if part[0] == tag)

        form = payloads(formstream, 'form')
        files = payloads(filestream, 'file')
        return self.cls(form), self.cls(files)
recipe-577515.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def pairwise(iterable):
    """Return an iterator over overlapping pairs of consecutive items.

    s -> (s0,s1), (s1,s2), (s2, s3), ...
    (Adapted from the itertools module documentation recipes.)
    """
    behind, ahead = tee(iterable)
    # Move the second copy one item forward; the None default keeps an
    # empty input from raising StopIteration.
    next(ahead, None)
    return izip(behind, ahead)
recipe-577196.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, iterable):
        """Wrap ``iterable`` and prefetch its first item.

        Two independent copies of the iterator are kept (``_a`` and ``_b``).
        ``_b`` is advanced by one item immediately and that item is stored
        in ``_peeked``. ``_previous`` starts as ``None``.

        Raises ``StopIteration`` if ``iterable`` is empty.
        """
        self._a, self._b = tee(iter(iterable), 2)
        self._previous = None
        # Use the builtin next() rather than the Python-2-only ``.next()``
        # method; it behaves the same on 2.6+ and also works on Python 3.
        self._peeked   = next(self._b)
orch.py 文件源码 项目:ravel 作者: ravel-net 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def pairwise(iterable):
    """Return an iterator of consecutive pairs: (s0, s1), (s1, s2), ..."""
    first, second = tee(iterable)
    # Skip one item on the second copy so the two run one step apart.
    next(second, None)
    return izip(first, second)
storage.py 文件源码 项目:tableschema-elasticsearch-py 作者: frictionlessdata 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write(self, bucket, doc_type, rows, primary_key, update=False, as_generator=False):
        """Send ``rows`` to the index ``bucket`` through ``streaming_bulk``.

        Each row becomes one bulk action whose ``_id`` comes from
        ``self.generate_doc_id(row, primary_key)``. With ``update`` set the
        action is an upserting ``'update'``, otherwise an ``'index'``.

        With ``as_generator`` set, every row is yielded back as its bulk
        result arrives. Otherwise the results are drained silently. The
        index is flushed at the end.

        NOTE(review): the body contains ``yield``, so this is a generator
        function. Nothing runs, including the ``primary_key`` check, until
        the returned generator is iterated.

        Raises ``ValueError`` when ``primary_key`` is ``None`` or empty.
        """

        if primary_key is None or len(primary_key) == 0:
            raise ValueError('primary_key cannot be an empty list')

        def actions(rows_, doc_type_, primary_key_, update_):
            # Build one bulk action per row; only the op type and the shape
            # of '_source' differ between the update and index cases.
            for row_ in rows_:
                action = {
                    '_op_type': 'update' if update_ else 'index',
                    '_index': bucket,
                    '_type': doc_type_,
                    '_id': self.generate_doc_id(row_, primary_key_),
                }
                if update_:
                    action['_source'] = {
                        'doc': row_,
                        'doc_as_upsert': True
                    }
                else:
                    action['_source'] = row_
                yield action

        # One copy of the rows feeds the bulk helper, the other is paired
        # with the results so each row can be echoed back to the caller.
        action_rows, echoed_rows = itertools.tee(rows)
        actions_iterable = actions(action_rows, doc_type, primary_key, update)
        bulk_results = streaming_bulk(self.__es, actions=actions_iterable)
        paired = zip(bulk_results, echoed_rows)

        if as_generator:
            for _, row in paired:
                yield row
        else:
            # Exhaust the iterator without storing anything.
            collections.deque(paired, maxlen=0)

        self.__es.indices.flush(bucket)
formparser.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def parse(self, file, boundary, content_length):
        """Return ``(self.cls(form_items), self.cls(file_items))``.

        ``self.parse_parts`` yields tagged parts. The stream is duplicated
        so that form parts and file parts can be filtered independently.
        """
        streams = tee(self.parse_parts(file, boundary, content_length), 2)
        formstream, filestream = streams

        def select(stream, wanted):
            # Index 0 holds the tag, index 1 the payload.
            return (item[1] for item in stream if item[0] == wanted)

        form_items = select(formstream, 'form')
        file_items = select(filestream, 'file')
        return self.cls(form_items), self.cls(file_items)
utils.py 文件源码 项目:TrackToTrip 作者: ruipgil 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def pairwise(iterable):
    """Pair every item with its successor.

    s -> (s0,s1), (s1,s2), (s2, s3), ...
    """
    current, upcoming = tee(iterable)
    # Drop the first item of the second copy (no-op on an empty input).
    next(upcoming, None)
    return izip(current, upcoming)
pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _pairwise(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]:
    """Return an iterator over consecutive pairs: (s0, s1), (s1, s2), ..."""
    leading, trailing = tee(iterable)
    # Offset the second copy by one item; the None default keeps an
    # empty input from raising.
    next(trailing, None)
    return zip(leading, trailing)
reports.py 文件源码 项目:django-souvenirs 作者: appsembler 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def _usage_for_periods(periods):
    """
    Yield one usage dictionary per period.

    Each period should be a tuple of (start, end) datetimes, where start is
    inclusive and end is exclusive.

    Each dictionary in the generated sequence has this form:

        {
            period: {
                start: datetime,
                end: datetime,
            }
            usage: {
                registered_users: int,
                activated_users: int,
                active_users: int,
            }
        }

    """
    # Three independent copies: one per lookup, one for the periods themselves.
    reg_src, act_src, period_src = itertools.tee(periods, 3)
    registered_counts = (registered_users_as_of(end) for _, end in reg_src)
    active_counts = (count_active_users(*period) for period in act_src)

    rows = izip(period_src, registered_counts, active_counts)
    for (start, end), (registered, activated), active in rows:
        yield {
            'period': {
                'start': start,
                'end': end,
            },
            'usage': {
                'registered_users': registered,
                'activated_users': activated,
                'active_users': active,
            },
        }
formparser.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parse(self, file, boundary, content_length):
        """Split the parsed parts into form data and file data.

        The output of ``self.parse_parts`` is duplicated with ``tee``.
        Payloads of ``'form'`` parts go into one ``self.cls`` container and
        payloads of ``'file'`` parts into another. Returns the pair
        ``(form, files)``.
        """
        all_parts = self.parse_parts(file, boundary, content_length)
        formstream, filestream = tee(all_parts, 2)

        def only(stream, label):
            # part[0] is the label, part[1] the payload to keep.
            return (part[1] for part in stream if part[0] == label)

        return self.cls(only(formstream, 'form')), self.cls(only(filestream, 'file'))
netcfg.py 文件源码 项目:iosxr-ansible 作者: ios-xr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_next(iterable):
    """Pair each item with the one after it; the last item pairs with None.

    Returns a ``zip_longest`` iterator of ``(item, next_item)`` tuples.
    """
    current, following = itertools.tee(iterable, 2)
    # Lazily skip the first element of the second copy.
    shifted = itertools.islice(following, 1, None)
    return zip_longest(current, shifted)
heapq.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def nsmallest(n, iterable, key=None):
    """Find the n smallest elements in a dataset.

    Equivalent to:  sorted(iterable, key=key)[:n]
    """
    # n == 1: a single min() pass suffices; an empty input gives [].
    if n == 1:
        source = iter(iterable)
        try:
            first = next(source)
        except StopIteration:
            return []
        candidates = chain((first,), source)
        if key is None:
            return [min(candidates)]
        return [min(candidates, key=key)]

    # Sized input with n >= size: sorting everything is faster.
    try:
        size = len(iterable)
    except (TypeError, AttributeError):
        size = None
    if size is not None and n >= size:
        return sorted(iterable, key=key)[:n]

    if key is None:
        # Decorate each item with a running counter, select, then strip it.
        decorated = izip(iterable, count())
        return map(itemgetter(0), _nsmallest(n, decorated))

    # General case, slowest method: decorate as (key, counter, item).
    for_keys, for_items = tee(iterable)
    decorated = izip(imap(key, for_keys), count(), for_items)
    return map(itemgetter(2), _nsmallest(n, decorated))
heapq.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def nlargest(n, iterable, key=None):
    """Find the n largest elements in a dataset.

    Equivalent to:  sorted(iterable, key=key, reverse=True)[:n]
    """
    # n == 1: a single max() pass suffices; an empty input gives [].
    if n == 1:
        source = iter(iterable)
        try:
            first = next(source)
        except StopIteration:
            return []
        candidates = chain((first,), source)
        if key is None:
            return [max(candidates)]
        return [max(candidates, key=key)]

    # Sized input with n >= size: sorting everything is faster.
    try:
        size = len(iterable)
    except (TypeError, AttributeError):
        size = None
    if size is not None and n >= size:
        return sorted(iterable, key=key, reverse=True)[:n]

    if key is None:
        # Decorate each item with a descending counter, select, then strip it.
        decorated = izip(iterable, count(0,-1))
        return map(itemgetter(0), _nlargest(n, decorated))

    # General case, slowest method: decorate as (key, counter, item).
    for_keys, for_items = tee(iterable)
    decorated = izip(imap(key, for_keys), count(0,-1), for_items)
    return map(itemgetter(2), _nlargest(n, decorated))
utils.py 文件源码 项目:wltrace 作者: jhshi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def pairwise(it):
    """Return an iterator of consecutive pairs: (x0, x1), (x1, x2), ..."""
    left, right = itertools.tee(it)
    # Advance the second copy by one item (no-op on an empty input).
    next(right, None)
    return itertools.izip(left, right)
timetable.py 文件源码 项目:pyconjp-website 作者: pyconjp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def pairwise(iterable):
    """Pair each item with its successor; the last item pairs with None.

    s -> (s0, s1), (s1, s2), ..., (sN, None)

    An empty iterable produces an empty result.
    """
    a, b = itertools.tee(iterable)
    # Use builtin next() with a default: the ``.next()`` method is
    # Python-2-only and raised StopIteration on an empty iterable.
    next(b, None)
    return itertools.izip_longest(a, b)
itercools.py 文件源码 项目:phredutils 作者: doctaphred 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def filters(iterable, *predicates):
    """Return one filtered view of the iterable per predicate.

    The iterable is duplicated once per predicate, so each returned filter
    can be consumed independently.

    >>> div_by_two = lambda x: not x % 2
    >>> div_by_three = lambda x: not x % 3
    >>> twos, threes = filters(range(10), div_by_two, div_by_three)
    >>> list(twos)
    [0, 2, 4, 6, 8]
    >>> list(threes)
    [0, 3, 6, 9]
    """
    copies = tee(iterable, len(predicates))
    # Pair the i-th predicate with the i-th copy.
    return tuple(map(filter, predicates, copies))


问题


面经


文章

微信
公众号

扫码关注公众号