def high_degree_nodes(k, G):
    """Return the k nodes of G with the highest degree.

    For a directed graph the out-degree is used, otherwise the plain degree.

    :param k: number of nodes to return; values <= 0 give an empty list
    :param G: graph exposing ``is_directed()``, ``nodes()`` and
        ``degree`` / ``out_degree`` (e.g. a networkx graph)
    :return: list of ``(degree, node)`` tuples arranged as a min-heap, so
        ``result[0]`` is the weakest of the selected nodes; holds fewer than
        k entries when the graph has fewer than k nodes
    """
    if k <= 0:
        # Nothing requested; the heap root below would not exist.
        return []
    # G.is_directed() is what nx.is_directed(G) delegates to.
    if G.is_directed():
        my_degree_function = G.out_degree
    else:
        my_degree_function = G.degree
    # Materialise the nodes once: node views are not guaranteed to be sliceable.
    nodes = list(G.nodes())
    # the top k nodes to be returned; initialization with first k elements
    H = [(my_degree_function(i), i) for i in nodes[:k]]
    hq.heapify(H)
    # iterate through the remaining nodes; add to heap if larger than heap root
    for i in nodes[k:]:
        degree = my_degree_function(i)
        if degree > H[0][0]:
            hq.heappushpop(H, (degree, i))
    return H
# Generator variant of high_degree_nodes(k, G)
# More time-efficient for range calls if k log k > log V
# Generates _all_ sets of i nodes of highest degree, with 1 <= i <= k
# -> Time complexity: O(V log (V))
# -> Memory complexity: Theta(V)
# Source code examples for Python's heapify()
def __next__(self):
    """Advance ``self.gen`` and store the value it yields in ``self.dt``.

    When the generator is exhausted, this item removes itself from
    ``self.genlist`` and leaves that list in heap order. Nothing is returned
    and StopIteration is not propagated.
    """
    try:
        self.dt = advance_iterator(self.gen)
    except StopIteration:
        pending = self.genlist
        if pending[0] is not self:
            # Removing from the middle breaks heap order, so rebuild it.
            pending.remove(self)
            heapq.heapify(pending)
        else:
            # We are the heap root: a plain pop keeps the heap valid.
            heapq.heappop(pending)
def nthUglyNumber(self, n):
    """
    Return the n-th number (1-based) whose only prime factors are 2, 3 and 5.

    :type n: int
    :rtype: int
    """
    frontier = [1]
    heapq.heapify(frontier)
    seen = set(frontier)
    # Pop the n-1 smaller values; each popped value spawns its multiples.
    for _ in range(n - 1):
        smallest = heapq.heappop(frontier)
        for factor in (2, 3, 5):
            candidate = smallest * factor
            if candidate in seen:
                continue
            heapq.heappush(frontier, candidate)
            seen.add(candidate)
    return heapq.heappop(frontier)
def assign_jobs(self):
    """Assign every job to the worker that becomes free first.

    Reads ``self._jobs`` (job durations, in processing order) and
    ``self._num_workers``. Stores the results in ``self._assigned_workers``
    (worker id per job) and ``self._start_times`` (start time per job).
    Ties between free workers go to the smaller worker id, because heap
    entries are ``(start_time, id)`` tuples.
    """
    from collections import namedtuple
    import heapq

    jobs = self._jobs
    # Trivial case: every job gets its own worker and starts immediately.
    if len(jobs) <= self._num_workers:
        self._assigned_workers = list(range(len(jobs)))
        self._start_times = [0] * len(jobs)
        return

    # Create Heap keyed on the time each worker becomes free.
    MyThread = namedtuple('MyThread', 'start_time, id')
    heap = [MyThread(0, i) for i in range(self._num_workers)]
    heapq.heapify(heap)

    # Build fresh result lists so this path does not rely on the attributes
    # having been pre-allocated elsewhere (the trivial path creates them too).
    assigned_workers = []
    start_times = []
    for job in jobs:
        # The root is the next worker to become free.
        sched_thread_start, sched_thread_id = heap[0]
        assigned_workers.append(sched_thread_id)
        start_times.append(sched_thread_start)
        # Update heap with that worker's next free time.
        heapq.heapreplace(heap, MyThread(sched_thread_start + job, sched_thread_id))
    self._assigned_workers = assigned_workers
    self._start_times = start_times
