本文主要讲堆的具体实现
堆
堆是优先队列的一种实现。堆一般是由数组实现,逻辑上堆可以看作是一棵完全二叉树。即我们本质上是在操作数组,但是可以形象的把它想象成是在对一棵完全二叉树操作。
主要分为大根堆和小根堆:
- 大根堆:父节点总是大于任何一个子节点
- 小根堆:父节点总是小于任何一个子节点
本文的实现以小堆为例,python的heapq也是以小堆为模型
架构
堆的核心操作是插入和删除堆顶元素。
基本架构如:
class BinaryHeap(object):
"""利用数组 实现小堆"""
def __init__(self, max_size=float("inf")):
pass
def __len__(self):
"""求长度"""
pass
def heappush(self, *data):
"""插入元素"""
pass
def heappop(self):
"""弹出堆顶元素"""
pass
def get_min(self):
pass
def heapify(self, data):
"""
利用传入的可迭代对象之间创建一个小堆
"""
pass
def _siftup(self):
"""最后插入的元素上浮"""
pass
def _siftdown(self, idx):
"""序号为idx的元素下沉"""
pass
初始化
- _heap为堆的本质(数组)
- max_size是可选参数,指堆的最大容量,默认不限大小
- size为当前堆的大小(数据量)
-
__len__
魔法函数,用于len()方法,直接返回size即可
def __init__(self, max_size=float("inf")):
self._heap = [float("-inf")]
self.max_size = max_size
self.size = 0
def __len__(self):
"""求长度"""
return self.size
占位0
注意:在初始的数组中,我们先预置了一个元素,float("-inf")即无限小,作用是占位0索引,是比较常见的一种实现。(如果是大堆则放无限大)
为什么需要占0位索引?考虑如下:
常规的完全二叉树索引是这样的
则假设当前节点的索引为idx:
- 父节点:(idx-1) // 2
- 左子节点:2idx + 1
- 右子节点:2idx + 2
但如果将0索引占位后,我们的堆索引是从1开始的,如:
此时:
- 父节点:idx //2
- 左子节点:2idx
- 右子节点:2idx + 1
可见占位0索引在关系节点的索引表示上会稍微简洁一点,后续编码也会简便点。这是比较常见的实现方式,我们在具体实现和逻辑想象中将0索引透明即可。
当然不占0索引位也没有任何问题,只需要注意索引表示和空堆时的操作即可。
插入
插入是堆的核心操作之一:
- 对于实质数组而言:
- 将元素插到数组尾
- 调整数组,找到合适的位置
- 对于逻辑树而言:
- 将元素插到第一个空的叶子节点
- 节点上浮到合适的位置
上浮指逐层比较,如果比父节点更小,则与父节点交换位置。流程如:
实现
基于前面我们的占位0,此时我们找到父节点的索引只需要用当前节点整除2即可。
代码使用位操作 >>1 效率更高,运算结果等同于整除2
内部操作调用_siftup()
def _siftup(self):
"""上浮最后一个元素(新插入的元素)"""
pos = self.size + 1 # 插入的位置 由于占位0 所以需要+1
x = self._heap[-1] # 获取最后一个位置元素
while x < self._heap[pos >> 1]: # 当比父节点小时,执行交换操作
self._heap[pos] = self._heap[pos >> 1]
pos >>= 1
self._heap[pos] = x
注意两点:
- 实际操作中,我们没有逐层的交换节点,只是将被替换的节点赋值,然后记录下要替换的索引
- 如果需要交换N层:
- 每次都交换需要执行至少2N次(如果引入temp指辅助交换会更多)
- 而记录索引值,我们只需要N次赋值 + 1次最终节点的赋值
- 如果需要交换N层:
- 我们占0位时使用的是无穷小(float("-inf")),带来的好处有:
- while判断时不用担心pos会移动0索引,因为0索引的值比任何数都小
- 空堆时不会进入循环,直接赋值到1索引的位置
对外提供的接口是insert(),支持插入单个元素或插入1组元素。
from collections import Iterable
def heappush(self, *data):
"""向堆中插入元素"""
if isinstance(data[0], Iterable):
if len(data) > 1:
raise ValueError
data = data[0]
if len(self) + len(data) > self.max_size:
# 容量不足
raise ValueError("Capacity is insufficient")
for x in data:
self._heap.append(x)
self._siftup()
self.size += 1
整体流程是:
1. 对参数做判断
- 允许的参数:单个数字、单个可迭代对象(list、set、tuple等)
- 不允许的参数:嵌套的对象 如: [[1,2,3], [4,5]]
2. 对容量做判断
3. 循环调用_siftup()
兼容多种类型的参数
可以看到在参数中,我们使用的解包*data。主要是为了优化代码,避免多重判断,存在大量的冗余代码。
关于解包:
def test1(data):
print(data)
def test2(*data):
print(data)
a = [1,2,3,4]
test1(a) # [1, 2, 3, 4]
test2(a) # ([1, 2, 3, 4],)
test2(1) # (1, )
test2(1,2,3,4) # (1, 2, 3, 4)
此处同理缺省参数的 *args **kwargs
test1是我们日常传参的方式,传什么得什么。而test2是利用解包将参数组成一个元组。这样做的好处无论用户传入的是单值、还是可迭代对象,我们都可以用索引访问,并且最终都可以用for循环遍历。
当然我们也可以逐一判断,伪代码如:
if 是单个数字:
执行单个插入
elif 是可迭代对象:
执行循环插入
删除堆顶
(类比高优先级的元素出队列)
删除堆顶元素也是核心操作之一:
- 对于实质数组而言:
- 将首元素与数组尾元素交换
- 从数组头开始调整数组
- 对于逻辑树而言:
- 将根节点与最后一个叶子节点交换
- 根节点下沉到合适的位置
下沉指逐层比较,如果比子节点更小,则与子节点交换位置。流程如:
实现
对内接口为_siftdown()
def _siftdown(self, idx):
"""序号为i的元素下沉"""
temp = self._heap[idx]
length = self.size
while 1:
child_idx = idx << 1
if child_idx > length:
break
if child_idx != length and self._heap[child_idx] > self._heap[child_idx + 1]:
child_idx += 1
if temp > self._heap[child_idx]:
self._heap[idx] = self._heap[child_idx]
else:
break
idx = child_idx
self._heap[idx] = temp
此处同理用位操作代替乘2, 左子节点坐标为 2idx,右坐标为 2idx+1,如果当前值 比左右子节点中最小的值 还大的话,下沉(与更小的那个交换)。
对外接口delete_min()
def heappop(self):
"""删除堆顶元素"""
if not self.size:
raise IndexError("")
_min = self._heap[1]
last = self._heap.pop()
self.size -=1
if self.size: # 为空了就不需要向下了
self._heap[1] = last
self._siftdown(1)
return _min
建堆和堆排
建堆指将传入的数组整理成一个堆。
当然我们也可以逐个的执行push操作,但是这样的时间复杂度是O(N log N) :
遍历数据(N)+ 在树中找到对应的位置(log N)
利用传入的数组进行建堆的时间复杂度可以降低到O(N)。具体的时间复杂度可以推算出来,这里不展开。有兴趣可以看:https://blog.csdn.net/yuzhihui_no1/article/details/44258297
对于逻辑树来说:
- 需要让每个节点都满足堆特性,即当前节点都小于(大于)它的子节点
- 由于叶子节点没有子节点,所以无需判断
- 即需要判断所有的非叶节点
如例:则遍历的节点依次是 8、6、5、6、7、1. 每个节点仅需要和自己的子节点做比较即可。
流程如:
实现
有了前面的基础,实现则很简单。即从最后一个非叶节点开始遍历、执行sitdown即可
def heapify(self, data):
if not data:
return
self._heap.extend(data)
self.size += len(data)
for idx in range(self.size >> 1, 0, -1):
self._siftdown(idx)
堆排
而堆排序算法就是在以上的基础上,每次的弹出栈顶,再调整顺序,再弹出栈顶。依次取出当前栈的最小值,从而获得一个新的排序数组。时间复杂度O(N logN),空间复杂度O(N).
具体代码可以通过上述过程组合出来,不展开。
# 堆的应用之一, 堆排序
def heap_sort(data, reverse=False):
"""接受一个可迭代对象进行排序, 默认从小到大排序, 返回一个列表"""
heap = BinaryHeap() # 新建一个堆
heap.heapify(data) # 时间复杂度 O(N)
lst = []
# 整个for循环时间复杂度是 O(N logN)
for i in range(len(heap)): # 遍历时间复杂度 O(N)
lst.append(heap.heappop()) # 每次调整时间复杂度 O(log N)
if reverse:
return lst[::-1]
return lst
优化
真正的堆排可以将空间复杂度优化成O(1),即使用原地排序。思想也很容易理解:
- 大堆为例。数组索引为:0 - n
- heapify进行堆的初始化
- 此时0位置已是数组中最大值,我们将0和n(最后一位)交换。
- 交换后,我们对0位置到(n - 1)位置继续堆排
- 将0位置和n-1交换,循环...
即每次将堆的最大值放到组数中正确的位置(从n开始递减)
def heapify(arr, n, i):
largest = i
l = 2 * i + 1 # left = 2*i + 1
r = 2 * i + 2 # right = 2*i + 2
if l < n and arr[i] < arr[l]:
largest = l
if r < n and arr[largest] < arr[r]:
largest = r
if largest != i:
arr[i],arr[largest] = arr[largest],arr[i] # 交换
heapify(arr, n, largest)
def heapSort(arr):
n = len(arr)
# Build a maxheap.
for i in range(n, -1, -1):
heapify(arr, n, i)
# 一个个交换元素
for i in range(n-1, 0, -1):
arr[i], arr[0] = arr[0], arr[i] # 交换
heapify(arr, i, 0)
arr = [ 12, 11, 13, 5, 6, 7]
heapSort(arr)
n = len(arr)
print ("排序后")
for i in range(n):
print ("%d" %arr[i]),
前k大的数
利用小堆可以实现前k大的数,而利用大堆可以实现求后k小的数。
利用小堆求前k大的数流程:
- 创建一个k容量的小堆
- 先放入前k个数据
- 遍历剩下的数据,依次和堆顶比较
- 如果比堆顶小直接丢弃
- 如果比栈顶大,弹出栈顶元素,新元素入栈。
def nlargest(data, k):
"""获取前k个最大的数"""
heap = BinaryHeap(k)
heap.heapify(data[:k]) # 用前k个数建立一个小顶堆, 堆顶即是第k大的数
for d in data[k:]:
if heap.get_min() < d:
heap.heappop()
heap.heappush(d)
lst = [] # 获取堆里的元素
for i in range(k):
lst.append(heap.heappop())
return lst
完整代码以及其他重点数据结构在GitHub:https://github.com/Sssmeb/python_datastructure/tree/master
如果觉得有帮助可以在github上给我点个star~谢谢谢谢谢谢