def add_time (self, t):
bisect.insort (self.times, t)
python类insort()的实例源码
def add (self, k, evt):
if k in self.k2e:
self.k2e[k].append (evt)
else:
self.k2e[k] = [evt]
bisect.insort (self.k, k)
def _validate(self):
super(ComputableTerm, self)._validate()
if self.inputs is NotSpecified:
raise TermInputsNotSpecified(termname=type(self).__name__)
if self.outputs is NotSpecified:
pass
elif not self.outputs:
raise TermOutputsEmpty(termname=type(self).__name__)
else:
# Raise an exception if there are any naming conflicts between the
# term's output names and certain attributes.
disallowed_names = [
attr for attr in dir(ComputableTerm)
if not attr.startswith('_')
]
# The name 'compute' is an added special case that is disallowed.
# Use insort to add it to the list in alphabetical order.
insort(disallowed_names, 'compute')
for output in self.outputs:
if output.startswith('_') or output in disallowed_names:
raise InvalidOutputName(
output_name=output,
termname=type(self).__name__,
disallowed_names=disallowed_names,
)
if self.window_length is NotSpecified:
raise WindowLengthNotSpecified(termname=type(self).__name__)
if self.mask is NotSpecified:
# This isn't user error, this is a bug in our code.
raise AssertionError("{term} has no mask".format(term=self))
if self.window_length:
for child in self.inputs:
if not child.window_safe:
raise NonWindowSafeInput(parent=self, child=child)
def add(self, item):
bisect.insort(self._items, item)
def build(target, s_ref):
func_shadows = target.shadow_chains[s_ref.func_name]
shadow = ScriptShadow(s_ref.script_func, s_ref.build_args['priority'])
bisect.insort(func_shadows, shadow)
def add_variable(self, var):
self.variables[var.conc_addr] = var
bisect.insort(self.addr_list, var.conc_addr)
def _free(self, block):
# free location and try to merge with neighbours
(arena, start, stop) = block
try:
prev_block = self._stop_to_block[(arena, start)]
except KeyError:
pass
else:
start, _ = self._absorb(prev_block)
try:
next_block = self._start_to_block[(arena, stop)]
except KeyError:
pass
else:
_, stop = self._absorb(next_block)
block = (arena, start, stop)
length = stop - start
try:
self._len_to_seq[length].append(block)
except KeyError:
self._len_to_seq[length] = [block]
bisect.insort(self._lengths, length)
self._start_to_block[(arena, start)] = block
self._stop_to_block[(arena, stop)] = block
def nsmallest(n, iterable):
"""Find the n smallest elements in a dataset.
Equivalent to: sorted(iterable)[:n]
"""
if n < 0:
return []
if hasattr(iterable, '__len__') and n * 10 <= len(iterable):
# For smaller values of n, the bisect method is faster than a minheap.
# It is also memory efficient, consuming only n elements of space.
it = iter(iterable)
result = sorted(islice(it, 0, n))
if not result:
return result
insort = bisect.insort
pop = result.pop
los = result[-1] # los --> Largest of the nsmallest
for elem in it:
if elem < los:
insort(result, elem)
pop()
los = result[-1]
return result
# An alternative approach manifests the whole iterable in memory but
# saves comparisons by heapifying all at once. Also, saves time
# over bisect.insort() which has O(n) data movement time for every
# insertion. Finding the n smallest of an m length iterable requires
# O(m) + O(n log m) comparisons.
h = list(iterable)
heapify(h)
return list(map(heappop, repeat(h, min(n, len(h)))))
# 'heap' is a heap at all indices >= startpos, except possibly for pos. pos
# is the index of a leaf with a possibly out-of-order value. Restore the
# heap invariant.
def queue_command(self, after, cmd, persists=False, unique=True):
log.debug('Queueing command "%s" to execute in %s second(s)', cmd.__name__, after)
timestamp = datetime.datetime.now() + datetime.timedelta(seconds=after)
queued_command = QueuedCommand(after, timestamp, cmd, persists)
if not unique or queued_command not in self.queue:
bisect.insort(self.queue, queued_command)
else:
log.warning('Failed to queue command "%s" because it\'s already queued.', cmd.__name__)
def addIntervalBlock(self, intervalBlock):
assert self.isParentOf(intervalBlock)
bisect.insort(self.intervalBlocks, intervalBlock)
intervalBlock.meterReading = self
def _freeLine(self, offset):
self._file.seek(offset)
self._file.write(b'#')
self._file.flush()
line = self._file.readline()
size = len(line) + 1 # one character was written beforehand
if size > 5:
bisect.insort(self._free_lines, (len(line)+1, offset) )
def _free(self, block):
# free location and try to merge with neighbours
(arena, start, stop) = block
try:
prev_block = self._stop_to_block[(arena, start)]
except KeyError:
pass
else:
start, _ = self._absorb(prev_block)
try:
next_block = self._start_to_block[(arena, stop)]
except KeyError:
pass
else:
_, stop = self._absorb(next_block)
block = (arena, start, stop)
length = stop - start
try:
self._len_to_seq[length].append(block)
except KeyError:
self._len_to_seq[length] = [block]
bisect.insort(self._lengths, length)
self._start_to_block[(arena, start)] = block
self._stop_to_block[(arena, stop)] = block
def _free(self, block):
# free location and try to merge with neighbours
(arena, start, stop) = block
try:
prev_block = self._stop_to_block[(arena, start)]
except KeyError:
pass
else:
start, _ = self._absorb(prev_block)
try:
next_block = self._start_to_block[(arena, stop)]
except KeyError:
pass
else:
_, stop = self._absorb(next_block)
block = (arena, start, stop)
length = stop - start
try:
self._len_to_seq[length].append(block)
except KeyError:
self._len_to_seq[length] = [block]
bisect.insort(self._lengths, length)
self._start_to_block[(arena, start)] = block
self._stop_to_block[(arena, stop)] = block
def add_row(self, member):
if not self.have_member(member):
bisect.insort(self.items, member)
self.refresh_collisions()
self.modelReset.emit()
def _queue_request(self, n):
bisect.insort(self._queue, n)
if not self._in_flight:
self._consume_queue()
def _free(self, block):
# free location and try to merge with neighbours
(arena, start, stop) = block
try:
prev_block = self._stop_to_block[(arena, start)]
except KeyError:
pass
else:
start, _ = self._absorb(prev_block)
try:
next_block = self._start_to_block[(arena, stop)]
except KeyError:
pass
else:
_, stop = self._absorb(next_block)
block = (arena, start, stop)
length = stop - start
try:
self._len_to_seq[length].append(block)
except KeyError:
self._len_to_seq[length] = [block]
bisect.insort(self._lengths, length)
self._start_to_block[(arena, start)] = block
self._stop_to_block[(arena, stop)] = block
def containsNearbyAlmostDuplicate(self, nums, k, t):
"""
:type nums: List[int]
:type k: int
:type t: int
:rtype: bool
"""
import bisect
l = len(nums)
if k < 1 or t < 0 or not nums or l < 2:
return False
k+=1
win=nums[:k]
win.sort()
for i in range(len(win)-1):
if win[i+1]-win[i]<=t:
return True
for x in range(l-k):
win.remove(nums[x])
bisect.insort(win,nums[k+x])
pos=bisect.bisect_left(win,nums[k+x])
if pos>0 and win[pos]-win[pos-1]<=t:
return True
l=len(win)
if pos<l-1 and win[pos+1]-win[pos]<=t:
return True
return False
def bisect_demo():
"""
??????????????
:return:
"""
import bisect
demo_list = [2, 6, 1, 8, 4, 9, 0]
demo_list.sort()
bisect.insort(demo_list, 5)
print demo_list
print bisect.bisect(demo_list, 3)
def addToLookup(entry):
global allHomeEntriesByTime, trips
if entry.tripId in trips:
return
if home_stop.upper() in stops[entry.stopId]['stop_name'].upper():
testExistsIndex = bisect.bisect_left(allHomeEntriesByTime, entry)
if testExistsIndex < len(allHomeEntriesByTime) and allHomeEntriesByTime[testExistsIndex].tripId == entry.tripId:
return
bisect.insort(allHomeEntriesByTime, entry)