def convertBST(self, root):
    """
    Add to each node's value the values of the nodes visited before it,
    visiting nodes in descending value order. The tree is modified in place.

    :type root: TreeNode
    :rtype: TreeNode
    """
    if not root:
        return root
    max_heap = []
    # NOTE(review): makelist_traverse is assumed to fill the list with
    # (-value, node) pairs, judging by how entries are unpacked below.
    self.makelist_traverse(root, max_heap)
    heapq.heapify(max_heap)
    # The first pop is the largest value; that node keeps its value.
    largest = -heapq.heappop(max_heap)[0]
    running_total = largest
    while max_heap:
        node = heapq.heappop(max_heap)[1]
        original_val = node.val
        if original_val < largest:
            node.val = original_val + running_total
        running_total += original_val
    return root
# File source: 358. Rearrange String k Distance Apart.py
# Project: py-leetcode
# Author: coolmich
# Project source
# File source
# Views: 21
# Favorites: 0
# Likes: 0
# Comments: 0
def rearrangeString(self, str, k):
    """
    Rearrange the characters so that equal letters are at least k apart.

    Greedy: always place the most frequent available letter, then hold it in
    a FIFO queue until k letters are waiting before it may be used again.

    :type str: str
    :type k: int
    :rtype: str  (empty string when the greedy placement cannot finish)
    """
    remaining = Counter(list(str))
    heap = [(-count, ch) for ch, count in remaining.items()]
    heapq.heapify(heap)
    cooling = deque()
    placed = []
    while heap:
        neg_count, ch = heapq.heappop(heap)
        if neg_count == 0:
            # Only exhausted letters are left in the heap.
            break
        placed.append(ch)
        cooling.append((neg_count + 1, ch))
        if len(cooling) >= k:
            heapq.heappush(heap, cooling.popleft())
    if len(placed) != len(str):
        return ''
    return ''.join(placed)
# File source: 358_rearrange_string_k_distance_apart.py
# Project: Machine_Learning_Playground
# Author: yao23
# Project source
# File source
# Views: 21
# Favorites: 0
# Likes: 0
# Comments: 0
def rearrangeString(self, s, k):
    """
    Rearrange the characters of s so that equal letters are at least k apart.

    Works in rounds of up to k placements; within a round each letter is used
    at most once, and letters with copies left are pushed back afterwards.

    :type s: str
    :type k: int  (k <= 0 means no constraint: s is returned unchanged)
    :rtype: str  (empty string when no valid arrangement exists)
    """
    # A non-positive distance constrains nothing. Without this guard a
    # negative k would run empty rounds forever.
    if k <= 0:
        return s
    h = [(-freq, ch) for (ch, freq) in collections.Counter(s).items()]
    heapq.heapify(h)
    res = []
    while len(res) < len(s):
        q = []
        # A round can place at most len(s) letters, so capping the range
        # changes nothing; range (not xrange) keeps this runnable on Python 3.
        for _ in range(min(k, len(s))):
            if len(res) == len(s):
                return ''.join(res)
            if not h:
                # Round not finished, but every remaining letter is cooling.
                return ''
            freq, ch = heapq.heappop(h)
            res.append(ch)
            if freq < -1:
                q.append((freq + 1, ch))
        while q:
            heapq.heappush(h, q.pop())
    return ''.join(res)
def mergeSort(seqs):
    """
    Merge several sorted sequences into one sorted stream.

    Each input is passed through assertIsSorted() and consumed lazily.
    Items are yielded in ascending order; equal items come out in the order
    of their source sequences.

    :param seqs: iterable of sorted iterables
    :return: generator over all items of all sequences
    """
    queue = []
    for order, s in enumerate(seqs):
        it = iter(assertIsSorted(s))
        try:
            # `order` is unique per sequence, so ties between items are
            # settled before the (unorderable) iterator is ever compared.
            # next(it) works on Python 2.6+ and 3, unlike it.next().
            queue.append((next(it), order, it))
        except StopIteration:
            # Empty sequence: nothing to merge from it.
            pass
    heapq.heapify(queue)
    while queue:
        item, order, it = queue[0]
        yield item
        try:
            # Swap the root for the next item of the same sequence.
            heapq.heapreplace(queue, (next(it), order, it))
        except StopIteration:
            # That sequence is exhausted; drop its entry.
            heapq.heappop(queue)
