一、问题描述和基本要求
对一幅BMP格式的灰度图像(如个人证件照片)进行二元霍夫曼编码和译码。
二、算法思想
霍夫曼在1952年提出了一中构造最佳码的方法,这种编码方法被称为霍夫曼编码。霍夫曼编码适用于多元独立信源,对于多元独立信源来说他是最佳码。它充分利用了信源概率分布的特性进行编码,完全依据字符出现概率来构造异字头的平均长度最短的码字,因此它是一种最佳的逐个符号的编码方法。
元霍夫曼码的编码步骤如下:
① 将个信源符号按概率分布 的大小,以递减的次序排列起来,设
② 为了使最后一步信源的缩减信源有个信源符号,需要对信源添加个概率为0的信源符号,使扩展后的信源符号个数满足
其中,表示缩减的次数,表示每次缩减所减少的信源符号的个数,代表编码元数。
③ 用共个码符号分别分配给概率最小的个信源符号,并将这两个概率最小的信源符号合并成一个新的符号,并用这个最小概率之和作为新符号的概率,从而得到只包含个符号的缩减信源
④ 以此继续下去,直到缩减信源最后只剩个符号为止。最后这个符号的概率之和为1。然后从最后一级缩减信源开始,依编码路径由后向前返回,就得出各个信源符号所对应的码符号序列,即所对应的码字。
霍夫曼编码过程中生成的码树是霍夫曼树。假如采用二元编码,那么他的码树二元霍夫曼树。在数据结构中,二元霍夫曼树也称为最优二元树。若对他的叶全部赋权值,那么权值乘上叶子的高度的和最小,这也就是二元霍夫曼编码的平均码长。
进行霍夫曼编码,有静态霍夫曼编码和自适应霍夫曼编码(常用的算法有Faller-Gallagher-Knuth算法,简称FGK算法,后来针对FGK算法存在的问题,Vitter进行了改进,称为Vitter算法)。本实验采用静态霍夫曼编码。
静态霍夫曼编码,首先扫描第一遍输入符号流,统计各个符号出现的概率,之后按照上述算法构造二元最优树,再对输入符号流进行第二遍扫描进行编码。
二元的霍夫曼编码的过程和构造一棵最优二元树的过程是等价的。一棵最优二元树,若从他的根开始进行遍历操作,左结点遍历字符串插入0,右结点遍历字符串插入1,那么最后叶结点的得到的遍历字符串即为码字。
构造一棵二元最优树的算法可以描述为:
① 令;
② 从S中取出两个最小的权和,画结点,带权,画结点,带权。画和的父亲,连接和,和,令带权;
③ 令;
④ 判断是否只有一个元素,若是,则停止,否则重复② ③ ④。
带有上述中唯一权重的结点就是最优二元树的根结点。为了方便取出权重最小的结点,我们可以使用优先级队列(或者堆)来实现。那么,构造一棵最优二元树的算法可以更加形式化的描述为:
初始化优先级队列PQ
把所有带权重的结点放入PQ中
while PQ的大小 > 1:
取出PQ的顶端结点v_1,带权重w_1
取出PQ的顶端结点v_2,带权重w_2
构造新结点Father,使之左孩子为Node1,右孩子为Node2,权重为w_1+w_2
将Father放入到PQ中
带有PQ的唯一权重的对应节点是构造好最优二元树的根结点。
之后,再对构造好的最优二元树进行遍历即可得到码字。
静态霍夫曼编码容易实现,但要提前统计被编码对象中的符号出现概率,因此必须对输入符号流进行两遍扫描,这也限制了静态霍夫曼编码的应用。
三、源代码
#!/usr/bin/env python3
import os
import pickle
from math import log
from queue import PriorityQueue, Queue
# python3 -m pip install matplotlib
import matplotlib.pyplot as plt
# python3 -m pip install numpy
import numpy as np
# python3 -m pip install opencv-python
from cv2 import IMREAD_GRAYSCALE, imread, imshow, imwrite, absdiff
# python3 -m pip install prettytable
from prettytable import PrettyTable
class StaticHuffNode:
def __init__(self, s: int, weight: int, lchild=None, rchild=None, parent=None):
self.s = s # 信源符号
self.weight = weight # 权重
self.parent = parent # 父亲
self.lchild = lchild # 左子树
self.rchild = rchild # 右子树
self.word = "" # 码字
def __lt__(self, other):
return self.weight < other.weight
class WriterWrapper:
def __init__(self, f):
self.buf = ""
self.f = f
def __call__(self, con: str):
self.buf += con
while len(self.buf) >= 8:
out = self.buf[0:8]
self.buf = self.buf[8:]
conint = int(out, 2)
self.f.write(bytes([conint]))
def flush(self):
if len(self.buf) > 0:
self.buf += "0"*(7 - ((len(self.buf) - 1) % 8))
while len(self.buf) > 0:
out = self.buf[0:8]
self.buf = self.buf[8:]
conint = int(out, 2)
self.f.write(bytes([conint]))
class ReaderWrapper:
def __init__(self, f):
self.buf = ""
self.f = f
def next(self):
if self.buf != "":
c = self.buf[0]
self.buf = self.buf[1:]
return c
c = self.f.read(1)
if c == b"":
return ""
bstr = bin(ord(c))[2:]
bstr = "0"*(7 - ((len(bstr) - 1) % 8)) + bstr
self.buf = bstr
return self.next()
def read(self, c : int):
s = ""
for i in c:
s += self.next()
return s
def traverse(root, s: str, NodeStringMap: dict):
if (root.lchild == None) and (root.rchild == None):
root.word = s
NodeStringMap[root.s] = root.word
return
traverse(root.lchild, s + "0", NodeStringMap)
traverse(root.rchild, s + "1", NodeStringMap)
def StaticHuffmanEncodingAndDecoding(counter, originsize, originshape, originfilesize):
PQ = PriorityQueue()
table = PrettyTable()
table.field_names = ["信源符号", "数量", "概率"]
for item in flatten:
counter[item] += 1
for item in counter:
table.add_row([item, counter[item], counter[item] / originsize])
PQ.put(StaticHuffNode(item, counter[item]))
Hs = sum(map(lambda weight: -weight / originsize *
log(weight / originsize, 2), counter.values()))
table.sortby = "数量"
table.reversesort = True
print("完成")
print("原图像分辨率为:{1}*{0},作为信源,共有{2}个、{3}种信源符号,熵为{4:.2f}, "
"信源概率表如下:".format(*originshape, originsize, len(counter), Hs))
print(table)
print("正在进行静态霍夫曼编码...")
# 采用优先级队列,选择权重最小的节点,合并后放入队列中继续操作,直到队列中只剩一个节点
while PQ.qsize() > 1:
left = PQ.get()
right = PQ.get()
PQ.put(StaticHuffNode(0, left.weight + right.weight, left, right))
NodeStringMap = {}
# 遍历霍夫曼树,获取每个信源符号对应的码字
root = PQ.get()
traverse(root, "", NodeStringMap)
table = PrettyTable()
table.field_names = ["信源符号", "码字", "码字长度", "数量", "概率"]
Len = 0
for item in counter:
table.add_row([item, NodeStringMap[item], len(NodeStringMap[item]),
counter[item], counter[item] / originsize])
Len += counter[item]*len(NodeStringMap[item])
AveLen = Len / originsize
print("编码完成,编码表如下:")
table.sortby = "数量"
print(table)
print("信源的熵为{:.2f},平均码长为{:.2f},编码效率为{:.2f}%".format(
Hs, AveLen, Hs/AveLen*100))
f = open("statictarget.bin", "wb")
print("将编码后的文件写入到新文件statictarget.bin中。编码采用静态霍夫曼编码,先写入图像尺寸和文件长度,再写入编码表,最后写入编码后图像矩阵")
# 写入文件分辨率和长度
f.write(pickle.dumps(originshape))
f.write(pickle.dumps(Len))
# 写入编码表
f.write(pickle.dumps(counter))
# 写入图像矩阵
wt = WriterWrapper(f)
for item in flatten:
wt(NodeStringMap[item])
wt.flush()
f.close()
TarLen = os.path.getsize("statictarget.bin")
print("新文件statictarget.bin的大小为{}字节,源文件大小为{}字节,压缩比为{:.2f}%".format(
TarLen, originfilesize, TarLen/originfilesize*100))
print("现在开始从新文件statictarget.bin中恢复图像信息...")
f = open("statictarget.bin", "rb")
# 读取图像分辨率
originshape = pickle.load(f)
# 读取长度
Len = pickle.load(f)
# 读取编码表
counter = pickle.load(f)
print("正在进行霍夫曼译码...")
PQ = PriorityQueue()
for item in counter:
PQ.put(StaticHuffNode(item, counter[item]))
while PQ.qsize() > 1:
left = PQ.get()
right = PQ.get()
PQ.put(StaticHuffNode(0, left.weight + right.weight, left, right))
p = root = PQ.get()
cur = 0
rd = ReaderWrapper(f)
imgflatten = []
while (cur != Len):
c = rd.next()
cur += 1
if c == "0":
p = p.lchild
elif c == "1":
p = p.rchild
else:
raise AssertionError("c must be 0 or 1")
break
if (p.lchild == None) and (p.rchild == None):
imgflatten.append(p.s)
p = root
imgarr = np.array(imgflatten, dtype=np.uint8)
imgarr.resize(originshape)
imwrite("statictarget.bmp", imgarr)
dif = absdiff(originimg, imgarr)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
fig = plt.figure('result')
plt.subplot(1, 3, 1)
plt.title('原图像')
plt.imshow(originimg, plt.cm.gray)
plt.axis('off')
plt.subplot(1, 3, 2)
plt.title('编解码后的图像')
plt.imshow(imgarr, plt.cm.gray)
plt.axis('off')
plt.subplot(1, 3, 3)
plt.title('差异')
plt.imshow(dif, plt.cm.gray)
plt.subplots_adjust(wspace=0.15)
plt.axis('off')
plt.show()
return Len, Hs
if __name__ == "__main__":
print("请将要压缩的图像文件命名为origin.bmp,请保证该图像为灰度图像。")
print("读入源图像文件...", end="")
originimg = imread("origin.bmp", IMREAD_GRAYSCALE)
originfilesize = os.path.getsize("origin.bmp")
print(f"成功,源文件大小为{originfilesize}字节")
print("分析原图像文件...", end="")
originshape = originimg.shape
originsize = originimg.size
flatten = originimg.flatten()
counter = dict.fromkeys(set(flatten), 0)
Len, Hs = StaticHuffmanEncodingAndDecoding(
counter, originsize, originshape, originfilesize)