python类islice()的实例源码

charfinder.py 文件源码 项目:notebooks 作者: fluentpython 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def find_chars(self, query, start=0, stop=None):
        stop = sys.maxsize if stop is None else stop
        result_sets = []
        for word in tokenize(query):
            chars = self.index.get(word)
            if chars is None:  # shorcut: no such word
                result_sets = []
                break
            result_sets.append(chars)

        if not result_sets:
            return QueryResult(0, ())

        result = functools.reduce(set.intersection, result_sets)
        result = sorted(result)  # must sort to support start, stop
        result_iter = itertools.islice(result, start, stop)
        return QueryResult(len(result),
                           (char for char in result_iter))
rrule.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __getitem__(self, item):
        if self._cache_complete:
            return self._cache[item]
        elif isinstance(item, slice):
            if item.step and item.step < 0:
                return list(iter(self))[item]
            else:
                return list(itertools.islice(self,
                                             item.start or 0,
                                             item.stop or sys.maxsize,
                                             item.step or 1))
        elif item >= 0:
            gen = iter(self)
            try:
                for i in range(item+1):
                    res = advance_iterator(gen)
            except StopIteration:
                raise IndexError
            return res
        else:
            return list(iter(self))[item]
heapq.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def nlargest(n, iterable):
    """Find the n largest elements in a dataset.

    Equivalent to:  sorted(iterable, reverse=True)[:n]
    """
    if n < 0:
        return []
    it = iter(iterable)
    result = list(islice(it, n))
    if not result:
        return result
    heapify(result)
    _heappushpop = heappushpop
    for elem in it:
        _heappushpop(result, elem)
    result.sort(reverse=True)
    return result
heapq.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def nsmallest(n, iterable):
    """Find the n smallest elements in a dataset.

    Equivalent to:  sorted(iterable)[:n]
    """
    if n < 0:
        return []
    it = iter(iterable)
    result = list(islice(it, n))
    if not result:
        return result
    _heapify_max(result)
    _heappushpop = _heappushpop_max
    for elem in it:
        _heappushpop(result, elem)
    result.sort()
    return result

# 'heap' is a heap at all indices >= startpos, except possibly for pos.  pos
# is the index of a leaf with a possibly out-of-order value.  Restore the
# heap invariant.
_base.py 文件源码 项目:luckyhorse 作者: alexmbird 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self, p):
    '''
    Look for events fulfilling period `p` in cache and if possible, return them.
    Otherwise raise a RecordNotExist error.
    '''
    try:
      with self._lock:
        # This assumes that new events get straight into the cache
        if p.lhs_ts >= self._cache_latest_ts[0] and p.rhs_ts <= HORSEYTIEM.time():
          lhs_index  = self._cache_latest_ts.bisect_left(p.lhs_ts)
          rhs_index  = self._cache_latest_ts.bisect_left(p.rhs_ts)
          return tuple(islice(self._cache_latest_ev, lhs_index, rhs_index))
    except IndexError:
      pass

    raise RecordNotExistError()
handlers.py 文件源码 项目:luckyhorse 作者: alexmbird 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handleBucketUpdate(self, ind, t):
    '''
    A bucket has ended.  Compare the last prediction to what actially happened.
    '''
    # Compare the prediction to reality
    if self._last_prediction is not None:
      metric = self._hist[-1]
      #print("%s\t%s" % (self._last_prediction, metric))
      if simplifyMet(self._last_prediction) == simplifyMet(metric):
        self.stats['predict_correct'] += 1
      else:
        self.stats['predict_wrong'] += 1

    s = deque( islice(self._hist, 1, None) )
    prediction = self.tree.predict(s, closest=True)
    self._last_prediction = prediction
test_sched.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_interval_timer():
    t = Timer()
    assert not list(t)

    t.add('foo', 10, 10)
    t.add('boo', 20, 20)
    t.add('bar', 30, 30)

    result = list(islice(t, 11))
    assert result == [(10, 'foo'), (20, 'foo'), (20, 'boo'), (30, 'foo'),
                      (30, 'bar'), (40, 'foo'), (40, 'boo'), (50, 'foo'),
                      (60, 'foo'), (60, 'boo'), (60, 'bar')]
utils.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def iter_chunks(seq, chunk_size):
    it = iter(seq)
    while True:
        chunk = list(islice(it, chunk_size))
        if chunk:
            yield chunk
        else:
            break