# ---------------------------------------------------------------------------
def kthSmallest(self, matrix, k):
    """
    Return the k-th smallest value by repeatedly popping a heap that is
    seeded with the first row and refilled from the same column one row down.
    Time complexity: O(n + k*log(n)) for n columns

    :type matrix: List[List[int]]
    :type k: int
    :rtype: int
    """
    import heapq
    # Heap entries are (value, row, col).
    frontier = []
    for col, value in enumerate(matrix[0]):
        frontier.append((value, 0, col))
    heapq.heapify(frontier)
    remaining = k
    while remaining > 0:
        val, row, col = heapq.heappop(frontier)
        below = row + 1
        if below < len(matrix):
            heapq.heappush(frontier, (matrix[below][col], below, col))
        remaining -= 1
    return val
def rebuild_heap(self):
    '''
    Drop every canceled timer from the heap and restore heap order.
    :return:
    '''
    import heapq
    # Keep only timers that have not been canceled.
    live_timers = [timer for timer in self.heap.queue if not timer.canceled]
    # Swap in the filtered list, then re-establish the heap invariant on it.
    self.heap.queue = live_timers
    heapq.heapify(self.heap.queue)
    # No canceled timers remain, so the counter starts over.
    self.cancel_timer_count = 0
def build_huffman_tree(freqs):
    """Build a Huffman tree over the given frequencies.

    One leaf node is created per entry of ``freqs``; inner nodes are appended
    as the two lowest-frequency subtrees are merged. Each merged node gets
    ``parent`` set to the index of its parent in the returned list and
    ``branch`` set to True (first popped) or False (second popped).

    :param freqs: sequence of frequencies, one per leaf
    :return: list of nodes, leaves first (same order as ``freqs``), root last
    :raises AssertionError: if ``freqs`` is empty
    """
    nodes = [HierarchicalSoftmaxNode() for _ in freqs]
    # Entries are (freq, node_index, node). The unique index breaks ties
    # between equal frequencies so the node objects are never compared.
    heap = [(freq, i, node) for i, (freq, node) in enumerate(zip(freqs, nodes))]
    heapq.heapify(heap)

    def pop_initialize(parent, branch):
        # Pop the lowest-frequency subtree and link it to its new parent.
        freq, _, node = heapq.heappop(heap)
        node.parent = parent
        node.branch = branch
        return freq

    idx = len(nodes) - 1
    while len(heap) > 1:
        idx += 1
        node = HierarchicalSoftmaxNode()
        nodes.append(node)  # the new inner node lives at nodes[idx]
        freq = pop_initialize(idx, True) + pop_initialize(idx, False)
        heapq.heappush(heap, (freq, idx, node))
    assert len(heap) == 1
    return nodes
