信息论与编码实验一 灰度图像的二元霍夫曼编码和译码

一、问题描述和基本要求

对一幅BMP格式的灰度图像(如个人证件照片)进行二元霍夫曼编码和译码。

二、算法思想

霍夫曼在1952年提出了一中构造最佳码的方法,这种编码方法被称为霍夫曼编码。霍夫曼编码适用于多元独立信源,对于多元独立信源来说他是最佳码。它充分利用了信源概率分布的特性进行编码,完全依据字符出现概率来构造异字头的平均长度最短的码字,因此它是一种最佳的逐个符号的编码方法。
r 元霍夫曼码的编码步骤如下:
① 将q个信源符号按概率分布 P(s_i) 的大小,以递减的次序排列起来,设


② 为了使最后一步信源的缩减信源有r个信源符号,需要对信源S添加t个概率为0的信源符号,使扩展后的信源符号个数q+t满足

其中,θ表示缩减的次数,r-1表示每次缩减所减少的信源符号的个数,r代表编码元数。
③ 用0,1,…,r-1r个码符号分别分配给概率最小的r个信源符号,并将这两个概率最小的信源符号合并成一个新的符号,并用这r个最小概率之和作为新符号的概率,从而得到只包含q-1个符号的缩减信源S_1
④ 以此继续下去,直到缩减信源最后只剩r个符号为止。最后这r个符号的概率之和为1。然后从最后一级缩减信源开始,依编码路径由后向前返回,就得出各个信源符号所对应的码符号序列,即所对应的码字。
霍夫曼编码过程中生成的码树是霍夫曼树。假如采用二元编码,那么他的码树二元霍夫曼树。在数据结构中,二元霍夫曼树也称为最优二元树。若对他的叶全部赋权值,那么权值乘上叶子的高度的和最小,这也就是二元霍夫曼编码的平均码长。
进行霍夫曼编码,有静态霍夫曼编码和自适应霍夫曼编码(常用的算法有Faller-Gallagher-Knuth算法,简称FGK算法,后来针对FGK算法存在的问题,Vitter进行了改进,称为Vitter算法)。本实验采用静态霍夫曼编码。
静态霍夫曼编码,首先扫描第一遍输入符号流,统计各个符号出现的概率,之后按照上述算法构造二元最优树,再对输入符号流进行第二遍扫描进行编码。
二元的霍夫曼编码的过程和构造一棵最优二元树的过程是等价的。一棵最优二元树,若从他的根开始进行遍历操作,左结点遍历字符串插入0,右结点遍历字符串插入1,那么最后叶结点的得到的遍历字符串即为码字。
构造一棵二元最优树的算法可以描述为:
① 令S={ w_1,w_2,…,w_t }
② 从S中取出两个最小的权w_iw_j,画结点v_i,带权w_i,画结点v_j,带权w_j。画v_iv_j的父亲v,连接v_ivv_jv,令v带权w_i+w_j
③ 令S = (S - { w_i,w_j })∪{ w_i+w_j }
④ 判断S是否只有一个元素,若是,则停止,否则重复② ③ ④。
带有上述S中唯一权重的结点v就是最优二元树的根结点。为了方便取出权重最小的结点,我们可以使用优先级队列(或者堆)来实现。那么,构造一棵最优二元树的算法可以更加形式化的描述为:

初始化优先级队列PQ
把所有带权重的结点放入PQ中
while PQ的大小 > 1:
    取出PQ的顶端结点v_1,带权重w_1
    取出PQ的顶端结点v_2,带权重w_2
    构造新结点Father,使之左孩子为Node1,右孩子为Node2,权重为w_1+w_2
    将Father放入到PQ中

带有PQ的唯一权重的对应节点是构造好最优二元树的根结点。
之后,再对构造好的最优二元树进行遍历即可得到码字。
静态霍夫曼编码容易实现,但要提前统计被编码对象中的符号出现概率,因此必须对输入符号流进行两遍扫描,这也限制了静态霍夫曼编码的应用。

