如何避免在heapq中使用_siftup或_siftdown
我不知道如何在不使用_siftup
或的情况下有效解决以下问题_siftdown
:
当一个元素发生故障时,如何恢复堆不变式?
换句话说,更新old_value
中heap
至new_value
,并保持heap
工作。您可以假设old_value
堆中只有一个。功能定义如下:
def update_value_in_heap(heap, old_value, new_value):
这是我的真实情况,如果您有兴趣请阅读。
-
您可以想象它是一个小的自动完成系统。我需要计算单词的频率,并保持前k个最大数量的单词,这些单词随时准备输出。所以我用
heap
在这里。当一个单词计数++时,如果它在堆中,我需要对其进行更新。 -
所有的单词和计数都存储在trie-tree的叶子中,而堆
存储在trie-tree的中间节点中。如果您在意
堆外的单词,请放心,我可以从trie-tree的叶子节点获取它。 -
当用户键入一个单词时,它将首先从堆中读取,然后对其进行更新
。为了获得更好的性能,我们可以考虑通过批量更新来减少更新频率。
那么,当一个特定的字数增加时,如何更新堆呢?
这是_siftup或_siftdown版本的简单示例(不是我的情况):
>>> from heapq import _siftup, _siftdown, heapify, heappop
>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]
>>> heapify(data)
>>> old, new = 8, 22 # increase the 8 to 22
>>> i = data.index(old)
>>> data[i] = new
>>> _siftup(data, i)
>>> [heappop(data) for i in range(len(data))]
[1, 2, 3, 5, 7, 10, 18, 19, 22, 37]
>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]
>>> heapify(data)
>>> old, new = 8, 4 # decrease the 8 to 4
>>> i = data.index(old)
>>> data[i] = new
>>> _siftdown(data, 0, i)
>>> [heappop(data) for i in range(len(data))]
[1, 2, 3, 4, 5, 7, 10, 18, 19, 37]
它花费O(n)进行索引和O(logn)进行更新。heapify
是另一种解决方案,但效率不如_siftup
或_siftdown
。
但是_siftup
和_siftdown
是heapq中的受保护成员,因此不建议它们从外部访问。
那么,有没有更好,更有效的方法来解决这个问题呢?这种情况的最佳做法?
感谢您的阅读,非常感谢您的帮助。:)
已经参考了heapq python-
如何修改对堆进行排序的值,但没有解决我的问题
-
TL; DR 使用
heapify
。您必须牢记的重要一件事是,理论复杂性和性能是两个不同的事物(即使它们是相关的)。换句话说,实现也很重要。渐近复杂度为您提供了一些 下限
,您可以将其视为保证,例如O(n)中的算法可确保在最坏的情况下,您将执行许多与输入大小成线性关系的指令。这里有两件重要的事情:- 常量被忽略,但是常量在现实生活中很重要;
- 最坏的情况取决于您考虑的算法,而不仅取决于输入。
根据要考虑的主题/问题,第一点可能非常重要。在某些域中,隐藏在渐近复杂性中的常量是如此之大,以至于您甚至无法建立比常量大的输入(或者认为输入不现实)。这里不是这种情况,但是您始终必须记住这一点。
给出这两个结论,您不能真正说出: 实现B比A快,因为A源自O(n)算法,而B源自O(log n)算法
。即使从总体上来说这是一个很好的论据,但它并不总是足够的。当所有输入均等可能发生时,理论上的复杂性对于比较算法特别有用。换句话说,您的算法非常通用。如果您知道用例和输入将是什么,则可以直接测试性能。同时使用测试和渐进复杂度,可以使您很好地了解算法的性能(在极端情况和任意实际情况下)。
话虽如此,让我们在下面的类上运行一些性能测试,这些测试将实现三种不同的策略(这里实际上有四种策略,但是
Invalidate and Reinsert 在您的情况下似乎 不合适,
因为您将使每一项无效的时间为您会看到一个给定的单词)。我将包括我的大部分代码,以便您可以仔细检查我是否搞砸了(甚至可以检查完整的笔记本):from heapq import _siftup, _siftdown, heapify, heappop class Heap(list): def __init__(self, values, sort=False, heap=False): super().__init__(values) heapify(self) self._broken = False self.sort = sort self.heap = heap or not sort # Solution 1) repair using the knowledge we have after every update: def update(self, key, value): old, self[key] = self[key], value if value > old: _siftup(self, key) else: _siftdown(self, 0, key) # Solution 2 and 3) repair using sort/heapify in a lazzy way: def __setitem__(self, key, value): super().__setitem__(key, value) self._broken = True def __getitem__(self, key): if self._broken: self._repair() self._broken = False return super().__getitem__(key) def _repair(self): if self.sort: self.sort() elif self.heap: heapify(self) # … you'll also need to delegate all other heap functions, for example: def pop(self): self._repair() return heappop(self)
我们首先可以检查所有三种方法是否都有效:
data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1] heap = Heap(data[:]) heap.update(8, 22) heap.update(7, 4) print(heap) heap = Heap(data[:], sort_fix=True) heap[8] = 22 heap[7] = 4 print(heap) heap = Heap(data[:], heap_fix=True) heap[8] = 22 heap[7] = 4 print(heap)
然后,我们可以使用以下功能运行一些性能测试:
import time import random def rand_update(heap, lazzy_fix=False, **kwargs): index = random.randint(0, len(heap)-1) new_value = random.randint(max_int+1, max_int*2) if lazzy_fix: heap[index] = new_value else: heap.update(index, new_value) def rand_updates(n, heap, lazzy_fix=False, **kwargs): for _ in range(n): rand_update(heap, lazzy_fix) def run_perf_test(n, data, **kwargs): test_heap = Heap(data[:], **kwargs) t0 = time.time() rand_updates(n, test_heap, **kwargs) test_heap[0] return (time.time() - t0)*1e3 results = [] max_int = 500 nb_updates = 1 for i in range(3, 7): test_size = 10**i test_data = [random.randint(0, max_int) for _ in range(test_size)] perf = run_perf_test(nb_updates, test_data) results.append((test_size, "update", perf)) perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, heap_fix=True) results.append((test_size, "heapify", perf)) perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, sort_fix=True) results.append((test_size, "sort", perf))
结果如下:
import pandas as pd import seaborn as sns dtf = pd.DataFrame(results, columns=["heap size", "method", "duration (ms)"]) print(dtf) sns.lineplot( data=dtf, x="heap size", y="duration (ms)", hue="method", )
从这些测试中我们可以看出,这
heapify
似乎是最合理的选择,但在最坏的情况下它具有相当好的复杂度:O(n),并且在实践中表现更好。另一方面,研究其他选择(例如具有专用于该特定问题的数据结构,例如,使用bin将单词放入其中,然后将它们从bin移至下一个看起来像是一条可能的轨道),可能是个好主意。调查)。重要说明:这种情况(更新与阅读比例为1:1)对
heapify
和sort
解决方案均不利。所以,如果你管理有AK:1分的比例,这一结论将更加清晰(你可以替换nb_updates = 1
与nb_updates = k
上面的代码)。数据框详细信息:
heap size method duration in ms 0 1000 update 0.435114 1 1000 heapify 0.073195 2 1000 sort 0.101089 3 10000 update 1.668930 4 10000 heapify 0.480175 5 10000 sort 1.151085 6 100000 update 13.194084 7 100000 heapify 4.875898 8 100000 sort 11.922121 9 1000000 update 153.587103 10 1000000 heapify 51.237106 11 1000000 sort 145.306110