import heapq  # the snippets below all assume heapq is imported in their home modules

def largest_export_versions(n):
    """Creates a filter that keeps the largest n export versions.

    Args:
      n: number of versions to keep.

    Returns:
      A filter function that keeps the n largest paths.
    """
    def keep(paths):
        heap = []
        for idx, path in enumerate(paths):
            if path.export_version is not None:
                heapq.heappush(heap, (path.export_version, idx))
        keepers = [paths[i] for _, i in heapq.nlargest(n, heap)]
        return sorted(keepers)
    return keep
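
# Possible usage of largest_export_versions. The Path namedtuple below is a
# hypothetical stand-in for whatever path object the original module defines;
# only an .export_version attribute and orderability are assumed.
import collections

Path = collections.namedtuple('Path', ['path', 'export_version'])

paths = [Path('/tmp/export/00003', 3), Path('/tmp/export/00001', 1),
         Path('/tmp/export/00007', 7), Path('/tmp/export/misc', None)]
keep_two = largest_export_versions(2)
print(keep_two(paths))  # keeps the two paths with the highest export versions (7 and 3)
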
def add_open(self, node, parent=None):
    # A*-style insertion into the open list, keyed by f = g + h.
    self.m_CurDepth = self.m_CurDepth + 1
    node.m_Status = STATUS_OPEN
    node.m_Parent = parent
    node.m_gScore = self.compute_g(node, parent)
    node.m_hScore = self.compute_h(node)
    node.m_fScore = node.m_gScore + node.m_hScore
    heapq.heappush(self.m_OpenList, (node.m_fScore, node))
    if self.m_CurDepth >= self.m_MaxDepth:
        goal_node = self.get_goal_node()
        if goal_node not in self.m_OpenList:
            return True
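
# Minimal standalone sketch of the open-list push above: entries are ordered by
# f = g + h. A counter tie-breaker is added here because plain node objects are
# not comparable when f-scores collide; the original class presumably guarantees
# comparability some other way. All names here are illustrative.
import heapq
import itertools

_counter = itertools.count()
open_list = []

def push_open(open_list, g_score, h_score, node):
    # f = g + h; the counter breaks ties so heapq never compares the node objects
    heapq.heappush(open_list, (g_score + h_score, next(_counter), node))

push_open(open_list, 3, 4, 'node-a')
push_open(open_list, 2, 4, 'node-b')
print(heapq.heappop(open_list)[2])  # 'node-b' wins with the lower f-score (6)
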
def accumulate(self, predictions, actuals, num_positives=None):
    """Accumulate the predictions and their ground truth labels.

    After the function call, we may call peek_ap_at_n to actually calculate
    the average precision.
    Note predictions and actuals must have the same shape.

    Args:
      predictions: a list storing the prediction scores.
      actuals: a list storing the ground truth labels. Any value
        larger than 0 will be treated as positives, otherwise as negatives.
      num_positives: If the 'predictions' and 'actuals' inputs aren't complete,
        then it's possible some true positives were missed in them. In that case,
        you can provide 'num_positives' in order to accurately track recall.

    Raises:
      ValueError: An error occurred when the format of the input is not the
        numpy 1-D array or the shape of predictions and actuals does not match.
    """
    if len(predictions) != len(actuals):
        raise ValueError("the shape of predictions and actuals does not match.")

    if num_positives is not None:
        if not isinstance(num_positives, numbers.Number) or num_positives < 0:
            raise ValueError("'num_positives' was provided but it wasn't a "
                             "nonnegative number.")

    if num_positives is not None:
        self._total_positives += num_positives
    else:
        self._total_positives += numpy.size(numpy.where(actuals > 0))

    topk = self._top_n
    heap = self._heap

    for i in range(numpy.size(predictions)):
        if topk is None or len(heap) < topk:
            heapq.heappush(heap, (predictions[i], actuals[i]))
        else:
            if predictions[i] > heap[0][0]:  # heap[0] is the smallest
                heapq.heappop(heap)
                heapq.heappush(heap, (predictions[i], actuals[i]))
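
# Standalone sketch of the top-n accumulation pattern used in accumulate() above:
# keep only the topk highest-scoring (prediction, label) pairs in a min-heap so the
# smallest retained score is always at heap[0]. Names here are illustrative only.
import heapq

def top_n_pairs(predictions, actuals, topk):
    heap = []
    for pred, act in zip(predictions, actuals):
        if len(heap) < topk:
            heapq.heappush(heap, (pred, act))
        elif pred > heap[0][0]:        # heap[0] is the smallest retained score
            heapq.heapreplace(heap, (pred, act))
    return sorted(heap, reverse=True)

print(top_n_pairs([0.1, 0.9, 0.4, 0.8], [0, 1, 0, 1], topk=2))
# [(0.9, 1), (0.8, 1)]
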
def initialize_heap(offsets, weights):
    all_heaps = []
    n, k = offsets.shape
    for i in range(0, n):
        h = []
        for j in range(0, k):
            heapq.heappush(h, (weights[i, j], offsets[i, j]))
        all_heaps.append(h)
    return all_heaps
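
# Possible usage of initialize_heap with small numpy arrays (the shapes and values
# are assumptions; the original caller's data layout is not shown on this page).
# Each row gets its own heap ordered by weight, so heaps[i][0] is row i's
# lowest-weight (weight, offset) pair.
import numpy

weights = numpy.array([[0.5, 0.1, 0.9],
                       [0.3, 0.7, 0.2]])
offsets = numpy.array([[10, 11, 12],
                       [20, 21, 22]])
heaps = initialize_heap(offsets, weights)
print(heaps[0][0])  # smallest weight in row 0 (0.1) paired with its offset (11)
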
def merge_by_key(bam_filenames, key_func, bam_out):
    file_cache = tk_cache.FileHandleCache(mode='rb', open_func=pysam.Samfile)
    total_reads = 0
    heap = []

    for bam_filename in bam_filenames:
        try:
            bam = file_cache.get(bam_filename)
            first_read = next(bam)
            heapq.heappush(heap, (key_func(first_read), first_read, bam_filename))
        except StopIteration:
            pass

    while len(heap) > 0:
        # Get the minimum item and write it to the bam.
        key, read, bam_filename = heapq.heappop(heap)
        bam = file_cache.get(bam_filename)
        bam_out.write(read)
        total_reads += 1

        # Get the next read from the source bam we just wrote from.
        # If that BAM file is out of reads, then we leave that one out.
        try:
            next_read = next(bam)
            heapq.heappush(heap, (key_func(next_read), next_read, bam_filename))
        except StopIteration:
            pass

    return total_reads
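
# The BAM merge above is a k-way merge driven by a heap of (key, item, source) triples.
# Below is a minimal sketch of the same pattern with plain Python iterators standing in
# for the pysam files; merge_sorted_iters and its arguments are illustrative, not part
# of the original module.
import heapq

def merge_sorted_iters(iters, key_func):
    heap, out = [], []
    iters = [iter(it) for it in iters]
    for idx, it in enumerate(iters):
        try:
            first = next(it)
            heapq.heappush(heap, (key_func(first), idx, first))
        except StopIteration:
            pass
    while heap:
        _, idx, item = heapq.heappop(heap)   # smallest key across all sources
        out.append(item)
        try:
            nxt = next(iters[idx])           # refill from the source we just drained
            heapq.heappush(heap, (key_func(nxt), idx, nxt))
        except StopIteration:
            pass
    return out

print(merge_sorted_iters([[1, 4, 9], [2, 3, 10]], key_func=lambda x: x))
# [1, 2, 3, 4, 9, 10]
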
def enterabs(self, time, priority, action, argument):
    """Enter a new event in the queue at an absolute time.

    Returns an ID for the event which can be used to remove it,
    if necessary.
    """
    event = Event(time, priority, action, argument)
    heapq.heappush(self._queue, event)
    return event  # The ID
def run(self):
    """Execute events until the queue is empty.

    When there is a positive delay until the first event, the
    delay function is called and the event is left in the queue;
    otherwise, the event is removed from the queue and executed
    (its action function is called, passing it the argument). If
    the delay function returns prematurely, it is simply
    restarted.

    It is legal for both the delay function and the action
    function to modify the queue or to raise an exception;
    exceptions are not caught but the scheduler's state remains
    well-defined so run() may be called again.

    A questionable hack is added to allow other threads to run:
    just after an event is executed, a delay of 0 is executed, to
    avoid monopolizing the CPU when other threads are also
    runnable.
    """
    # localize variable access to minimize overhead
    # and to improve thread safety
    q = self._queue
    delayfunc = self.delayfunc
    timefunc = self.timefunc
    pop = heapq.heappop
    while q:
        time, priority, action, argument = checked_event = q[0]
        now = timefunc()
        if now < time:
            delayfunc(time - now)
        else:
            event = pop(q)
            # Verify that the event was not removed or altered
            # by another thread after we last looked at q[0].
            if event is checked_event:
                action(*argument)
                delayfunc(0)  # Let other threads run
            else:
                heapq.heappush(q, event)
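
# The enterabs()/run() pair above mirrors the standard library's sched module, which
# keeps its event queue in exactly this kind of heap. An equivalent standalone usage:
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enterabs(time.time() + 0.1, 1, print, argument=('fired',))
scheduler.run()  # blocks for ~0.1 s, then prints 'fired'
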
def _put(self, item, heappush=heapq.heappush):
    heappush(self.queue, item)
def report_best_styles(formatter, finished_styles, evaluations, bestofround, metricdesc,
                       roundnr):
    # type: (CodeFormatter, List[AttemptResult], List[AttemptResult], int, str, int) -> None
    """Report the best style and its metric for the round.

    Also report the next best styles with their metrics relative to the best style.
    """
    attempts = finished_styles[:]
    bestofround = max(0, bestofround)
    for attempt in heapq.nsmallest(bestofround, evaluations):
        heapq.heappush(attempts, attempt)
    for idx, attemptresult in enumerate(heapq.nsmallest(bestofround, attempts)):
        if idx == 0:
            bestresult = attemptresult
            bestmsg = '\nBest distance %s round %d: %s' % (metricdesc, roundnr,
                                                           attemptresult.distance)
            iprint(INFO_USER, cyan(bestmsg))
            iprint(INFO_USER, formatter.styletext(attemptresult.formatstyle))
        else:
            place = '%d. ' % (idx + 1)
            m_diff = distdifference(attemptresult.distance, bestresult.distance)
            iprint(INFO_USER, yellow('\n%sbest differential distance %s round %d: %s' %
                                     (place, metricdesc, roundnr, m_diff)))
            unique_from, unique_to = deep_difference(bestresult.formatstyle,
                                                     attemptresult.formatstyle)
            text_from = formatter.styletext(style_make(unique_from))
            text_to = formatter.styletext(style_make(unique_to))
            separator = ' | '
            block = alignedblocks(text_from, text_to, separator, color_right=YELLOW)
            iprint(INFO_USER, block)
def __init__(self, n=2):
    pair = PairGen._Pair(*tuple(0 for _ in range(n)))
    self._n = n
    self._memo = {}
    self._pq = []
    # max_sizes, only generate up to this.
    self.max_sizes = [-1 for _ in range(n)]
    self.continuation = {}
    heapq.heappush(self._pq, pair)
def update(self, i):
    '''Increment max_sizes for index i.
    '''
    self.max_sizes[i] += 1
    if i in self.continuation:
        t = self.continuation[i]
        pair = PairGen._Pair(*t)
        heapq.heappush(self._pq, pair)
        del self.continuation[i]
def _create_binary_tree(self):
    """
    Create a binary Huffman tree using stored vocabulary word counts. Frequent words
    will have shorter binary codes. Called internally from `build_vocab()`.
    """
    vocab_size = len(self.vocab)
    logger.info("constructing a huffman tree from %i words" % vocab_size)

    # build the huffman tree
    heap = list(self.vocab.values())
    heapq.heapify(heap)
    for i in range(vocab_size - 1):
        min1, min2 = heapq.heappop(heap), heapq.heappop(heap)
        heapq.heappush(heap, Vocab(count=min1.count + min2.count, index=i + vocab_size,
                                   left=min1, right=min2))

    # recurse over the tree, assigning a binary code to each vocabulary word
    if heap:
        max_depth, stack = 0, [(heap[0], [], [])]
        while stack:
            node, codes, points = stack.pop()
            if node.index < vocab_size:
                # leaf node => store its path from the root
                node.code, node.point = codes, points
                max_depth = max(len(codes), max_depth)
            else:
                # inner node => continue recursion
                points = np.array(list(points) + [node.index - vocab_size], dtype=int)
                stack.append((node.left, np.array(list(codes) + [0], dtype=int), points))
                stack.append((node.right, np.array(list(codes) + [1], dtype=int), points))

        logger.info("built huffman tree with maximum node depth %i" % max_depth)
def create_component(cls, sketch_layer, parent=None):
    if sketch_layer.component:
        props = sketch_layer.component.get_react_native_props()
    else:
        props = dict()

    layers = []
    for layer in sketch_layer.layers:
        if layer.is_shape_group() and layer.has_fills():
            dimensions = layer.get_dimensions()
            # negate the area so the largest filled layer sits at the heap root
            heapq.heappush(layers, (-dimensions['width'] * dimensions['height'], layer))

    fill_style = combine_styles(*layers[0][1].get_fill_styles())
    props['backgroundColor'] = fill_style.get('backgroundColor', None)

    component = StatusBar(parent=parent, props=props, layer=sketch_layer)
    return component
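
# The negative-area trick above turns heapq's min-heap into a max-heap: pushing
# (-area, layer) makes layers[0] the layer with the largest fill area. A tiny
# illustration with made-up (width, height) pairs:
import heapq

layers = []
for name, (w, h) in [('small', (10, 10)), ('big', (100, 50)), ('medium', (40, 40))]:
    heapq.heappush(layers, (-w * h, name))
print(layers[0])  # (-5000, 'big'): the largest area sits at the heap root
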
def _add_timer(self, deadline, callback):
    callbacks = self._timers.get(deadline)
    if callbacks is None:
        callbacks = set()
        self._timers[deadline] = callbacks
        heapq.heappush(self._timers_heap, deadline)
        if deadline < self._next_deadline:
            self._next_deadline = deadline
    callbacks.add(callback)
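
# Complementary sketch of how a heap of deadlines like _timers_heap is typically
# drained; pop_due_deadlines is illustrative, the original class's consumer is not
# shown on this page.
import heapq

def pop_due_deadlines(timers_heap, now):
    due = []
    while timers_heap and timers_heap[0] <= now:   # earliest deadline is at the root
        due.append(heapq.heappop(timers_heap))
    return due

timers_heap = []
for deadline in (5.0, 1.0, 3.0):
    heapq.heappush(timers_heap, deadline)
print(pop_due_deadlines(timers_heap, now=3.5))  # [1.0, 3.0]
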
def __add_candidate(self, candidate):
    if candidate is None:
        return
    if candidate.is_terminal:
        self.__result.append(candidate.cell.id())
        return

    assert candidate.num_children == 0

    num_levels = self.__level_mod
    if candidate.cell.level() < self.__min_level:
        num_levels = 1
    num_terminals = self.__expand_children(candidate, candidate.cell, num_levels)

    if candidate.num_children == 0:
        pass  # "Not needed due to GC": the childless candidate is simply dropped
    elif (not self.__interior_covering
          and num_terminals == 1 << self.__max_children_shift()
          and candidate.cell.level() >= self.__min_level):
        candidate.is_terminal = True
        self.__add_candidate(candidate)
    else:
        # combined priority built from the cell level, the number of children
        # and the number of terminal descendants
        priority = (
            (
                (
                    (candidate.cell.level() << self.__max_children_shift())
                    + candidate.num_children
                ) << self.__max_children_shift()
            ) + num_terminals
        )
        heapq.heappush(self.__pq, (priority, candidate))
def push(self, item, priority):
    # FIXME: restored old behaviour to check against old results better
    # FIXED: restored to stable behaviour
    entry = (priority, self.count, item)
    # entry = (priority, item)
    heapq.heappush(self.heap, entry)
    self.count += 1
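
# The (priority, count, item) entry above keeps the queue stable: when two items share
# a priority, the monotonically increasing count decides the order, and it also prevents
# heapq from ever comparing the items themselves. A minimal self-contained version
# (names are illustrative):
import heapq
import itertools

heap, counter = [], itertools.count()

def push(item, priority):
    heapq.heappush(heap, (priority, next(counter), item))

push('first', 1)
push('second', 1)   # same priority, pushed later
print([heapq.heappop(heap)[2] for _ in range(2)])  # ['first', 'second']
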
def _put(self, item):
    heapq.heappush(self._queue, item)