三、源代码

#!/usr/bin/env python3
import os
import pickle
from math import log
from queue import PriorityQueue, Queue
# python3 -m pip install matplotlib
import matplotlib.pyplot as plt
# python3 -m pip install numpy
import numpy as np
# python3 -m pip install opencv-python
from cv2 import IMREAD_GRAYSCALE, imread, imshow, imwrite, absdiff
# python3 -m pip install prettytable
from prettytable import PrettyTable

class StaticHuffNode:
    def __init__(self, s: int, weight: int, lchild=None, rchild=None, parent=None):
        self.s = s              # 信源符号
        self.weight = weight    # 权重
        self.parent = parent    # 父亲
        self.lchild = lchild    # 左子树
        self.rchild = rchild    # 右子树
        self.word = ""          # 码字

    def __lt__(self, other):
        return self.weight < other.weight

class WriterWrapper:
    def __init__(self, f):
        self.buf = ""
        self.f = f

    def __call__(self, con: str):
        self.buf += con
        while len(self.buf) >= 8:
            out = self.buf[0:8]
            self.buf = self.buf[8:]
            conint = int(out, 2)
            self.f.write(bytes([conint]))

    def flush(self):
        if len(self.buf) > 0:
            self.buf += "0"*(7 - ((len(self.buf) - 1) % 8))
        while len(self.buf) > 0:
            out = self.buf[0:8]
            self.buf = self.buf[8:]
            conint = int(out, 2)
            self.f.write(bytes([conint]))

class ReaderWrapper:
    def __init__(self, f):
        self.buf = ""
        self.f = f

    def next(self):
        if self.buf != "":
            c = self.buf[0]
            self.buf = self.buf[1:]
            return c
        c = self.f.read(1)
        if c == b"":
            return ""
        bstr = bin(ord(c))[2:]
        bstr = "0"*(7 - ((len(bstr) - 1) % 8)) + bstr
        self.buf = bstr
        return self.next()

    def read(self, c : int):
        s = ""
        for i in c:
            s += self.next()
        return s

def traverse(root, s: str, NodeStringMap: dict):
    if (root.lchild == None) and (root.rchild == None):
        root.word = s
        NodeStringMap[root.s] = root.word
        return
    traverse(root.lchild, s + "0", NodeStringMap)
    traverse(root.rchild, s + "1", NodeStringMap)

