【机器学习实践】隐马尔可夫模型(一)

隐马尔可夫模型

隐马尔可夫链模型 Hidden Markov Model

首先应当了解马尔科夫链模型,隐马尔可夫链模型即可以解决马尔可夫链模型状态量非常多的情况下(如 连续值)转移矩阵过大的问题,并由它转化而来。解决原理是假定当前状态(称为显状态)仅由隐状态决定,由于隐状态可以比当前状态少很多,因此转移矩阵参数更少。
隐马尔可夫模型的分类:离散、连续。

隐马尔可夫模型的典型功能

  1. 根据一个或多个已知的可见层和隐藏层训练模型,获得释放矩阵和状态转移矩阵,得到模型参数。
  2. 使用前向算法、后向算法或者暴力算法,根据已知模型的参数,求出某一已知可见层序列的发生概率。
  3. 使用维特比(Viterbi)算法根据一个可见层序列推导出最有可能的隐藏层序列。

python实现

此处利用python对离散的隐马尔可夫模型进行建模,满足其训练、计算和推导的功能。
由于篇幅限制,本篇仅实现其前向算法、后向算法、遍历(暴力)算法的实现,和相关准备工作

下面是hmm类本体get_prob实现典型功能2的功能,而fit在本章中使用具有隐藏层和可见层的数据进行计算,而Baum-Welch算法和Viterbi算法在后续篇幅进行介绍,本类对相关算法部分留下了篇幅

#hmm.py

from operator import truediv
import numpy as np