reprlib.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _repr_iterable(self, x, level, left, right, maxiter, trail=''):
        n = len(x)
        if level <= 0 and n:
            s = '...'
        else:
            newlevel = level - 1
            repr1 = self.repr1
            pieces = [repr1(elem, newlevel) for elem in islice(x, maxiter)]
            if n > maxiter:  pieces.append('...')
            s = ', '.join(pieces)
            if n == 1 and trail:  right = trail + right
        return '%s%s%s' % (left, s, right)
reprlib.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def repr_dict(self, x, level):
        n = len(x)
        if n == 0: return '{}'
        if level <= 0: return '{...}'
        newlevel = level - 1
        repr1 = self.repr1
        pieces = []
        for key in islice(_possibly_sorted(x), self.maxdict):
            keyrepr = repr1(key, newlevel)
            valrepr = repr1(x[key], newlevel)
            pieces.append('%s: %s' % (keyrepr, valrepr))
        if n > self.maxdict: pieces.append('...')
        s = ', '.join(pieces)
        return '{%s}' % (s,)
pubsub.py 文件源码 项目:modernpython 作者: rhettinger 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
    return list(islice(user_posts[user], limit))
pubsub.py 文件源码 项目:modernpython 作者: rhettinger 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    relevant = merge(*[user_posts[u] for u in following[user]], reverse=True)
    return list(islice(relevant, limit))
pubsub.py 文件源码 项目:modernpython 作者: rhettinger 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def search(phrase:str, limit: Optional[int] = None) -> List[Post]:
    # XXX this could benefit from caching and from preindexing
    return list(islice((post for post in posts if phrase in post.text), limit))
numa.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, nodes, distance=None):
        if distance is None:
            self.nodes = itertools.repeat(None)
        elif distance == 0:
            self.nodes = itertools.repeat(nodes.next())
        else:
            self.nodes = itertools.islice(nodes, 0, None, distance)
formatting.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _pad_finite(iterable, padding, limit):
    return tuple(itertools.islice(_pad_infinite(iterable, padding), limit))
postulates.py 文件源码 项目:zellij 作者: nedbat 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def adjacent_pairs(seq):
    """From e0, e1, e2, e3, ... produce (e0,e1), (e1,e2), (e2,e3), ..."""
    return zip(seq, itertools.islice(seq, 1, None))
iter_utils.py 文件源码 项目:npstreams 作者: LaurentRDC 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def chunked(iterable, chunksize = 1):
    """
    Generator yielding multiple iterables of length 'chunksize'.

    Parameters
    ----------
    iterable : iterable
        Iterable to be chunked. 
    chunksize : int, optional
        Chunk size. 

    Yields
    ------
    chunk : iterable
        Iterable of size `chunksize`. In special case of iterable not being
        divisible by `chunksize`, the last `chunk` might be smaller.
    """
    iterable = iter(iterable)

    next_chunk = tuple(islice(iterable, chunksize))
    while next_chunk:   
        yield next_chunk
        next_chunk = tuple(islice(iterable, chunksize))
test_events.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_AfterOpen(self, ms):
        should_trigger = partial(
            AfterOpen(minutes=5, hours=1).should_trigger,
            env=self.env,
        )
        for m in islice(ms, 64):
            # Check the first 64 minutes of data.
            # We use 64 because the offset is from market open
            # at 13:30 UTC, meaning the first minute of data has an
            # offset of 1.
            self.assertFalse(should_trigger(m))
        for m in islice(ms, 64, None):
            # Check the rest of the day.
            self.assertTrue(should_trigger(m))
fastq.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def infer_barcode_reverse_complement(barcode_whitelist, read_iter):
    if barcode_whitelist is None:
        return False
    reg_valid_count = 0
    rc_valid_count = 0
    for name, seq, qual in itertools.islice(read_iter, cr_constants.NUM_CHECK_BARCODES_FOR_ORIENTATION):
        if seq in barcode_whitelist:
            reg_valid_count += 1
        if tk_seq.get_rev_comp(seq) in barcode_whitelist:
            rc_valid_count += 1

    frac_rc = tk_stats.robust_divide(rc_valid_count, rc_valid_count + reg_valid_count)
    return frac_rc >= cr_constants.REVCOMP_BARCODE_THRESHOLD
utils.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def detect_paired_end_bam(bam_filename):
    bam = tk_bam.create_bam_infile(bam_filename)
    for read in itertools.islice(bam, 100):
        if read.is_read1 or read.is_read2:
            return True
    return False
    bam.close()


问题


面经


文章

微信
公众号

扫码关注公众号