本文重点介绍贝叶斯推断中的MCMC方法,这是众多方法中的一个,具体的分类可以看图。基于估计的呢,是点估计,求出目标分布的极值。这里呢多说一句,贝叶斯的点估计是对后验概率求导(),而频率学派的点估计呢是对似然概率求导()。基于采样的呢是,对目标分布采样,用样本来表示目标分布. 另外,MCMC的理解,要参考两个非常形象的比喻,寻找山峰和利用随机计算圆的面积。
import scipy.stats as stats
import matplotlib.pyplot as plt
import numpy as np
# 生成数据 - 有两组数据X服从true_lambda_1 的指数分布,Y服从true_lambda_2的指数分布,我们要考察的是他们的联合概率分布的更新过程。
#
N=10 # sample size
true_lambda_1=1
true_lambda_2=3
data=np.concatenate(
[
stats.poisson.rvs(true_lambda_1,size=(N,1)),
stats.poisson.rvs(true_lambda_2,size=(N,1)),
],
axis=1
) # x和y各自抽样一次,样本大小是N
x=y=np.linspace(.01,5,100) # 参数lambda的所有可能取值
likelihood_x=np.array(
[
stats.poisson.pmf(data[:,0],xi) for xi in x
]
).prod(axis=1) # 第一组10个样本同时出现,lambda的概率分布!!!
likelihood_y=np.array(
[
stats.poisson.pmf(data[0,:],yi) for yi in y
]
).prod(axis=1) # 第二组10个样本同时出现,lambda的概率分布!!
L=np.dot(likelihood_x.reshape((-1,1)),likelihood_y.reshape((1,-1))) # likelihood乘积的矩阵,矩阵的每一个元素都是p(x|lambda)p(y|lambda),用来更新先验形成后验,直接乘以先验就行了
print(L.shape)
(100, 100)
# 先验分布 - 均匀分布
plt.subplot(121)
uni_x=stats.uniform.pdf(x,loc=0,scale=5)
uni_y=stats.uniform.pdf(x,loc=0,scale=5)
M=np.dot(uni_x.reshape((-1,1)),uni_y.reshape((1,-1)))
im=plt.imshow(M,interpolation="none",origin="lower",cmap=plt.cm.jet,extent=(0,5,0,5))
plt.scatter(true_lambda_2,true_lambda_1,c="k",s=50) # 注意纵坐标是lambda_2的分布
plt.title("uniform prior")
plt.xlabel("lambda_2")
plt.ylabel("lambda_1")
# 更新分布
plt.subplot(122)
plt.contour(x,y,M*L) # 画等高线
im=plt.imshow(M*L,interpolation="none",origin="lower",cmap=plt.cm.jet,extent=(0,5,0,5))
plt.scatter(true_lambda_2,true_lambda_1,c="k",s=50) # 画出真实值的位置
plt.title("postieror after 1 sampling")
plt.xlabel("lambda_2")
plt.ylabel("lambda_1")
Text(0, 0.5, 'lambda_1')
上面这个过程呢,其实是贝叶斯推断的标准过程。包含了几个要素:确定模型(x,y来自两个不同的泊松分布),确定模型参数(两个泊松分布的参数lambda_1,lambda_2),确定先验(均采用均匀分布),采样,更新。
其实,有了这个标准过程就已经可以解决问题了。MCMC呢,对这个过程进行了改进,规避掉了计算复杂度最高的分母。MCMC结合了两个东西,第一个MC是markov chain,指的是当前值只依赖上一步的值。第二个MC呢,指的是monte carlo这是一个用根据分布获取随机数的东西,同时获取出来的随机数呢,又能反向勾勒出原分布的情况。蒙地卡罗还可以利用随机来测量圆形或者不规则图形面积哦,很重要的一个技巧,我们不需要知道原到底长什么样,我们只需要知道某个点到底来自不来自于这个圆内就行了。MCMC是这样一个核心思想(书上看到的一段话,因为写的太好了,所以直接搬运一下):
首先明确一点,MCMC返回值不是后验分布,而是后验分布上的一些样本点。我们可以把MCMC的搜索过程看做是寻找正确的山峰的过程,上面的图也能看出来,概率大的地方就是形成了一个山峰,我们要找的就是这样一个山峰。但是我们返回的是这座上上的一些石头,用来表示这座山。我们可以把MCMC的搜索过程,想象成不断地问每一块石头(样本):你是不是来自我要找的那座山?如果是,那么就会沿着这块石头所指引的方向前进,如果不是就丢掉。最后我们把所有是那座山的石头都作为返回值给出。关于MCMC,我找到了一个很好的入门短视频MCMC
MCMC的具体流程呢是这样的,整个过程和利用随机测圆面积很像:
- 对参数,先给一个先验分布,并且给出一个最开始的初始值
- 利用马尔科夫链得到下一个参数 作为下一步的参数。具体做法是利用一个叫Proposal distribution的东西,这个分布呢其实是markov chain里的transition function,它结合当前值给出下一个值的分布,比如说我们在这里用的就是很简单的正态分布,可以看出其实就是以当前参数作为期望,然后给一个方差,得到这个分布,然后从这个分布上采样出下一个参数。所以说呀,这个玩意用什么分布其实都不重要,感觉就其实有点随机了,主要是后面有拒绝和接受那一步进行把关,这一步主要就是要随机给点(对应到测面积里,就是在随机产生点)
- 然后我们要判断这个是不是来自我们要找的那个山,我们要决定是接受还是拒绝。我们定义一个接受率。这个接受率其实挺好理解的,如何判断一个参数是好还是坏呢?我们就看这个参数下,针对指定的evidence X给出的后验概率是大还是小,如果大那就毫无疑问就直接接受,如果小我们也不一定都拒绝,要以一定的概率拒绝。什么概率呢?我们用一个0到1之间的均匀分布采样一个数,如果这个采样出来的数比大,那就接受,如果比它小呢,就拒绝(对应到圆面积里,这一步就是在判断随机产生的点是不是在圆内)
- 重复上述过程
下面的内容呢,我将参考这篇教程,首先用dummy data跑一遍MCMC,然后再用一个实际的例子跑一遍MCMC。代码只用到numpy,scipy, matplotlib,没有用到任何成熟的库
#“”“Dummy Data Example。注意我们对acceptance 函数等式两边求了个log,不然计算机没办法计算”“”
import numpy as np
import scipy.stats as stats
# prepare data
mod1=lambda t:np.random.normal(10,3,t)
population=mod1(30000) # 总数据是30000个
observations=population[np.random.randint(0,30000,1000)] #假设只观察到1000个数据
fig=plt.figure(figsize=(10,10))
ax=fig.add_subplot(1,1,1)
ax.hist(observations,bins=35)
ax.set_xlabel("Value")
ax.set_ylabel("Frequency")
ax.set_title("Distribution of Dummy Data")
mu=observations.mean()
print("mu=",mu)
mu= 10.177366628047666
# 为了简化问题,我们只求参数sigma,mu我们通过观察这1000组数据,用平均值表示mu。其实mu也可以用mcmc求,但是这只是一个简单的例子,不想搞得太复杂
# proposal distribution - 同样,我们采用一个很常用的proposal distribution,就是正态分布N(mu=sigma_current,sigma=1)
sample_proposal_distribution=lambda mu: np.random.normal(mu,0.5,1)
# function to calculate prior
def get_log_prior(theta):
"""注意,我们对先验没有要求,认为就是个均匀分布,而且我们整个过程中也没有更新这个先验,始终认为先验就是均匀分布"""
sigma=theta["sigma"]
if sigma>0:
return 0
else:
return -np.inf
# function to calculate posterior
def get_log_posterior(theta,evidence,log_prior):
sigma=theta["sigma"]
likelihood=stats.norm.pdf(evidence,loc=9.8,scale=sigma)
log_likelihood=np.log(likelihood).sum()
return log_likelihood+log_prior
# acceptance function
def acceptance(current_theta,proposed_theta):
current_log_prior=get_log_prior(current_theta)
proposed_log_prior=get_log_prior(proposed_theta)
current_log_posterior=get_log_posterior(current_theta,observations,current_log_prior)
proposed_log_posterior=get_log_posterior(proposed_theta,observations,proposed_log_prior)
log_r=proposed_log_posterior-current_log_posterior
if log_r>1:
return True
else:
alpha=np.random.uniform(0,1)
if alpha<np.exp(log_r):
return True
else:
return False
def metrolpolis_hastings(initial_theta,n_samples):
count_samples=0
accepted_samples=[]
rejected_samples=[]
current_theta=initial_theta
while count_samples <n_samples:
proposed_sigma=sample_proposal_distribution(current_theta["sigma"])
proposed_theta={"sigma":proposed_sigma[0]}
accept_or_reject=acceptance(current_theta,proposed_theta)
if accept_or_reject is True:
count_samples+=1
accepted_samples.append(proposed_theta)
current_theta=proposed_theta
else:
rejected_samples.append(proposed_theta)
return accepted_samples,rejected_samples
initial_theta={"sigma":0.5}
n_samples=10000
accepted_samples,rejected_samples=metrolpolis_hastings(initial_theta,n_samples)
# visualize the trace of sigma
accepted_sigmas=[s["sigma"] for s in accepted_samples]
rejected_sigmas=[s["sigma"] for s in rejected_samples]
steps=range(len(accepted_sigmas))
plt.subplot(121)
plt.hist(accepted_sigmas,bins=30)
plt.title("Accepted values")
plt.subplot(122)
plt.hist(rejected_sigmas,bins=30,color="red")
plt.title("rejected falues")
Text(0.5, 1.0, 'rejected values')
最后附上更简洁的完整代码
"""
Author: xuqh
Created on 2022/1/5
This is a complete implementation of Metropolis-Hasting Algorithm (MCMC)
"""
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
def sample_from_proposal_distribution(current_theta):
current_sigma = current_theta["sigma"]
return {
"sigma": np.random.normal(current_sigma, 1, 1)[0]
}
def get_log_prior(theta):
"""log(p(theta))"""
sigma = theta["sigma"]
if sigma > 0:
return 0
else:
return -np.inf
def get_log_likelihood(evidence, theta):
"""log(p(evidence|theta))"""
sigma = theta["sigma"]
likelihood = stats.norm.pdf(evidence, loc=9.8, scale=sigma)
log_likelihood = np.log(likelihood).sum()
return log_likelihood
def get_log_posterior(evidence, theta):
"""log(p(theta))+log(p(evidence|theta))"""
log_prior = get_log_prior(theta)
log_likelihood = get_log_likelihood(evidence, theta)
return log_prior + log_likelihood
def accept(current_theta, proposed_theta, evidence):
current_log_posterior = get_log_posterior(evidence, current_theta)
proposed_log_posterior = get_log_posterior(evidence, proposed_theta)
r = proposed_log_posterior - current_log_posterior
if r > 1:
return True
else:
alpha = np.random.uniform(0, 1)
if alpha < np.exp(r):
return True
else:
return False
def metropolis_hastings(initial_theta, n_samples, evidence):
current_theta = initial_theta
n_accepted_samples = 0
accepted_samples = []
rejected_samples = []
while n_accepted_samples < n_samples:
proposed_theta = sample_from_proposal_distribution(current_theta)
if accept(current_theta, proposed_theta, evidence):
n_accepted_samples += 1
current_theta=proposed_theta
accepted_samples.append(proposed_theta)
else:
rejected_samples.append((n_accepted_samples, proposed_theta))
return accepted_samples, rejected_samples
if __name__ == '__main__':
# generate data
observations = np.random.normal(10, 3, 1000)
# metropolis-hastings
initial_theta = {
"sigma": 1.0
}
n_samples = 10000
accepted_samples, rejected_samples = metropolis_hastings(initial_theta, n_samples, observations)
# plot trace & target distribution
plt.subplot(2, 1, 1)
plt.title("trace of sigma")
accepted_xs = range(len(accepted_samples))
accepted_ys = [s["sigma"] for s in accepted_samples]
plt.plot(accepted_xs, accepted_ys, color="green")
rejected_xs = [i for i, s in rejected_samples]
rejected_ys = [s["sigma"] for i, s in rejected_samples]
plt.scatter(rejected_xs, rejected_ys, marker="x", color="red")
plt.subplot(2, 1, 2)
plt.title("target distribution")
plt.hist(accepted_ys, bins=30)
plt.show()
再赠送一个实际的例子,数据来自这个网站的monthly sunspot数据
"""
Author: xuqh
Created on 2022/1/5
Applicationo of MCMC and Metropolis-hastings algorithm in real-world cases
Case: Sunspot number (monthly) X
Assumption: X~Gamma(a,b)
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
def sample_from_proposal_distribution(theta):
a = theta['a']
b = theta["b"]
proposed_a = np.random.normal(a, 0.05, 5)[0]
proposed_b = np.random.normal(b, 0.05, 5)[0]
return {
"a": proposed_a,
"b": proposed_b
}
def get_data():
fpath = "data/SN_m_tot_V2.0.csv"
pdf = pd.read_csv(open(fpath), header=None, sep=";")
data = pdf[3].to_numpy()
return data
def get_log_prior(theta):
a = theta['a']
b = theta["b"]
if a <= 0 or b <= 0:
return -np.inf
else:
return 0
def get_log_likelihood(theta, evidence):
a = theta['a']
b = theta["b"]
likelihood = stats.gamma.pdf(evidence,a=a, scale=b, loc=0)
log_likelihood = np.log(likelihood).sum()
temp=likelihood
return log_likelihood
def get_log_posterior(theta, evidence):
log_prior = get_log_prior(theta)
log_likelihood = get_log_likelihood(theta, evidence)
return log_prior + log_likelihood
def accept(current_theta, proposed_theta, evidence):
current_log_posterior = get_log_posterior(current_theta, evidence)
proposed_log_posterior = get_log_posterior(proposed_theta, evidence)
log_r = proposed_log_posterior - current_log_posterior
if log_r > 1:
return True
else:
alpha = np.random.uniform(0, 1)
if alpha < np.exp(log_r):
return True
else:
return False
def metropolis_hastings(initial_theta, n_samples, evidence):
current_theta = initial_theta
n_accepted_samples = 0
accepted_samples = []
rejected_samples = []
while n_accepted_samples < n_samples:
proposed_theta = sample_from_proposal_distribution(current_theta)
if accept(current_theta, proposed_theta, evidence):
n_accepted_samples += 1
current_theta = proposed_theta
accepted_samples.append(proposed_theta)
else:
rejected_samples.append(proposed_theta)
return accepted_samples, rejected_samples
if __name__ == '__main__':
data = get_data()
data+=1 # remove zeros
# data=data[-1000:]
initial_theta = {
"a": 4,
"b": 10
}
n_samples = 1000
accepted_samples, rejected_samples = metropolis_hastings(initial_theta, n_samples, data)
plt.title("trace of a and b")
accepted_xs=[s["a"] for s in accepted_samples]
accepted_ys=[s["b"] for s in accepted_samples]
plt.plot(accepted_xs,accepted_ys)
rejected_xs=[s["a"] for s in rejected_samples]
rejected_ys=[s["b"] for s in rejected_samples]
plt.scatter(rejected_xs,rejected_ys,marker="x",color="red")
plt.show()