class discrete_hidden_Markov_model:
    def __init__(self, debug:bool=False):
        #typical 5 model params of hmm(lambda(Pi, A, B))
        self.vis_kind = None
        self.invis_kind = None
        self.init_state_mat = None
        self.trans_state_mat = None
        self.emitter_mat = None
        #configurations
        self.debug = debug

    #from a visible chain, or a visible chain with an invisible chain, infer the params of the hmm model
    def fit(self, vis_data:np.ndarray, invis_data:np.ndarray=None)->None:

        #if invis_data is None, then use baum-welch algrithm, if invis_data is given, then use ordinary probability calculation
        if invis_data is None:#use bw automatically
            pass 
        else:
            if np.shape(vis_data) != np.shape(invis_data):
                print("[-] length of two chains are not equal");return
            else:
                data_len = np.shape(vis_data)[1]
                data_num = np.shape(vis_data)[0]
            self.vis_kind = vis_data.max() + 1
            self.invis_kind = invis_data.max() + 1
            # to fill the initial state matrix
            self.init_state_mat = np.zeros((self.invis_kind,), dtype=np.float64)
            for i in range(data_num):
                self.init_state_mat[invis_data[i, 0]] += 1#only the head of one chain
            self.init_state_mat /= np.linalg.norm(self.init_state_mat, ord=1)
            if self.debug : print(self.init_state_mat)
            # to fill the state transfer matrix
            self.trans_state_mat = np.zeros((self.invis_kind, self.invis_kind), dtype=np.float64)
            for k in range(data_num):
                for i in range(data_len - 1):
                    self.trans_state_mat[invis_data[k, i], invis_data[k, i + 1]] += 1
            for i in range(self.invis_kind):
                self.trans_state_mat[i] /= np.linalg.norm(self.trans_state_mat[i], ord=1)
            if self.debug:print(self.trans_state_mat)
            # to fill the emitter matrix
            self.emitter_mat = np.zeros((self.invis_kind, self.vis_kind), dtype=np.float64)
            for k in range(data_num):
                for i in range(data_len):
                    self.emitter_mat[invis_data[k, i],vis_data[k, i]] += 1
            for i in range(self.invis_kind):
                self.emitter_mat[i] /= np.linalg.norm(self.emitter_mat[i], ord=1)
            if self.debug:print(self.emitter_mat)

    #get the specific probability from a known visible chain, with the params of the model known
    def get_prob(self, vis_data:np.ndarray, algorithm:str="forward")->float:
        '''algorithm : 'traversal' / 'forward' / 'backward' 
        '''
        vis_data = vis_data.ravel()
        if self.emitter_mat is None or self.trans_state_mat is None:
            print("[-] params have not been infered yet"); return
        probability = 0.
        if algorithm == 'forward':
            data_infer_len = vis_data.shape[0]
            prob_data_arr = np.zeros((self.invis_kind, data_infer_len), dtype=np.float64)
            #first col of prob
            for j in range(self.invis_kind):
                prob_data_arr[j, 0] = self.init_state_mat[j] * self.emitter_mat[j, vis_data[0]]
            #total time complexity : O(T*N^2)
            for i in range(1, data_infer_len):
                #calculate this matrix of probability forward
                #time complexity : O(N^2)
                for j in range(self.invis_kind):# indicator for forward invis nodes
                    prob_back_node = 0.
                    for k in range(self.invis_kind):# indicator for last(back) invis nodes
                        prob_back_node += prob_data_arr[k, i-1] * self.trans_state_mat[k, j] * self.emitter_mat[j, vis_data[i]]
                    prob_data_arr[j, i] = prob_back_node
            if self.debug:print(prob_data_arr)
            probability = prob_data_arr[:, -1].sum()
            if self.debug:print(probability)
        elif algorithm == 'backward':
            data_infer_len = vis_data.shape[0]
            prob_data_arr = np.zeros((self.invis_kind, data_infer_len+1), dtype=np.float64)
            #last col of the prob
            for j in range(self.vis_kind):
                prob_data_arr[j, -1] = 1
            #total time complexity : O(T*N^2)
            for i in range(data_infer_len - 1, -1, -1):
                #calculate this matrix of probability backward
                #time complexity : O(N^2)
                for j in range(self.invis_kind):#indicator for backward invis nodes
                    prob_front_node = 0.
                    for k in range(self.invis_kind):#indicator for last(front) invis nodes
                        prob_front_node += prob_data_arr[k, i+1] * self.trans_state_mat[j, k] * self.emitter_mat[j, vis_data[i]];
                    prob_data_arr[j, i] = prob_front_node
            for j in range (self.vis_kind):
                probability += prob_data_arr[j, 0] * self.init_state_mat[j]
            if self.debug:print(prob_data_arr)
            
        elif algorithm == 'traversal':
            invis_data = np.zeros(vis_data.shape, dtype=np.int)
            #generate all possible invisible sequences, with traversing method
            data_infer_len = vis_data.shape[0]
            for i in range(self.invis_kind**data_infer_len-1):
                probability_this_chain = 1.
                #calculate the probability of this very sequence and add it to total probability
                for k in range(data_infer_len):
                    # P(vis_data[k]|invis_data[k])
                    # not bayes law type, reason invis -> result vis : regard invis as reason
                    probability_this_chain *= self.emitter_mat[invis_data[k], vis_data[k]]
                for k in range(0, data_infer_len - 1):
                    # P(invis_data[k]->invis_data[k+1])
                    probability_this_chain *= self.trans_state_mat[invis_data[k], invis_data[k+1]]
                probability_this_chain *= self.init_state_mat[invis_data[0]]
                # if self.debug : print(probability_this_chain)
                probability += probability_this_chain
                #update the chain
                invis_data[0] += 1
                for j in range(data_infer_len - 1):
                    if invis_data[j] == self.invis_kind:
                        invis_data[j] = 0
                        invis_data[j+1] += 1
            if self.debug : print(probability)
        else:
            print("[-] should choose the right algorithm");return
        return probability

    def get_hidden(self, vis_data:np.ndarray, algorithm:str="Viterbi")->np.ndarray:
        '''algorithm : 'Viterbi"
        '''
        if algorithm == "Viterbi":
            pass
        else:
            print("[-] should choose the right algorithm");return

类似的数据集很少,因此本人自制了一个生成隐马尔可夫链的程序。还可以根据生成时的原始参数检验各种算法的正确性。

#data_gen.py

#this file is used to generate original data for hmm training
import numpy as np
#suppose 3 invis state, 3 vis state

chain_len = 100
chain_num = 500

#initial invisible state probability
#   prob    0    1    2
#           p    p    p
init_state_prob = np.array([0.3, 0.2, 0.5])
#here we assume this to be not equal

#invisible state transfer matrix
#    past\cur    0    1    2
#     0          p    p    p
#     1          p    p    p
#     2          p    p    p

trans_state_mat = np.array([
    [0.2, 0.4, 0.4],
    [0.4, 0.4, 0.2],
    [0.3, 0.3, 0.4]
])

#invisible state -> visible state matrix
#    invis\vis    0    1    2
#      0          p    p    p
#      1          p    p    p
#      2          p    p    p

emitter_mat = np.array([
    [0.3, 0.4, 0.3],
    [0.5, 0.3, 0.2],
    [0.2, 0.5, 0.3]
])