def StaticHuffmanEncodingAndDecoding(counter, originsize, originshape, originfilesize):
    PQ = PriorityQueue()
    table = PrettyTable()
    table.field_names = ["信源符号", "数量", "概率"]
    for item in flatten:
        counter[item] += 1
    for item in counter:
        table.add_row([item, counter[item], counter[item] / originsize])
        PQ.put(StaticHuffNode(item, counter[item]))
    Hs = sum(map(lambda weight: -weight / originsize *
                 log(weight / originsize, 2), counter.values()))
    table.sortby = "数量"
    table.reversesort = True
    print("完成")
    print("原图像分辨率为:{1}*{0},作为信源,共有{2}个、{3}种信源符号,熵为{4:.2f}, "
          "信源概率表如下:".format(*originshape, originsize, len(counter), Hs))
    print(table)
    print("正在进行静态霍夫曼编码...")
    # 采用优先级队列,选择权重最小的节点,合并后放入队列中继续操作,直到队列中只剩一个节点
    while PQ.qsize() > 1:
        left = PQ.get()
        right = PQ.get()
        PQ.put(StaticHuffNode(0, left.weight + right.weight, left, right))
    NodeStringMap = {}
    # 遍历霍夫曼树,获取每个信源符号对应的码字
    root = PQ.get()
    traverse(root, "", NodeStringMap)
    table = PrettyTable()
    table.field_names = ["信源符号", "码字", "码字长度", "数量", "概率"]
    Len = 0
    for item in counter:
        table.add_row([item, NodeStringMap[item], len(NodeStringMap[item]),
                       counter[item], counter[item] / originsize])
        Len += counter[item]*len(NodeStringMap[item])
    AveLen = Len / originsize
    print("编码完成,编码表如下:")
    table.sortby = "数量"
    print(table)
    print("信源的熵为{:.2f},平均码长为{:.2f},编码效率为{:.2f}%".format(
        Hs, AveLen, Hs/AveLen*100))
    f = open("statictarget.bin", "wb")
    print("将编码后的文件写入到新文件statictarget.bin中。编码采用静态霍夫曼编码,先写入图像尺寸和文件长度,再写入编码表,最后写入编码后图像矩阵")
    # 写入文件分辨率和长度
    f.write(pickle.dumps(originshape))
    f.write(pickle.dumps(Len))
    # 写入编码表
    f.write(pickle.dumps(counter))
    # 写入图像矩阵
    wt = WriterWrapper(f)
    for item in flatten:
        wt(NodeStringMap[item])
    wt.flush()
    f.close()
    TarLen = os.path.getsize("statictarget.bin")
    print("新文件statictarget.bin的大小为{}字节,源文件大小为{}字节,压缩比为{:.2f}%".format(
        TarLen, originfilesize, TarLen/originfilesize*100))
    print("现在开始从新文件statictarget.bin中恢复图像信息...")
    f = open("statictarget.bin", "rb")
    # 读取图像分辨率
    originshape = pickle.load(f)
    # 读取长度
    Len = pickle.load(f)
    # 读取编码表
    counter = pickle.load(f)
    print("正在进行霍夫曼译码...")
    PQ = PriorityQueue()
    for item in counter:
        PQ.put(StaticHuffNode(item, counter[item]))
    while PQ.qsize() > 1:
        left = PQ.get()
        right = PQ.get()
        PQ.put(StaticHuffNode(0, left.weight + right.weight, left, right))
    p = root = PQ.get()
    cur = 0
    rd = ReaderWrapper(f)
    imgflatten = []
    while (cur != Len):
        c = rd.next()
        cur += 1
        if c == "0":
            p = p.lchild
        elif c == "1":
            p = p.rchild
        else:
            raise AssertionError("c must be 0 or 1")
            break
        if (p.lchild == None) and (p.rchild == None):
            imgflatten.append(p.s)
            p = root
    imgarr = np.array(imgflatten, dtype=np.uint8)
    imgarr.resize(originshape)
    imwrite("statictarget.bmp", imgarr)
    dif = absdiff(originimg, imgarr)
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    fig = plt.figure('result')
    plt.subplot(1, 3, 1)
    plt.title('原图像')
    plt.imshow(originimg, plt.cm.gray)
    plt.axis('off')
    plt.subplot(1, 3, 2)
    plt.title('编解码后的图像')
    plt.imshow(imgarr, plt.cm.gray)
    plt.axis('off')
    plt.subplot(1, 3, 3)
    plt.title('差异')
    plt.imshow(dif, plt.cm.gray)
    plt.subplots_adjust(wspace=0.15)
    plt.axis('off')
    plt.show()
    return Len, Hs

if __name__ == "__main__":
    print("请将要压缩的图像文件命名为origin.bmp,请保证该图像为灰度图像。")
    print("读入源图像文件...", end="")
    originimg = imread("origin.bmp", IMREAD_GRAYSCALE)
    originfilesize = os.path.getsize("origin.bmp")
    print(f"成功,源文件大小为{originfilesize}字节")
    print("分析原图像文件...", end="")
    originshape = originimg.shape
    originsize = originimg.size
    flatten = originimg.flatten()
    counter = dict.fromkeys(set(flatten), 0)
    Len, Hs = StaticHuffmanEncodingAndDecoding(
        counter, originsize, originshape, originfilesize)
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容