此前的神经网络大多采用梯度下降算法进行优化。梯度下降基于导数,有一定的数学依据,但在训练中常常会遇到鞍点问题,导致无法达到最优解。
本文使用 PSO(粒子群优化)算法来替代神经网络中的 BP(反向传播)算法。
1. 定义神经网络模型
class Module(nn.Module):
    """Small CNN for 1x28x28 images whose weights come from a PSO particle.

    The network is conv(1->8) -> BN -> pool -> ReLU -> conv(8->4) -> pool ->
    ReLU -> conv(4->2) -> BN -> ReLU -> flatten -> linear(98->10).

    Args:
        pso_data: Flat sequence of parameter values for this model. It is
            stored on the instance and mapped onto the layers by
            ``_model_param_init``.
    """

    def __init__(self, pso_data):
        super().__init__()
        # Parameter vector proposed by the particle swarm.
        self.pso_data = pso_data
        # Input image size: 1*28*28
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, padding=1, bias=False)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=4, kernel_size=3, padding=1, bias=False)
        self.conv3 = nn.Conv2d(in_channels=4, out_channels=2, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(2)
        self.bn2 = nn.BatchNorm2d(8)
        self.maxPooling = nn.MaxPool2d(2)
        self.fc = nn.Linear(98, 10)
        self.relu = nn.ReLU(inplace=True)
        # Load the particle's values once, at construction time. Doing this
        # inside forward() repeated the work for every batch and, because it
        # ran after ``.to(device)``, could leave the weights on a different
        # device than the inputs.
        self._model_param_init()

    def forward(self, x):
        """Return the 10 class scores for a batch of 1x28x28 images."""
        x = self.conv1(x)
        x = self.bn2(x)
        x = self.relu(self.maxPooling(x))
        x = self.conv2(x)
        x = self.relu(self.maxPooling(x))
        x = self.conv3(x)
        x = self.bn1(x)
        x = self.relu(x)
        # Flatten (batch, 2, 7, 7) feature maps to (batch, 98) for the linear layer.
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
2. 定义用 PSO 参数初始化模型权重的函数
# 初始化函数
def _model_param_init(self):
self.conv1.weight.data = torch.FloatTensor(self.pso_data[0:72]).reshape(8, 1, 3, 3)
self.conv2.weight.data = torch.FloatTensor(self.pso_data[72:360]).reshape(4, 8, 3, 3)
self.conv3.weight.data = torch.FloatTensor(self.pso_data[360:432]).reshape(2, 4, 3, 3)
# self.conv1.bias.data = torch.arrange(270).reshape(10, 3, 3, 3)
self.bn1.weight.data = torch.FloatTensor(self.pso_data[432:434])
self.bn1.bias.data = torch.FloatTensor(self.pso_data[434:436])
self.bn2.weight.data = torch.FloatTensor(self.pso_data[436:444])
self.bn2.bias.data = torch.FloatTensor(self.pso_data[444:452])
self.fc.weight.data = torch.FloatTensor(self.pso_data[452:1432]).reshape(10, 98)
self.fc.bias.data = torch.FloatTensor(self.pso_data[1432:1442])
3. 调用pyswarms库中的粒子群优化算法
# -*- coding: utf-8 -*-
# @Time : 2022/7/4 12:25
# @Software : PyCharm
import time
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader
from torchsummary import summary
from torchvision import datasets
from torchvision.transforms import transforms
import pyswarms as ps
import numpy as np
# Run on the first CUDA device when one is available, otherwise on the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
batch_size = 32
# transforms.ToTensor(): reshapes (H, W, C) -> (C, H, W) and scales every pixel
# from 0-255 to 0-1 (division by 255).
# transforms.Normalize(): applies (x - mean) / std per channel. For MNIST the
# mean is 0.1307 and the standard deviation is 0.3081; the two values were
# previously passed to the wrong keywords.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
])
# 1. Prepare the datasets (downloaded to ../DataSet/mnist when missing)
train_dataset = datasets.MNIST(root="../DataSet/mnist", train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root="../DataSet/mnist", train=False, transform=transform, download=True)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)
class Module(nn.Module):
    """Small CNN for 1x28x28 images whose weights come from a PSO particle.

    The network is conv(1->8) -> BN -> pool -> ReLU -> conv(8->4) -> pool ->
    ReLU -> conv(4->2) -> BN -> ReLU -> flatten -> linear(98->10).

    Args:
        pso_data: Flat sequence of 1442 parameter values. It is stored on the
            instance and copied into the layers at construction time.
    """

    def __init__(self, pso_data):
        super().__init__()
        # Parameter vector proposed by the particle swarm.
        self.pso_data = pso_data
        # Input image size: 1*28*28
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, padding=1, bias=False)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=4, kernel_size=3, padding=1, bias=False)
        self.conv3 = nn.Conv2d(in_channels=4, out_channels=2, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(2)
        self.bn2 = nn.BatchNorm2d(8)
        self.maxPooling = nn.MaxPool2d(2)
        self.fc = nn.Linear(98, 10)
        self.relu = nn.ReLU(inplace=True)
        # Load the particle's values once, at construction time. Doing this
        # inside forward() repeated the work for every batch and, because it
        # ran after ``.to(device)``, left CPU weights on a GPU model.
        self._model_param_init()

    def forward(self, x):
        """Return the 10 class scores for a batch of 1x28x28 images."""
        x = self.conv1(x)
        x = self.bn2(x)
        x = self.relu(self.maxPooling(x))
        x = self.conv2(x)
        x = self.relu(self.maxPooling(x))
        x = self.conv3(x)
        x = self.bn1(x)
        x = self.relu(x)
        # Flatten (batch, 2, 7, 7) feature maps to (batch, 98) for the linear layer.
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    def _model_param_init(self):
        """Copy the flat ``self.pso_data`` vector into the layer parameters.

        Values are copied in place, so every parameter keeps the device and
        dtype it already has; the method may be called again after
        ``self.pso_data`` is replaced.
        """
        # (parameter, start, stop): slice of pso_data that fills the parameter.
        layout = (
            (self.conv1.weight, 0, 72),      # (8, 1, 3, 3)
            (self.conv2.weight, 72, 360),    # (4, 8, 3, 3)
            (self.conv3.weight, 360, 432),   # (2, 4, 3, 3)
            (self.bn1.weight, 432, 434),
            (self.bn1.bias, 434, 436),
            (self.bn2.weight, 436, 444),
            (self.bn2.bias, 444, 452),
            (self.fc.weight, 452, 1432),     # (10, 98)
            (self.fc.bias, 1432, 1442),
        )
        # copy_ converts dtype/device as needed; replacing ``.data`` with a
        # fresh CPU tensor would break a model already moved to the GPU.
        with torch.no_grad():
            for param, start, stop in layout:
                values = torch.as_tensor(self.pso_data[start:stop], dtype=torch.float32)
                param.copy_(values.reshape(param.shape))
# 4. Score one particle on the training set
def train(pso_data):
    """Return the mean cross-entropy loss of one particle over the training set.

    Despite its name, this function performs no gradient-based update: it only
    evaluates the parameter vector so the particle swarm can use the result as
    a fitness value.

    Args:
        pso_data: Flat parameter vector for ``Module``.

    Returns:
        Mean of the per-batch losses over ``train_loader``.
    """
    model = Module(pso_data).to(device)
    # Evaluate in inference mode, exactly as ``verify`` does. In training mode
    # BatchNorm normalises with per-batch statistics, so the swarm would
    # optimise a different (and, with shuffled batches, noisy) function than
    # the one that is finally tested.
    model.eval()
    # CrossEntropyLoss already includes the softmax.
    criterion = torch.nn.CrossEntropyLoss()
    batch_loss = []
    # No back-propagation happens here, so skip building the autograd graph.
    with torch.no_grad():
        for inputs, target in train_loader:
            inputs, target = inputs.to(device), target.to(device)
            y_pred_data = model(inputs)
            batch_loss.append(criterion(y_pred_data, target).item())
    return np.mean(batch_loss)
# 5. Measure accuracy on the test set
def verify(pso_data):
    """Print the test-set accuracy of the model built from ``pso_data``.

    Args:
        pso_data: Flat parameter vector for ``Module``.
    """
    net = Module(pso_data).to(device)
    net.eval()
    hits, seen = 0, 0
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            logits = net(images)
            # Index of the largest score in each row is the predicted class.
            predicted = torch.max(logits.data, dim=1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()
    print("Accuracy on test set: %d%%" % (100 * hits / seen))
# Objective function handed to the particle swarm optimizer
def obj_func(pso_data):
    """Return the training loss of every particle in the swarm.

    Args:
        pso_data: Sequence of particles, one flat parameter vector per row.

    Returns:
        1-D array with one loss per particle, in row order.
    """
    print("\npso_data =", pso_data)
    # Iterate over the rows actually supplied instead of assuming exactly 10
    # particles, so the swarm size can be changed in one place.
    return np.array([train(particle) for particle in pso_data])
if __name__ == '__main__':
    # To inspect the architecture, build Module([0] * 1442) and pass it to
    # torchsummary.summary with input_size=(1, 28, 28).
    started = time.time()
    print("\n开始粒子群优化训练=========================================================================================")
    # One dimension per model parameter, each constrained to [-1, 1].
    dimensions = 1442
    lower = np.full(dimensions, -1)
    upper = np.full(dimensions, 1)
    swarm = ps.single.GlobalBestPSO(
        n_particles=10,
        dimensions=dimensions,
        options={'c1': 0.5, 'c2': 0.3, 'w': 0.9},
        bounds=(lower, upper),
    )
    # Run the optimization; ``pos`` is the best parameter vector found.
    cost, pos = swarm.optimize(obj_func, iters=50)
    elapsed = time.time() - started
    print("\n训练完毕================================================================================================")
    print("\nRun time: %.9f (h).\n" % (elapsed / 3600))
    # Report test accuracy of the best particle.
    verify(pos)