def find_chars(self, query, start=0, stop=None):
    stop = sys.maxsize if stop is None else stop
    result_sets = []
    for word in tokenize(query):
        chars = self.index.get(word)
        if chars is None:  # shortcut: no such word
            result_sets = []
            break
        result_sets.append(chars)
    if not result_sets:
        return QueryResult(0, ())
    result = functools.reduce(set.intersection, result_sets)
    result = sorted(result)  # must sort to support start, stop
    result_iter = itertools.islice(result, start, stop)
    return QueryResult(len(result),
                       (char for char in result_iter))
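# The start/stop handling above is the usual islice pagination idiom:
# sort once, then slice lazily per page. A minimal standalone sketch
# (the sample data is hypothetical):
from itertools import islice

hits = sorted({'cat', 'cart', 'car', 'care'})
page = list(islice(hits, 1, 3))  # second and third hits: ['care', 'cart']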
def __getitem__(self, item):
    if self._cache_complete:
        return self._cache[item]
    elif isinstance(item, slice):
        if item.step and item.step < 0:
            return list(iter(self))[item]
        else:
            return list(itertools.islice(self,
                                         item.start or 0,
                                         item.stop or sys.maxsize,
                                         item.step or 1))
    elif item >= 0:
        gen = iter(self)
        try:
            # advance_iterator is a Py2/Py3-compatible alias for next()
            # (the six library provides one).
            for i in range(item + 1):
                res = advance_iterator(gen)
        except StopIteration:
            raise IndexError
        return res
    else:
        return list(iter(self))[item]
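# The positive-step slice branch above is a thin wrapper over islice; pulled
# out on its own it looks like the sketch below. sys.maxsize stands in for an
# open-ended stop because islice rejects negative indices and steps, which is
# exactly why the method falls back to list() for those cases.
import sys
from itertools import islice

def lazy_slice(iterator, s):
    return list(islice(iterator, s.start or 0,
                       s.stop or sys.maxsize,
                       s.step or 1))

lazy_slice(iter(range(10)), slice(2, 8, 2))  # [2, 4, 6]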
def nlargest(n, iterable):
    """Find the n largest elements in a dataset.

    Equivalent to: sorted(iterable, reverse=True)[:n]
    """
    if n < 0:
        return []
    it = iter(iterable)
    result = list(islice(it, n))
    if not result:
        return result
    heapify(result)
    _heappushpop = heappushpop
    for elem in it:
        _heappushpop(result, elem)
    result.sort(reverse=True)
    return result
def nsmallest(n, iterable):
    """Find the n smallest elements in a dataset.

    Equivalent to: sorted(iterable)[:n]
    """
    if n < 0:
        return []
    it = iter(iterable)
    result = list(islice(it, n))
    if not result:
        return result
    _heapify_max(result)
    _heappushpop = _heappushpop_max
    for elem in it:
        _heappushpop(result, elem)
    result.sort()
    return result
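# Both selection routines above follow the classic bounded-heap pattern:
# islice seeds a heap with the first n items, then heappushpop keeps only
# the n best of the remaining stream, for O(len(iterable) * log n) total.
# The stdlib ships the same functions, so the pattern can be exercised as:
import heapq

data = [5, 1, 9, 3, 7, 2]
heapq.nlargest(3, data)   # [9, 7, 5]
heapq.nsmallest(3, data)  # [1, 2, 3]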
def get(self, p):
    '''
    Look for events fulfilling period `p` in the cache and, if possible,
    return them. Otherwise raise a RecordNotExistError.
    '''
    try:
        with self._lock:
            # This assumes that new events go straight into the cache
            if p.lhs_ts >= self._cache_latest_ts[0] and p.rhs_ts <= HORSEYTIEM.time():
                lhs_index = self._cache_latest_ts.bisect_left(p.lhs_ts)
                rhs_index = self._cache_latest_ts.bisect_left(p.rhs_ts)
                return tuple(islice(self._cache_latest_ev, lhs_index, rhs_index))
    except IndexError:
        pass
    raise RecordNotExistError()
def handleBucketUpdate(self, ind, t):
    '''
    A bucket has ended. Compare the last prediction to what actually happened.
    '''
    # Compare the prediction to reality
    if self._last_prediction is not None:
        metric = self._hist[-1]
        #print("%s\t%s" % (self._last_prediction, metric))
        if simplifyMet(self._last_prediction) == simplifyMet(metric):
            self.stats['predict_correct'] += 1
        else:
            self.stats['predict_wrong'] += 1
    s = deque(islice(self._hist, 1, None))
    prediction = self.tree.predict(s, closest=True)
    self._last_prediction = prediction
def test_interval_timer():
    t = Timer()
    assert not list(t)
    t.add('foo', 10, 10)
    t.add('boo', 20, 20)
    t.add('bar', 30, 30)
    result = list(islice(t, 11))
    assert result == [(10, 'foo'), (20, 'foo'), (20, 'boo'), (30, 'foo'),
                      (30, 'bar'), (40, 'foo'), (40, 'boo'), (50, 'foo'),
                      (60, 'foo'), (60, 'boo'), (60, 'bar')]
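# The test takes exactly 11 events because the timer stream is otherwise
# endless; islice is the standard way to bound an infinite iterator
# (Timer is the module under test, not part of the stdlib). Compare:
from itertools import count, islice

list(islice(count(10, 10), 5))  # [10, 20, 30, 40, 50]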
def iter_chunks(seq, chunk_size):
    it = iter(seq)
    while True:
        chunk = list(islice(it, chunk_size))
        if chunk:
            yield chunk
        else:
            break
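# A quick check of iter_chunks: islice never raises here -- it simply comes
# back empty once the source is exhausted, which is what ends the loop.
list(iter_chunks(range(7), 3))  # [[0, 1, 2], [3, 4, 5], [6]]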
def _repr_iterable(self, x, level, left, right, maxiter, trail=''):
    n = len(x)
    if level <= 0 and n:
        s = '...'
    else:
        newlevel = level - 1
        repr1 = self.repr1
        pieces = [repr1(elem, newlevel) for elem in islice(x, maxiter)]
        if n > maxiter: pieces.append('...')
        s = ', '.join(pieces)
    if n == 1 and trail: right = trail + right
    return '%s%s%s' % (left, s, right)
def repr_dict(self, x, level):
    n = len(x)
    if n == 0: return '{}'
    if level <= 0: return '{...}'
    newlevel = level - 1
    repr1 = self.repr1
    pieces = []
    for key in islice(_possibly_sorted(x), self.maxdict):
        keyrepr = repr1(key, newlevel)
        valrepr = repr1(x[key], newlevel)
        pieces.append('%s: %s' % (keyrepr, valrepr))
    if n > self.maxdict: pieces.append('...')
    s = ', '.join(pieces)
    return '{%s}' % (s,)
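# Both repr helpers use islice purely as a truncation guard: at most maxiter
# (or self.maxdict) items are ever rendered, so repr'ing a huge container
# stays cheap. The stdlib reprlib.Repr, which this snippet appears to be
# drawn from, shows the resulting behavior:
import reprlib

r = reprlib.Repr()
r.maxdict = 2
r.repr({i: i for i in range(100)})  # "{0: 0, 1: 1, ...}"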
def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
    return list(islice(user_posts[user], limit))
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    relevant = merge(*[user_posts[u] for u in following[user]], reverse=True)
    return list(islice(relevant, limit))
def search(phrase: str, limit: Optional[int] = None) -> List[Post]:
    # XXX this could benefit from caching and from preindexing
    return list(islice((post for post in posts if phrase in post.text), limit))
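# All three helpers above lean on the fact that islice treats a stop of None
# as "no bound", so limit=None transparently means "return everything":
from itertools import islice

list(islice(range(5), None))  # [0, 1, 2, 3, 4] -- unbounded
list(islice(range(5), 2))     # [0, 1]          -- capped at two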
def __init__(self, nodes, distance=None):
    if distance is None:
        self.nodes = itertools.repeat(None)
    elif distance == 0:
        # next(nodes) replaces the Python 2-only nodes.next()
        self.nodes = itertools.repeat(next(nodes))
    else:
        self.nodes = itertools.islice(nodes, 0, None, distance)
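# In the general case above, islice's step argument does the striding:
# every distance-th node is kept and the rest are skipped. Standalone:
from itertools import islice

list(islice(range(10), 0, None, 3))  # [0, 3, 6, 9]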
def _pad_finite(iterable, padding, limit):
    return tuple(itertools.islice(_pad_infinite(iterable, padding), limit))
def adjacent_pairs(seq):
    """From e0, e1, e2, e3, ... produce (e0,e1), (e1,e2), (e2,e3), ..."""
    return zip(seq, itertools.islice(seq, 1, None))
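# Caution: the zip-with-islice trick assumes seq is a re-iterable sequence.
# Handed a one-shot iterator, both zip arguments would drain the same stream
# and pair up the wrong elements. A tee-based sketch (equivalent to
# itertools.pairwise in Python 3.10+) handles both cases:
import itertools

def adjacent_pairs_iter(iterable):
    a, b = itertools.tee(iterable)
    next(b, None)  # shift the second copy forward by one element
    return zip(a, b)

list(adjacent_pairs_iter(iter('abcd')))  # [('a', 'b'), ('b', 'c'), ('c', 'd')]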
def chunked(iterable, chunksize=1):
    """
    Generator yielding multiple iterables of length 'chunksize'.

    Parameters
    ----------
    iterable : iterable
        Iterable to be chunked.
    chunksize : int, optional
        Chunk size.

    Yields
    ------
    chunk : iterable
        Iterable of size `chunksize`. If the iterable is not evenly
        divisible by `chunksize`, the last `chunk` might be smaller.
    """
    iterable = iter(iterable)
    next_chunk = tuple(islice(iterable, chunksize))
    while next_chunk:
        yield next_chunk
        next_chunk = tuple(islice(iterable, chunksize))
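# chunked is the same islice-until-empty loop as iter_chunks above, with
# tuples instead of lists and the first slice hoisted ahead of the loop:
list(chunked(range(7), 3))  # [(0, 1, 2), (3, 4, 5), (6,)]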
def test_AfterOpen(self, ms):
    should_trigger = partial(
        AfterOpen(minutes=5, hours=1).should_trigger,
        env=self.env,
    )
    for m in islice(ms, 64):
        # Check the first 64 minutes of data.
        # We use 64 because the offset is from market open
        # at 13:30 UTC, meaning the first minute of data has an
        # offset of 1.
        self.assertFalse(should_trigger(m))
    for m in islice(ms, 64, None):
        # Check the rest of the day.
        self.assertTrue(should_trigger(m))
def infer_barcode_reverse_complement(barcode_whitelist, read_iter):
    if barcode_whitelist is None:
        return False
    reg_valid_count = 0
    rc_valid_count = 0
    # Only sample the first NUM_CHECK_BARCODES_FOR_ORIENTATION reads.
    for name, seq, qual in itertools.islice(read_iter, cr_constants.NUM_CHECK_BARCODES_FOR_ORIENTATION):
        if seq in barcode_whitelist:
            reg_valid_count += 1
        if tk_seq.get_rev_comp(seq) in barcode_whitelist:
            rc_valid_count += 1
    frac_rc = tk_stats.robust_divide(rc_valid_count, rc_valid_count + reg_valid_count)
    return frac_rc >= cr_constants.REVCOMP_BARCODE_THRESHOLD
def detect_paired_end_bam(bam_filename):
    bam = tk_bam.create_bam_infile(bam_filename)
    try:
        # Sample the first 100 reads; any read1/read2 flag means paired-end.
        for read in itertools.islice(bam, 100):
            if read.is_read1 or read.is_read2:
                return True
        return False
    finally:
        # The original closed the file after returning, which never ran;
        # try/finally guarantees the close.
        bam.close()