# -*- coding: utf-8 -*-
"""
Created on Wed Oct 3 21:54:36 2018
@author: ltx
"""
# Use optimized gradient algorithms such as momentum gradient descent and Adam to improve accuracy.
import numpy as np
import matplotlib.pyplot as plt
import scipy.io
import math
import sklearn
import sklearn.datasets
import opt_utils
import testCase
plt.rcParams['figure.figsize'] = (7.0, 4.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'
# --------------------- Split the dataset into mini-batches -------------------
def GetMinibatch(X, Y, batch_number, seed=0):
    np.random.seed(seed)  # fix the random seed
    m = X.shape[1]
    Number = math.floor(m / batch_number)  # floor rounds down: number of full mini-batches
    # Shuffle the training set
    A = list(np.random.permutation(m))  # random permutation of 0 .. m-1
    shuffer_X = X[:, A]
    shuffer_Y = Y[:, A].reshape((1, m))
    Batches = []
    for i in range(0, Number):
        shuffer_batch_X = shuffer_X[:, i * batch_number:(i + 1) * batch_number]
        shuffer_batch_Y = shuffer_Y[:, i * batch_number:(i + 1) * batch_number]
        Batches.append((shuffer_batch_X, shuffer_batch_Y))
    # Handle the last, smaller mini-batch if m is not divisible by batch_number
    if m % batch_number != 0:
        shuffer_batch_X = shuffer_X[:, Number * batch_number:]
        shuffer_batch_Y = shuffer_Y[:, Number * batch_number:]
        Batches.append((shuffer_batch_X, shuffer_batch_Y))
    return Batches
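# Quick sanity check of the splitting logic (a toy sketch added for illustration,
# separate from the testCase checks below): 10 examples with a mini-batch size of 4
# should give two full batches of 4 plus one remainder batch of 2.
_toy_X = np.arange(20).reshape((2, 10))
_toy_Y = np.arange(10).reshape((1, 10))
print("toy mini-batch sizes:", [b[0].shape[1] for b in GetMinibatch(_toy_X, _toy_Y, 4)])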
print("-------------测试random_mini_batches-------------")
X_assess,Y_assess,mini_batch_size = testCase.random_mini_batches_test_case()
mini_batches = GetMinibatch(X_assess,Y_assess,mini_batch_size)
print("第1个mini_batch_X 的维度为:",mini_batches[0][0].shape)
print("第1个mini_batch_Y 的维度为:",mini_batches[0][1].shape)
print("第2个mini_batch_X 的维度为:",mini_batches[1][0].shape)
print("第2个mini_batch_Y 的维度为:",mini_batches[1][1].shape)
print("第3个mini_batch_X 的维度为:",mini_batches[2][0].shape)
print("第3个mini_batch_Y 的维度为:",mini_batches[2][1].shape)
# Momentum gradient descent: initialize the velocity vectors
def Initial_velocity(parameters):
    L = len(parameters) // 2  # number of layers (// is integer division, / returns a float)
    V = {}
    for l in range(L):
        V["dW" + str(l + 1)] = np.zeros_like(parameters["W" + str(l + 1)])
        V["db" + str(l + 1)] = np.zeros_like(parameters["b" + str(l + 1)])
    return V
# Test initialize_velocity
print("-------------Testing initialize_velocity-------------")
parameters = testCase.initialize_velocity_test_case()
v = Initial_velocity(parameters)
print('v["dW1"] = ' + str(v["dW1"]))
print('v["db1"] = ' + str(v["db1"]))
print('v["dW2"] = ' + str(v["dW2"]))
print('v["db2"] = ' + str(v["db2"]))
def UpdateWith_velocity(parameters, grads, V, beta, learning_rate):
    L = len(parameters) // 2
    for l in range(L):
        # Exponentially weighted average of the gradients
        V["dW" + str(l + 1)] = beta * V["dW" + str(l + 1)] + (1 - beta) * grads["dW" + str(l + 1)]
        V["db" + str(l + 1)] = beta * V["db" + str(l + 1)] + (1 - beta) * grads["db" + str(l + 1)]
        # Update the parameters with the velocity
        parameters["W" + str(l + 1)] = parameters["W" + str(l + 1)] - learning_rate * V["dW" + str(l + 1)]
        parameters["b" + str(l + 1)] = parameters["b" + str(l + 1)] - learning_rate * V["db" + str(l + 1)]
    return parameters, V
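# For reference, the update above follows the standard momentum formulas
# (per layer; beta is the momentum coefficient, typically 0.9):
#     v_dW = beta * v_dW + (1 - beta) * dW
#     W    = W - learning_rate * v_dW
# With beta = 0.9 the velocity is roughly an average over the last ~10 gradients,
# which damps the oscillations of plain mini-batch gradient descent.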
# Test update_parameters_with_momentum
print("-------------Testing update_parameters_with_momentum-------------")
parameters,grads,v = testCase.update_parameters_with_momentum_test_case()
parameters, v = UpdateWith_velocity(parameters, grads, v, beta=0.9, learning_rate=0.01)
print("W1 = " + str(parameters["W1"]))
print("b1 = " + str(parameters["b1"]))
print("W2 = " + str(parameters["W2"]))
print("b2 = " + str(parameters["b2"]))
print('v["dW1"] = ' + str(v["dW1"]))
print('v["db1"] = ' + str(v["db1"]))
print('v["dW2"] = ' + str(v["dW2"]))
print('v["db2"] = ' + str(v["db2"]))
# ------------- Adam algorithm ---------------------------------
# Initialize the parameters required by Adam:
def initial_Adam(parameters):
    L = len(parameters) // 2
    S = {}
    V = {}
    for l in range(L):
        # V holds the moving average of the gradients, S of the squared gradients
        S["dW" + str(1 + l)] = np.zeros_like(parameters["W" + str(1 + l)])
        S["db" + str(1 + l)] = np.zeros_like(parameters["b" + str(1 + l)])
        V["dW" + str(1 + l)] = np.zeros_like(parameters["W" + str(1 + l)])
        V["db" + str(1 + l)] = np.zeros_like(parameters["b" + str(1 + l)])
    return V, S
# Test initialize_adam
print("-------------Testing initialize_adam-------------")
parameters = testCase.initialize_adam_test_case()
v,s = initial_Adam(parameters)
print('v["dW1"] = ' + str(v["dW1"]))
print('v["db1"] = ' + str(v["db1"]))
print('v["dW2"] = ' + str(v["dW2"]))
print('v["db2"] = ' + str(v["db2"]))
print('s["dW1"] = ' + str(s["dW1"]))
print('s["db1"] = ' + str(s["db1"]))
print('s["dW2"] = ' + str(s["dW2"]))
print('s["db2"] = ' + str(s["db2"]))
# ------ Update parameters with the Adam formulas ----------------
def Update_parameter_Adam(parameters, grads, V, S, t, learning_rate=0.01, beta1=0.9, beta2=0.999, epsilon=1e-8):
    L = len(parameters) // 2
    V_corrected = {}
    S_corrected = {}
    for l in range(L):
        # First moment estimate (moving average of the gradients)
        V["dW" + str(1 + l)] = beta1 * V["dW" + str(1 + l)] + (1 - beta1) * grads["dW" + str(1 + l)]
        V["db" + str(1 + l)] = beta1 * V["db" + str(1 + l)] + (1 - beta1) * grads["db" + str(1 + l)]
        # Bias-corrected first moment
        V_corrected["dW" + str(1 + l)] = V["dW" + str(1 + l)] / (1 - np.power(beta1, t))
        V_corrected["db" + str(1 + l)] = V["db" + str(1 + l)] / (1 - np.power(beta1, t))
        # Second moment estimate (moving average of the squared gradients)
        S["dW" + str(1 + l)] = beta2 * S["dW" + str(1 + l)] + (1 - beta2) * np.square(grads["dW" + str(1 + l)])
        S["db" + str(1 + l)] = beta2 * S["db" + str(1 + l)] + (1 - beta2) * np.square(grads["db" + str(1 + l)])
        # Bias-corrected second moment
        S_corrected["dW" + str(1 + l)] = S["dW" + str(1 + l)] / (1 - np.power(beta2, t))
        S_corrected["db" + str(1 + l)] = S["db" + str(1 + l)] / (1 - np.power(beta2, t))
        # Parameter update
        parameters["W" + str(1 + l)] = parameters["W" + str(1 + l)] - learning_rate * V_corrected["dW" + str(1 + l)] / (np.sqrt(S_corrected["dW" + str(1 + l)]) + epsilon)
        parameters["b" + str(1 + l)] = parameters["b" + str(1 + l)] - learning_rate * V_corrected["db" + str(1 + l)] / (np.sqrt(S_corrected["db" + str(1 + l)]) + epsilon)
    return parameters, V, S
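# For reference, the update above implements the standard Adam formulas
# (per layer, with t the update step used for bias correction):
#     v = beta1 * v + (1 - beta1) * dW            # first moment
#     s = beta2 * s + (1 - beta2) * dW ** 2       # second moment
#     v_hat = v / (1 - beta1 ** t)                # bias-corrected first moment
#     s_hat = s / (1 - beta2 ** t)                # bias-corrected second moment
#     W = W - learning_rate * v_hat / (sqrt(s_hat) + epsilon)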
# Test update_parameters_with_adam
print("-------------Testing update_parameters_with_adam-------------")
parameters , grads , v , s = testCase.update_parameters_with_adam_test_case()
parameters, v, s = Update_parameter_Adam(parameters, grads, v, s, t=2)
print("W1 = " + str(parameters["W1"]))
print("b1 = " + str(parameters["b1"]))
print("W2 = " + str(parameters["W2"]))
print("b2 = " + str(parameters["b2"]))
print('v["dW1"] = ' + str(v["dW1"]))
print('v["db1"] = ' + str(v["db1"]))
print('v["dW2"] = ' + str(v["dW2"]))
print('v["db2"] = ' + str(v["db2"]))
print('s["dW1"] = ' + str(s["dW1"]))
print('s["db1"] = ' + str(s["db1"]))
print('s["dW2"] = ' + str(s["dW2"]))
print('s["db2"] = ' + str(s["db2"]))
# ------- Plain (batch) gradient descent ---------------------
def Upadate_parameter(parameters, grads, learing_rate=0.8):
    L = len(parameters) // 2
    for l in range(L):
        parameters["W" + str(1 + l)] = parameters["W" + str(1 + l)] - learing_rate * grads["dW" + str(1 + l)]
        parameters["b" + str(1 + l)] = parameters["b" + str(1 + l)] - learing_rate * grads["db" + str(1 + l)]
    return parameters
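# This is the plain update W = W - learning_rate * dW applied layer by layer;
# for example, with W = 1.0, dW = 0.5 and learing_rate = 0.8 the new W is
# 1.0 - 0.8 * 0.5 = 0.6. There is no state carried between updates, unlike
# momentum or Adam above.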
# ---------------- Model ------------------------
# Load the dataset
train_X,train_Y = opt_utils.load_dataset(is_plot=False)
# Mini-batch size used when splitting the data
batch_number=64
layer_dims=[train_X.shape[0],5,2,1]
# Initialize the model parameters
costs = []
parameters=opt_utils.initialize_parameters(layer_dims)
# Training loop
def model(parameters, td="gd", learing_rate=0.0007, beta=0.9, is_plot=True, print_cost=True, iterations=10000):
    t = 0
    seed = 10
    # Initialize the optimizer state once, before training; re-initializing it
    # inside the loop would throw away the accumulated moving averages
    if td == "velocity":
        V = Initial_velocity(parameters)
    elif td == "adam":
        V, S = initial_Adam(parameters)
    for i in range(iterations):
        seed = seed + 1
        Batches = GetMinibatch(train_X, train_Y, batch_number, seed)
        for batch in Batches:
            (batch_X, batch_Y) = batch
            # Forward propagation
            A3, cache = opt_utils.forward_propagation(batch_X, parameters)
            # Compute the cost
            cost = opt_utils.compute_cost(A3, batch_Y)
            # Backward propagation
            grads = opt_utils.backward_propagation(batch_X, batch_Y, cache)
            # Update the model parameters
            if td == "gd":
                parameters = Upadate_parameter(parameters, grads, learing_rate)
            elif td == "velocity":
                parameters, V = UpdateWith_velocity(parameters, grads, V, beta, learing_rate)
            elif td == "adam":
                t = t + 1
                parameters, V, S = Update_parameter_Adam(parameters, grads, V, S, t, learing_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
        # Record the cost every 100 passes over the dataset
        if i % 100 == 0:
            costs.append(cost)
        # Optionally print the cost every 1000 passes
        if print_cost and i % 1000 == 0:
            print("Pass " + str(i) + " over the dataset, current cost: " + str(cost))
    if is_plot:
        plt.plot(costs)
        plt.ylabel('cost')
        plt.xlabel('epochs (per 100)')
        plt.title("Learning rate = " + str(learing_rate))
        plt.show()
    return parameters
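# The same entry point can be reused with the other optimizers, e.g.
# (example calls; each run should start from freshly initialized parameters):
#     parameters = model(opt_utils.initialize_parameters(layer_dims), td="velocity")
#     parameters = model(opt_utils.initialize_parameters(layer_dims), td="adam")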
parameters = model(parameters,td="gd",is_plot=True)
# Prediction
predictions = opt_utils.predict(train_X, train_Y, parameters)
# Plot the decision boundary
plt.title("Model with Gradient Descent optimization")
axes = plt.gca()
axes.set_xlim([-1.5, 2.5])
axes.set_ylim([-1, 1.5])
opt_utils.plot_decision_boundary(lambda x: opt_utils.predict_dec(parameters, x.T), train_X, train_Y)
# -------------- Experiment results ------------------------------