def filter_out_loc(self, loc):
"""Drop the claims in self.heap whose parent's loc is `loc` and re-heapify.

Removal is done on a copy of self.heap so the list being iterated is not
mutated; each dropped claim is detached with set_heap(None). self.dir is
reset to None when getLocation(self.site().loc, self.dir) equals `loc`.
Afterwards self.heap is replaced by the filtered copy, heapified, and
check_heap() is called with the value get_best_claim() returned up front.
"""
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular the self.dir check) needs confirming.
old_best = self.get_best_claim()
copy = list(self.heap)
for claim in self.heap:
# this is a problem for roots?
# identity comparison: the parent must hold this very loc object
if loc is claim.get_parent().loc:
copy.remove(claim)
claim.set_heap(None)
# presumably: this site's current direction leads to `loc` - verify
if loc.gameMap().getLocation(self.site().loc, self.dir) == loc:
self.dir = None
self.heap = copy
heapq.heapify(self.heap)
self.check_heap(old_best)
# # logger.debug("Removed all children from %s that would come to %s" % (self.site().loc, loc ) )
def filter_out_loc(self, loc):
"""Drop the claims in self.heap whose parent's loc is `loc` and re-heapify.

Removal is done on a copy of self.heap so the list being iterated is not
mutated; each dropped claim is detached with set_heap(None). self.dir is
reset to None when gameMap.getLocation(self.site.loc, self.dir) equals
`loc`. Afterwards self.heap is replaced by the filtered copy, heapified,
and check_heap() is called with the value get_best_claim() returned up front.
"""
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular the self.dir check) needs confirming.
old_best = self.get_best_claim()
copy = list(self.heap)
for claim in self.heap:
# this is a problem for roots?
# identity comparison: the parent must hold this very loc object
if loc is claim.get_parent().loc:
copy.remove(claim)
claim.set_heap(None)
# presumably: this site's current direction leads to `loc` - verify
if loc.gameMap.getLocation(self.site.loc, self.dir) == loc:
self.dir = None
self.heap = copy
heapq.heapify(self.heap)
self.check_heap(old_best)
# # logger.debug("Removed all children from %s that would come to %s" % (self.site.loc, loc ) )
def filter_out_loc(self, loc):
"""Drop the claims in self.heap whose parent's loc is `loc` and re-heapify.

Removal is done on a copy of self.heap so the list being iterated is not
mutated; each dropped claim is detached with set_heap(None). self.dir is
reset to None when getLocation(self.site().loc, self.dir) equals `loc`.
Afterwards self.heap is replaced by the filtered copy, heapified, and
check_heap() is called with the value get_best_claim() returned up front.
"""
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular the self.dir check) needs confirming.
old_best = self.get_best_claim()
copy = list(self.heap)
for claim in self.heap:
# this is a problem for roots?
# identity comparison: the parent must hold this very loc object
if loc is claim.get_parent().loc:
copy.remove(claim)
claim.set_heap(None)
# presumably: this site's current direction leads to `loc` - verify
if loc.gameMap().getLocation(self.site().loc, self.dir) == loc:
self.dir = None
self.heap = copy
heapq.heapify(self.heap)
self.check_heap(old_best)
# # logger.debug("Removed all children from %s that would come to %s" % (self.site().loc, loc ) )
def filter_out_loc(self, loc):
"""Drop the claims in self.heap whose parent's loc is `loc` and re-heapify.

Removal is done on a copy of self.heap so the list being iterated is not
mutated; each dropped claim is detached with set_heap(None). self.dir is
reset to None when getLocation(self.site().loc, self.dir) equals `loc`.
Afterwards self.heap is replaced by the filtered copy, heapified, and
check_heap() is called with the value get_best_claim() returned up front.
"""
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular the self.dir check) needs confirming.
old_best = self.get_best_claim()
copy = list(self.heap)
for claim in self.heap:
# this is a problem for roots?
# identity comparison: the parent must hold this very loc object
if loc is claim.get_parent().loc:
copy.remove(claim)
claim.set_heap(None)
# presumably: this site's current direction leads to `loc` - verify
if loc.gameMap().getLocation(self.site().loc, self.dir) == loc:
self.dir = None
self.heap = copy
heapq.heapify(self.heap)
self.check_heap(old_best)
# # logger.debug("Removed all children from %s that would come to %s" % (self.site().loc, loc ) )
def filter_out_loc(self, loc):
"""Drop the claims in self.heap whose parent's loc is `loc` and re-heapify.

Removal is done on a copy of self.heap so the list being iterated is not
mutated; each dropped claim is detached with set_heap(None). self.dir is
reset to None when gameMap.getLocation(self.site.loc, self.dir) equals
`loc`. Afterwards self.heap is replaced by the filtered copy, heapified,
and check_heap() is called with the value get_best_claim() returned up front.
"""
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular the self.dir check) needs confirming.
old_best = self.get_best_claim()
copy = list(self.heap)
for claim in self.heap:
# this is a problem for roots?
# identity comparison: the parent must hold this very loc object
if loc is claim.get_parent().loc:
copy.remove(claim)
claim.set_heap(None)
# presumably: this site's current direction leads to `loc` - verify
if loc.gameMap.getLocation(self.site.loc, self.dir) == loc:
self.dir = None
self.heap = copy
heapq.heapify(self.heap)
self.check_heap(old_best)
# # logger.debug("Removed all children from %s that would come to %s" % (self.site.loc, loc ) )
def filter_out_loc(self, loc):
"""Drop the claims in self.heap whose parent's loc is `loc` and re-heapify.

Removal is done on a copy of self.heap so the list being iterated is not
mutated; each dropped claim is detached with set_heap(None). self.dir is
reset to None when gameMap.getLocation(self.site.loc, self.dir) equals
`loc`. Afterwards self.heap is replaced by the filtered copy, heapified,
and check_heap() is called with the value get_best_claim() returned up front.
"""
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular the self.dir check) needs confirming.
old_best = self.get_best_claim()
copy = list(self.heap)
for claim in self.heap:
# this is a problem for roots?
# identity comparison: the parent must hold this very loc object
if loc is claim.get_parent().loc:
copy.remove(claim)
claim.set_heap(None)
# presumably: this site's current direction leads to `loc` - verify
if loc.gameMap.getLocation(self.site.loc, self.dir) == loc:
self.dir = None
self.heap = copy
heapq.heapify(self.heap)
self.check_heap(old_best)
# # logger.debug("Removed all children from %s that would come to %s" % (self.site.loc, loc ) )
def filter_out_loc(self, loc):
"""Drop the claims in self.heap whose parent's loc is `loc` and re-heapify.

Removal is done on a copy of self.heap so the list being iterated is not
mutated; each dropped claim is detached with set_heap(None). self.dir is
reset to None when getLocation(self.site().loc, self.dir) equals `loc`.
Afterwards self.heap is replaced by the filtered copy, heapified, and
check_heap() is called with the value get_best_claim() returned up front.
"""
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular the self.dir check) needs confirming.
old_best = self.get_best_claim()
copy = list(self.heap)
for claim in self.heap:
# this is a problem for roots?
# identity comparison: the parent must hold this very loc object
if loc is claim.get_parent().loc:
copy.remove(claim)
claim.set_heap(None)
# presumably: this site's current direction leads to `loc` - verify
if loc.gameMap().getLocation(self.site().loc, self.dir) == loc:
self.dir = None
self.heap = copy
heapq.heapify(self.heap)
self.check_heap(old_best)
# logger.debug("Removed all children from %s that would come to %s" % (self.site().loc, loc ) )
def cancel(self, event):
    """Remove an event from the queue.

    `event` must be the ID returned by enter(). ValueError is raised when
    the event is not in the queue.
    """
    queue = self._queue
    queue.remove(event)
    # Removal from an arbitrary position breaks heap order; rebuild it.
    heapq.heapify(queue)