def get_chain()->tuple([np.ndarray, np.ndarray]):
    vis_chains = np.ndarray((chain_num, chain_len), dtype=np.int)
    invis_chains = np.ndarray((chain_num, chain_len), dtype=np.int)
    for i in range(chain_num):
        
        #generate invisible chain
        start_code = np.random.random()
        invis_chain = list()
        if start_code < init_state_prob[0]:
            start_invis_state = np.array([1., 0., 0.])
            invis_chain.append(0)
        elif start_code < init_state_prob[1] + init_state_prob[0]:
            start_invis_state = np.array([0., 1., 0.])
            invis_chain.append(1)
        else:
            start_invis_state = np.array([0., 0., 1.])
            invis_chain.append(2)
        invis_state = start_invis_state
        while len(invis_chain) < chain_len:
            next_state_prob = (invis_state @ trans_state_mat)
            next_code = np.random.random()
            if next_code < next_state_prob[0]:
                invis_state = np.array([1., 0., 0.])
                invis_chain.append(0)
            elif next_code < next_state_prob[0] + next_state_prob[1]:
                invis_state = np.array([0., 1., 0.])
                invis_chain.append(1)
            else:
                invis_state = np.array([0., 0., 1.])
                invis_chain.append(2)

        #generate visible chain
        vis_chain = list()
        for invis_state_code in invis_chain:
            if invis_state_code == 0:
                invis_state = np.array([1., 0., 0.])
            elif invis_state_code == 1:
                invis_state = np.array([0., 1., 0.])
            else:
                invis_state = np.array([0., 0., 1.])
            vis_state_prob = invis_state @ emitter_mat
            vis_code = np.random.random()
            if vis_code < vis_state_prob[0]:
                vis_chain.append(0)
            elif vis_code < vis_state_prob[0] + vis_state_prob[1]:
                vis_chain.append(1)
            else:
                vis_chain.append(2)

        vis_chains[i, :] = vis_chain
        invis_chains[i, :] = invis_chain
    return vis_chains, invis_chains

编写代码产生对象进行测试

#main.py

import numpy as np
from data_gen import get_chain
import data_gen

from hmm import discrete_hidden_Markov_model

#initialize
yyz_discrete_HMM_machine = discrete_hidden_Markov_model(debug=False)

# strict test
# yyz_discrete_HMM_machine.emitter_mat = data_gen.emitter_mat
# yyz_discrete_HMM_machine.trans_state_mat = data_gen.trans_state_mat
# yyz_discrete_HMM_machine.init_state_mat = data_gen.init_state_prob
# yyz_discrete_HMM_machine.invis_kind = 3
# yyz_discrete_HMM_machine.vis_kind = 3

#fit test
vis, invis = get_chain()
yyz_discrete_HMM_machine.fit(vis, invis)

#print params
print(yyz_discrete_HMM_machine.emitter_mat, yyz_discrete_HMM_machine.trans_state_mat, yyz_discrete_HMM_machine.init_state_mat)


#normalization test : total probability is 1.
data_infer_len = 6
vis_data = np.zeros((data_infer_len,), dtype=np.int)
# generate all possible invisible sequences, with traversing method
prob_for = 0.
prob_tra = 0.
prob_bak = 0.
for i in range(3**data_infer_len):
    prob_tra += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='traversal')
    prob_bak += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='backward')
    prob_for += yyz_discrete_HMM_machine.get_prob(vis_data, algorithm='forward')
    vis_data[0] += 1
    for j in range(data_infer_len - 1):
        if vis_data[j] == 3:
            vis_data[j] = 0
            vis_data[j+1] += 1

print(prob_for, prob_tra, prob_bak)

#result
拟合结果

发射矩阵
[[0.30223322 0.3989192  0.29884758]
 [0.50214004 0.29603819 0.20182177]
 [0.20046293 0.49771578 0.30182128]]

状态转移矩阵
[[0.20398501 0.4028408  0.3931742 ]
 [0.39799201 0.39865764 0.20335034]
 [0.30328927 0.29505072 0.40166001]]

初始概率
[0.298 0.224 0.478]
归一化结果
1.0 0.9950028676787088 0.9999999999999998

得到的概率矩阵和生成代码时在误差允许范围内相等,说明本初级统计型拟合程序有效;三种算法都具有归一化的特性,典型功能2算法测试成功。

总结

  1. 对隐马尔可夫模型做了简要介绍
  2. 由于算法之间的加法、乘法方式和次数不同,理论上结果应当相同的算法之间也产生了偏差。这是数据结构的精度所导致的。
  3. 今天是2.11除夕夜,这一篇其实写了挺久的,主要这会儿看春晚太无聊就写一下总结。另外,祝各位网友牛年新年快乐!

参考资料

前后向算法的理解和公式
知乎资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容