1. 安装配置环境
pip install numpy pandas matplotlib seaborn plotly requests tqdm opencv-python pillow wandb -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
#下载中文字体
wget https://zihao-openmmlab.obs.cn-east-3.myhuaweicloud.com/20220716-mmclassification/dataset/SimHei.ttf --no-check-certificate
#创建目录
import os
# 存放结果文件
os.mkdir('output')
# 存放训练得到的模型权重
os.mkdir('checkpoints')
# 存放生成的图表
os.mkdir('图表')
2. 准备图像分类数据集
wget https://zihao-openmmlab.obs.cn-east-3.myhuaweicloud.com/20220716-mmclassification/dataset/fruit30/fruit30_split.zip
#解压
unzip fruit30_split.zip >> /dev/null
# 删除压缩包
!rm fruit30_split.zip
3. 使用迁移学习微调,训练出图像分类模型
在自己的图像分类数据集上,使用ImageNet预训练图像分类模型初始化,改动分类层,迁移学习微调训练
#导入工具包
import time
import os
import numpy as np
from tqdm import tqdm
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
%matplotlib inline
# 忽略烦人的红色提示
import warnings
warnings.filterwarnings("ignore")
# 有 GPU 就用 GPU,没有就用 CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('device', device)
###图像预处理
from torchvision import transforms
# 训练集图像预处理:缩放裁剪、图像增强、转 Tensor、归一化
train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# (不需要图像增强)测试集图像预处理-RCTN:缩放、裁剪、转 Tensor、归一化
test_transform = transforms.Compose([transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
#载入图像分类数据集
# 数据集文件夹路径
dataset_dir = 'fruit30_split'
train_path = os.path.join(dataset_dir, 'train')
test_path = os.path.join(dataset_dir, 'val')
print('训练集路径', train_path)
print('测试集路径', test_path)
from torchvision import datasets
#datasets中的ImageFolder是直接传入数据集的路径和预处理的方式就可以构建出训练集和测试集
# 载入训练集
train_dataset = datasets.ImageFolder(train_path, train_transform)
# 载入测试集
test_dataset = datasets.ImageFolder(test_path, test_transform)
print('训练集图像数量', len(train_dataset))
#训练集图像数量 4375
print('类别个数', len(train_dataset.classes))
#类别个数 30
print('各类别名称', train_dataset.classes)
#类别名称 ['哈密瓜', '圣女果', '山竹', '杨梅', '柚子', '
#柠檬', '桂圆', '梨', '椰子', '榴莲', '火龙果', '猕猴桃', '
#石榴', '砂糖橘', '胡萝卜', '脐橙', '芒果', '苦瓜', '苹果-
#红', '苹果-青', '草莓', '荔枝', '菠萝', '葡萄-白', '葡萄-
#红', '西瓜', '西红柿', '车厘子', '香蕉', '黄瓜']
print('测试集图像数量', len(test_dataset))
print('类别个数', len(test_dataset.classes))
print('各类别名称', test_dataset.classes)
#类别和索引号一一对应
# 各类别名称
class_names = train_dataset.classes
n_class = len(class_names)
class_names
# 映射关系:类别 到 索引号
train_dataset.class_to_idx
#{'哈密瓜': 0, '圣女果': 1, '山竹': 2, '杨梅': 3, '柚子': 4, '柠檬': 5, '桂圆': 6, #'梨': 7, '椰子': 8, '榴莲': 9, '火
#龙果': 10, '猕猴桃': 11, '石榴': 12, '砂糖橘': 13, '胡萝卜
#': 14, '脐橙': 15, '芒果': 16, '苦瓜': 17, '苹果-红': 18, '苹果-青': 19, '草莓': #20, '荔枝': 21, '菠萝': 22, '葡萄-
#白': 23, '葡萄-红': 24, '西瓜': 25, '西红柿': 26, '车厘子': 27, '香蕉': 28, '黄瓜': 29}
# 映射关系:索引号 到 类别
idx_to_labels = {y:x for x,y in train_dataset.class_to_idx.items()}
idx_to_labels
# 保存为本地的 npy 文件
np.save('idx_to_labels.npy', idx_to_labels)
np.save('labels_to_idx.npy', train_dataset.class_to_idx)
#定义数据加载器DataLoader
from torch.utils.data import DataLoader
BATCH_SIZE = 32
# 训练集的数据加载器
train_loader = DataLoader(train_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=4
)
# 测试集的数据加载器
test_loader = DataLoader(test_dataset,
batch_size=BATCH_SIZE,
shuffle=False,
num_workers=4
)
#查看一个batch的图像和标注
# DataLoader 是 python生成器,每次调用返回一个 batch 的数据
images, labels = next(iter(train_loader))
images.shape
labels
#可视化一个batch的图像和标注
# 将数据集中的Tensor张量转为numpy的array数据类型
images = images.numpy()
images[5].shape
plt.hist(images[5].flatten(), bins=50)
plt.savefig('hist.pdf')
plt.show()
# batch 中经过预处理的图像
idx = 2
plt.imshow(images[idx].transpose((1,2,0))) # 转为(224, 224, 3)
plt.title('label:'+str(labels[idx].item()))
plt.savefig('label.pdf')
label = labels[idx].item()
label
pred_classname = idx_to_labels[label]
pred_classname
#火龙果
# 原始图像
idx = 2
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
plt.imshow(np.clip(images[idx].transpose((1,2,0)) * std + mean, 0, 1))
plt.title('label:'+ pred_classname)
plt.savefig('label_org.pdf')
plt.show()
#导入训练需使用的工具包
from torchvision import models
import torch.optim as optim
###迁移学习训练方法
#1. 只微调训练模型最后一层(全连接分类层)
model = models.resnet18(pretrained=True) # 载入预训练模型
# 修改全连接层,使得全连接层的输出与当前数据集类别数对应
# 新建的层默认 requires_grad=True
model.fc = nn.Linear(model.fc.in_features, n_class)
#n_class
model.fc
#只微调训练最后一层全连接层的参数,冻结其他层
optimizer=optim.Adam(model.fc.parameters())
#2. 微调训练所有层
model = models.resnet18(pretrained=True) # 载入预训练模型
model.fc = nn.Linear(model.fc.in_features, n_class)
optimizer = optim.Adam(model.parameters())
#3. 随机初始化模型全部权重,重新训练所有层
model = models.resnet18(pretrained=False) # 只载入模型结构,不载入预训练权重参数
model.fc = nn.Linear(model.fc.in_features, n_class)
optimizer = optim.Adam(model.parameters())
#训练配置
model = model.to(device)
# 交叉熵损失函数
criterion = nn.CrossEntropyLoss()
# 训练轮次 Epoch
EPOCHS = 20
#模拟一个batch的训练
# 获得一个 batch 的数据和标注
images, labels = next(iter(train_loader))
images = images.to(device)
labels = labels.to(device)
# 输入模型,执行前向预测
outputs = model(images)
# 获得当前 batch 所有图像的预测类别 logit 分数
outputs.shape
# 由 logit,计算当前 batch 中,每个样本的平均交叉熵损失函数值
loss = criterion(outputs, labels)
# 反向传播“三部曲”
optimizer.zero_grad() # 清除梯度
loss.backward() # 反向传播
optimizer.step() # 优化更新
# 获得当前 batch 所有图像的预测类别
_, preds = torch.max(outputs, 1)
preds
labels
#完整训练过程
# 遍历每个 EPOCH
for epoch in tqdm(range(EPOCHS)):
model.train()
for images, labels in train_loader: # 获得一个 batch 的数据和标注
images = images.to(device)
labels = labels.to(device)
outputs = model(images)
loss = criterion(outputs, labels) # 计算当前 batch 中,每个样本的平均交叉熵损失函数值
optimizer.zero_grad()
loss.backward()
optimizer.step()
#在测试集上初步测试
model.eval()
with torch.no_grad():
correct = 0
total = 0
for images, labels in tqdm(test_loader):
images = images.to(device)
labels = labels.to(device)
outputs = model(images)
_, preds = torch.max(outputs, 1)
total += labels.size(0)
correct += (preds == labels).sum()
print('测试集上的准确率为 {:.3f} %'.format(100 * correct / total))
#测试集上的准确率为 87.755 %
#保存模型
torch.save(model, 'checkpoints/fruit30_pytorch_20220814.pth')
4. 微调训练过程(升级版)
#导入工具包
#图像预处理
#载入图像分类数据集
#类别和索引号映射字典
#定义数据加载器DataLoader
#导入训练时所需要的工具包
from torchvision import models
import torch.optim as optim
from torch.optim import lr_scheduler #增加了学习率
#选择三种迁移学习训练方式
##### 选择一:只微调训练模型最后一层(全连接分类层)
model = models.resnet18(pretrained=True) # 载入预训练模型
# 修改全连接层,使得全连接层的输出与当前数据集类别数对应
# 新建的层默认 requires_grad=True
model.fc = nn.Linear(model.fc.in_features, n_class)
model.fc
# 只微调训练最后一层全连接层的参数,其它层冻结
optimizer = optim.Adam(model.fc.parameters())
##### 选择二:微调训练所有层
# model = models.resnet18(pretrained=True) # 载入预训练模型
# model.fc = nn.Linear(model.fc.in_features, n_class)
# optimizer = optim.Adam(model.parameters())
##### 选择三:随机初始化模型全部权重,从头训练所有层
# model = models.resnet18(pretrained=False) # 只载入模型结构,不载入预训练权重参数
# model.fc = nn.Linear(model.fc.in_features, n_class)
# optimizer = optim.Adam(model.parameters())
#训练配置
model=model.to(device)
# 交叉熵损失函数
criterion = nn.CrossEntropyLoss()
# 训练轮次 Epoch
EPOCHS = 30
# 学习率降低策略
lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
#每隔5个epoch 让学习率降低为原来的*0.5
#定义函数(在训练集上训练)
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score
#运行一个 batch 的训练,返回当前 batch 的训练日志
def train_one_batch(images, labels):
'''
运行一个 batch 的训练,返回当前 batch 的训练日志
'''
# 获得一个 batch 的数据和标注
images = images.to(device)
labels = labels.to(device)
outputs = model(images) # 输入模型,执行前向预测
loss = criterion(outputs, labels) # 计算当前 batch 中,每个样本的平均交叉熵损失函数值
# 优化更新权重
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 获取当前 batch 的标签类别和预测类别
_, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别
preds = preds.cpu().numpy()
loss = loss.detach().cpu().numpy()
outputs = outputs.detach().cpu().numpy()
labels = labels.detach().cpu().numpy()
log_train = {}
log_train['epoch'] = epoch
log_train['batch'] = batch_idx
# 计算分类评估指标
log_train['train_loss'] = loss
log_train['train_accuracy'] = accuracy_score(labels, preds)
# 可以计算训练集上的指标
# log_train['train_precision'] = precision_score(labels, preds, average='macro')
# log_train['train_recall'] = recall_score(labels, preds, average='macro')
# log_train['train_f1-score'] = f1_score(labels, preds, average='macro')
return log_train
#定义一个函数(在整个测试集上评估)
def evaluate_testset():
'''
在整个测试集上评估,返回分类评估指标日志
'''
loss_list = []
labels_list = []
preds_list = []
with torch.no_grad():
for images, labels in test_loader: # 生成一个 batch 的数据和标注
images = images.to(device)
labels = labels.to(device)
outputs = model(images) # 输入模型,执行前向预测
# 获取整个测试集的标签类别和预测类别
_, preds = torch.max(outputs, 1) # 获得当前 batch 所有图像的预测类别
preds = preds.cpu().numpy()
loss = criterion(outputs, labels) # 由 logit,计算当前 batch 中,每个样本的平均交叉熵损失函数值
loss = loss.detach().cpu().numpy()
outputs = outputs.detach().cpu().numpy()
labels = labels.detach().cpu().numpy()
loss_list.append(loss)
labels_list.extend(labels)
preds_list.extend(preds)
log_test = {}
log_test['epoch'] = epoch
# 计算分类评估指标
log_test['test_loss'] = np.mean(loss) #测试集上的loss
log_test['test_accuracy'] = accuracy_score(labels_list, preds_list)
log_test['test_precision'] = precision_score(labels_list, preds_list, average='macro')
log_test['test_recall'] = recall_score(labels_list, preds_list, average='macro')
log_test['test_f1-score'] = f1_score(labels_list, preds_list, average='macro')
return log_test
#训练开始之前,需要记录训练过程日志
epoch = 0
batch_idx = 0
best_test_accuracy = 0
# 训练日志-训练集
df_train_log = pd.DataFrame()
log_train = {}
log_train['epoch'] = 0
log_train['batch'] = 0
images, labels = next(iter(train_loader))
log_train.update(train_one_batch(images, labels))
df_train_log = df_train_log.append(log_train, ignore_index=True)
df_train_log
# epoch batch train_loss train_accuracy
0 0 0 3.6047866 0.0625
# 训练日志-测试集 整个测试集
df_test_log = pd.DataFrame()
log_test = {}
log_test['epoch'] = 0
log_test.update(evaluate_testset())
df_test_log = df_test_log.append(log_test, ignore_index=True)
df_test_log
# epoch test_loss test_accuracy test_precision test_recall test_f1-score
0 0.0 3.275232 0.037106 0.013615 0.036673 0.015244
登陆wandb
- 安装wandb: pip install wandb
- 登录 wandb:在命令行中运行wandb login
- 按提示复制粘贴API Key至命令行中
#创建wandb可视化项目
import wandb
#project是大项目 ,name是小项目 起名是用时间来取名的
wandb.init(project='fruit30', name=time.strftime('%m%d%H%M%S'))
#进行训练
for epoch in range(1, EPOCHS+1):
print(f'Epoch {epoch}/{EPOCHS}')
## 训练阶段
model.train()
for images, labels in tqdm(train_loader): # 获得一个 batch 的数据和标注
batch_idx += 1
log_train = train_one_batch(images, labels)
df_train_log = df_train_log.append(log_train, ignore_index=True)
wandb.log(log_train)
lr_scheduler.step() #学习率更新
## 测试阶段
model.eval()
log_test = evaluate_testset()
df_test_log = df_test_log.append(log_test, ignore_index=True)
wandb.log(log_test)
# 保存最新的最佳模型文件
# 在测试集上的准确率有更好的,就会更新
if log_test['test_accuracy'] > best_test_accuracy:
# 删除旧的最佳模型文件(如有)
old_best_checkpoint_path = 'checkpoints/best-{:.3f}.pth'.format(best_test_accuracy)
if os.path.exists(old_best_checkpoint_path):
os.remove(old_best_checkpoint_path)
# 保存新的最佳模型文件
new_best_checkpoint_path = 'checkpoints/best-{:.3f}.pth'.format(log_test['test_accuracy'])
torch.save(model, new_best_checkpoint_path)
print('保存新的最佳模型', 'checkpoints/best-{:.3f}.pth'.format(best_test_accuracy))
best_test_accuracy = log_test['test_accuracy']
df_train_log.to_csv('训练日志-训练集.csv', index=False)
df_test_log.to_csv('训练日志-测试集.csv', index=False)
#在测试集上评价
# 载入最佳模型作为当前模型
model = torch.load('checkpoints/best-{:.3f}.pth'.format(best_test_accuracy))
model.eval()
print(evaluate_testset()) # 在整个测试集上做评估
#{'epoch': 30, 'test_loss': 0.23493767, 'test_accuracy': 0.8794063079777366, 'test_precision': 0.8843936613935047, 'test_recall': 0.8781298367986428, 'test_f1-score': 0.8781021317064345}
5. 可视化训练日志
#导入工具包
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
#导入训练日志表格
df_train = pd.read_csv('训练日志-训练集.csv')
df_test = pd.read_csv('训练日志-测试集.csv')
df_train
df_test
#训练集损失函数
plt.figure(figsize=(16, 8))
x = df_train['batch']
y = df_train['train_loss']
plt.plot(x, y, label='训练集')
plt.tick_params(labelsize=20)
plt.xlabel('batch', fontsize=20)
plt.ylabel('loss', fontsize=20)
plt.title('训练集损失函数', fontsize=25)
plt.savefig('图表/训练集损失函数.pdf', dpi=120, bbox_inches='tight')
plt.show()
#训练集准确率
plt.figure(figsize=(16, 8))
x = df_train['batch']
y = df_train['train_accuracy']
plt.plot(x, y, label='训练集')
plt.tick_params(labelsize=20)
plt.xlabel('batch', fontsize=20)
plt.ylabel('loss', fontsize=20)
plt.title('训练集准确率', fontsize=25)
plt.savefig('图表/训练集准确率.pdf', dpi=120, bbox_inches='tight')
plt.show()
#测试集损失函数
plt.figure(figsize=(16, 8))
x = df_test['epoch']
y = df_test['test_loss']
plt.plot(x, y, label='测试集')
plt.tick_params(labelsize=20)
plt.xlabel('epoch', fontsize=20)
plt.ylabel('loss', fontsize=20)
plt.title('测试集损失函数', fontsize=25)
plt.savefig('图表/测试集损失函数.pdf', dpi=120, bbox_inches='tight')
plt.show()
#测试集评估指标
from matplotlib import colors as mcolors
import random
random.seed(124)
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k', 'tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink', 'tab:gray', 'tab:olive', 'tab:cyan', 'black', 'indianred', 'brown', 'firebrick', 'maroon', 'darkred', 'red', 'sienna', 'chocolate', 'yellow', 'olivedrab', 'yellowgreen', 'darkolivegreen', 'forestgreen', 'limegreen', 'darkgreen', 'green', 'lime', 'seagreen', 'mediumseagreen', 'darkslategray', 'darkslategrey', 'teal', 'darkcyan', 'dodgerblue', 'navy', 'darkblue', 'mediumblue', 'blue', 'slateblue', 'darkslateblue', 'mediumslateblue', 'mediumpurple', 'rebeccapurple', 'blueviolet', 'indigo', 'darkorchid', 'darkviolet', 'mediumorchid', 'purple', 'darkmagenta', 'fuchsia', 'magenta', 'orchid', 'mediumvioletred', 'deeppink', 'hotpink']
markers = [".",",","o","v","^","<",">","1","2","3","4","8","s","p","P","*","h","H","+","x","X","D","d","|","_",0,1,2,3,4,5,6,7,8,9,10,11]
linestyle = ['--', '-.', '-']
def get_line_arg():
'''
随机产生一种绘图线型
'''
line_arg = {}
line_arg['color'] = random.choice(colors)
# line_arg['marker'] = random.choice(markers)
line_arg['linestyle'] = random.choice(linestyle)
line_arg['linewidth'] = random.randint(1, 4)
# line_arg['markersize'] = random.randint(3, 5)
return line_arg
metrics = ['test_accuracy', 'test_precision', 'test_recall', 'test_f1-score']
plt.figure(figsize=(16, 8))
x = df_test['epoch']
for y in metrics:
plt.plot(x, df_test[y], label=y, **get_line_arg())
plt.tick_params(labelsize=20)
plt.ylim([0, 1])
plt.xlabel('epoch', fontsize=20)
plt.ylabel(y, fontsize=20)
plt.title('测试集分类评估指标', fontsize=25)
plt.savefig('图表/测试集分类评估指标.pdf', dpi=120, bbox_inches='tight')
plt.legend(fontsize=20)
plt.show()
- 不能把测试集图像用于训练
- 测试集图像可能存在错标和漏标,测试集上的准确率越高,模型也不一样越好
- 三种不同的迁移学习训练配置:只微调训练模型最后一层、微调训练所有层、随机初始化模型全部权重
- 更换不同的优化器和学习率使模型收敛更快
- 训练好图像分类模型之后,需要在测试集上评估混淆矩阵、ROC曲线、PR曲线、语义特征降维可视化......