def _create_binary_tree(self):
    """
    Build a binary Huffman tree from the stored vocabulary word counts and
    assign every word its code; frequent words get shorter binary codes.
    Called internally from `build_vocab()`.
    """
    vocab_size = len(self.vocab)
    logger.info("constructing a huffman tree from %i words" % vocab_size)

    # build the huffman tree by merging the two smallest nodes repeatedly;
    # inner nodes are numbered from vocab_size upwards
    heap = list(self.vocab.values())
    heapq.heapify(heap)
    for offset in range(vocab_size - 1):
        min1 = heapq.heappop(heap)
        min2 = heapq.heappop(heap)
        merged = Vocab(count=min1.count + min2.count, index=offset + vocab_size, left=min1, right=min2)
        heapq.heappush(heap, merged)

    if not heap:
        return

    # walk the tree with an explicit stack, assigning a binary code to each word
    max_depth = 0
    stack = [(heap[0], [], [])]
    while stack:
        node, codes, points = stack.pop()
        if node.index >= vocab_size:
            # inner node => extend the path and descend into both children
            points = np.array(list(points) + [node.index - vocab_size], dtype=int)
            stack.append((node.left, np.array(list(codes) + [0], dtype=int), points))
            stack.append((node.right, np.array(list(codes) + [1], dtype=int), points))
        else:
            # leaf node => store its path from the root
            node.code, node.point = codes, points
            max_depth = max(len(codes), max_depth)
    logger.info("built huffman tree with maximum node depth %i" % max_depth)
def __delitem__(self, item):
    """Delete `item`: drop it from the dict side, then from the heap."""
    self._remove_from_dict(item)
    survivors = []
    for value, key in self.heap:
        if key != item:
            survivors.append((value, key))
    self.heap = survivors
    # Filtering can break heap order, so rebuild it.
    heapq.heapify(self.heap)
def isorted(iterable):
    """Lazily yield the items of `iterable` in ascending order.

    The input is copied into a list that is heapified once; each item is
    then popped on demand.
    """
    heap = list(iterable)
    heapq.heapify(heap)
    for _ in range(len(heap)):
        yield heapq.heappop(heap)
def remove(self, task):
    """
    Remove every entry for `task` from the queue.

    Entries of ``self.heap`` are sequences whose element at index 2 is the
    task. The heap is filtered in a single O(n) pass and re-heapified once;
    this avoids removing from the list while iterating over it, which skips
    entries.

    :param task: task to remove from the queue
    """
    import heapq
    kept = [entry for entry in self.heap if not (entry[2] == task)]
    if len(kept) != len(self.heap):
        # Update in place so other references to the list stay valid.
        self.heap[:] = kept
        heapq.heapify(self.heap)
def run(self, epochs=1):
    """Heapify every plugin queue, then train for `epochs` epochs.

    After each call to train(), call_plugins('epoch', n) is invoked with the
    1-based epoch number.
    """
    for queue in self.plugin_queues.values():
        heapq.heapify(queue)
    for completed in range(epochs):
        self.train()
        self.call_plugins('epoch', completed + 1)
def cancel(self, event):
    """Remove an event from the queue.

    `event` must be the ID returned by enter(). ValueError is raised when
    the event is not in the queue.
    """
    pending = self._queue
    pending.remove(event)
    # Restore heap order after removing from an arbitrary position.
    heapq.heapify(pending)
def huff_code(txt, ab, freq):
    """Build a Huffman tree and return its root node.

    One Node(i, freq[i]) is created for every index i of `ab`; the two
    smallest nodes are merged under a new Node with symbol -1 until a single
    node remains. `txt` is not used. IndexError is raised when `ab` is empty.
    """
    forest = [Node(symbol, freq[symbol]) for symbol in range(len(ab))]
    heapq.heapify(forest)
    while len(forest) > 1:
        first = heapq.heappop(forest)
        second = heapq.heappop(forest)
        heapq.heappush(forest, Node(-1, first.freq + second.freq, first, second))
    return heapq.heappop(forest)
def __init__(self, items=None):
    """Create the heap, optionally from an existing list.

    A supplied `items` list is heapified in place and used directly as the
    storage, so the caller's list is reordered and shared.
    """
    if items is not None:
        heapq.heapify(items)
    else:
        items = []
    self._items = items
def __init__(self, initial=None, key=lambda x:x):
    """Create a heap of (key(item), item) pairs ordered by key.

    :param initial: optional iterable of starting items; a falsy value
        gives an empty heap
    :param key: function computing the sort key of an item
    """
    self.__key = key
    if not initial:
        self._data = []
        return
    self._data = [(key(item), item) for item in initial]
    heapq.heapify(self._data)
def heapify(self, items):
    """Wrap `items` in a new KeyHeapq and return it."""
    wrapped = KeyHeapq(initial=items)
    return wrapped
def create_huffman_tree(txt):
    """Build a Huffman tree for the characters of `txt` and return its root.

    One TreeNode(char, count) is created per distinct character; the two
    smallest nodes are merged under a TreeNode with an empty symbol until one
    node remains. IndexError is raised when `txt` is empty.
    """
    counts = collections.Counter(txt)
    forest = [TreeNode(symbol, count) for symbol, count in counts.items()]
    heapq.heapify(forest)
    while len(forest) > 1:
        a = heapq.heappop(forest)
        b = heapq.heappop(forest)
        heapq.heappush(forest, TreeNode('', a.cnt + b.cnt, a, b))
    return forest.pop()