数据集下载地址
AMD-Training400.zip
https://ai.baidu.com/broad/introduction
引入包
%matplotlib inline
import copy
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from PIL import Image, ImageDraw
from sklearn.model_selection import ShuffleSplit
from torch import optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader, Subset
from torchsummary import summary
from torchvision import utils
# Compute device: the GPU when CUDA is usable, otherwise the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
# Worker-process count intended for DataLoader's num_workers:
# none on Windows, four everywhere else
if os.name == 'nt':
    workers = 0
else:
    workers = 4
数据初探
查看标签数据情况
# Root directory of the dataset
data_path = './data/sod/'
labels_csv_path = os.path.join(data_path, 'Training400', 'Fovea_location.xlsx')
# Read the fovea annotations; the spreadsheet's 'ID' column becomes the index
labels_df = pd.read_excel(labels_csv_path, index_col='ID')
labels_df.head()
# Scatter the fovea coordinates, coloured by the first character of each
# image name (names starting with "A" are loaded from the AMD folder
# elsewhere in this file)
AorN=[imn[0] for imn in labels_df.imgName]
sns.scatterplot(x=labels_df['Fovea_X'], y=labels_df['Fovea_Y'],hue=AorN)
查看数据
# Preview a few images
np.random.seed(2019)
plt.rcParams['figure.figsize'] = (15, 9)
plt.subplots_adjust(wspace=0, hspace=0.3)
nrows, ncols = 2, 3
# Image file names, indexed by ID
img_name = labels_df["imgName"]
# Image IDs
ids = labels_df.index
# Randomly pick nrows*ncols IDs
# (np.random.choice samples with replacement by default)
rnd_ids = np.random.choice(ids,nrows*ncols)
print(rnd_ids)
# Recorded output: [ 73 371 160 294 217 191]
def load_img_label(labels_df, id_):
    """Open the image with the given ID and return it with its fovea centre.

    Args:
        labels_df: Label table providing the "imgName", "Fovea_X" and
            "Fovea_Y" columns, each looked up with ``id_``.
        id_: Key of the sample inside ``labels_df``.

    Returns:
        A ``(img, label)`` tuple: the opened PIL image and the ``(x, y)``
        fovea coordinates read from the table.
    """
    file_name = labels_df["imgName"][id_]
    # Files whose name starts with "A" live in the AMD folder, the rest in
    # the Non-AMD folder.
    folder = "AMD" if file_name[0] == "A" else "Non-AMD"
    img = Image.open(os.path.join(data_path, "Training400", folder, file_name))
    # Fovea centre coordinates
    label = (labels_df["Fovea_X"][id_], labels_df["Fovea_Y"][id_])
    return img, label
def show_img_label(img,label,w_h=(50,50),thickness=2):
    """Draw a green rectangle centred on ``label`` and display the image.

    The rectangle is drawn directly onto ``img``, so the caller's image
    object is modified.

    Args:
        img: PIL image to annotate and show.
        label: ``(cx, cy)`` centre of the rectangle.
        w_h: ``(width, height)`` of the rectangle.
        thickness: Outline width passed to ``ImageDraw``.
    """
    box_w, box_h = w_h
    cx, cy = label
    top_left = (cx - box_w / 2, cy - box_h / 2)
    bottom_right = (cx + box_w / 2, cy + box_h / 2)
    # Draw the bounding rectangle
    ImageDraw.Draw(img).rectangle(
        (top_left, bottom_right), outline="green", width=thickness
    )
    plt.imshow(np.asarray(img))
# Show every randomly chosen image in its own subplot, fovea marked
for i, id_ in enumerate(rnd_ids):
    img, label = load_img_label(labels_df, id_)
    print(img.size, label)
    # Subplot positions are 1-based
    plt.subplot(nrows, ncols, i + 1)
    show_img_label(img, label, w_h=(150, 150), thickness=20)
    plt.title(img_name[id_])
"""
(2124, 2056) (1037.89889229694, 1115.71768088143)
(1444, 1444) (635.148992978281, 744.648850248249)
(1444, 1444) (639.360312038611, 814.762764100936)
(2124, 2056) (1122.08407442503, 1067.58829793991)
(2124, 2056) (1092.93333646222, 1055.15333296773)
(2124, 2056) (1112.50135915347, 1070.7251775623)
"""
查看图片尺寸情况
# Collect every image's height and width to inspect the size distribution
h_list, w_list = [], []
for id_ in ids:
    prefix = "AMD" if img_name[id_][0] == "A" else "Non-AMD"
    fullPath2img = os.path.join(data_path, "Training400", prefix, img_name[id_])
    # Only the size is needed, so close the file as soon as it has been read
    with Image.open(fullPath2img) as img:
        # PIL reports size as (width, height)
        w, h = img.size
    h_list.append(h)
    w_list.append(w)
sns.distplot(a=h_list, kde=False)
# The distribution plots show that most heights and widths fall in the range of 1900 to 2100.
自定义一些数据增强函数
也可调用其他数据增强包
- Augmentor
- imgaug
- Albumentations
……
# 定义数据转换
# 数据增强 data augmentation
import torchvision.transforms.functional as TF
# Resize an image together with its label
def resize_img_label(image,label=(0.,0.), target_size=(256,256)):
    """Resize ``image`` to ``target_size`` and rescale ``label`` to match.

    Args:
        image: PIL image; its ``size`` is read as ``(width, height)``.
        label: ``(x, y)`` point in pixel coordinates of ``image``.
        target_size: ``(width, height)`` of the resized image.

    Returns:
        ``(image_new, label_new)``: the resized image and the point expressed
        in the resized image's pixel coordinates.
    """
    w_orig, h_orig = image.size
    w_target, h_target = target_size
    cx, cy = label
    # torchvision's resize takes the size as (height, width), the reverse of
    # the (width, height) order used for target_size and the label maths.
    image_new = TF.resize(image, (h_target, w_target))
    label_new = cx / w_orig * w_target, cy / h_orig * h_target
    return image_new, label_new
# Horizontal flip (the caller decides randomly whether to apply it)
def random_hflip(image,label):
    """Mirror ``image`` left-to-right and reflect the label's x coordinate.

    Args:
        image: PIL image.
        label: ``(x, y)`` point in pixel coordinates of ``image``.

    Returns:
        ``(image, label)`` after the flip; y is unchanged.
    """
    width, _height = image.size
    x, y = label
    flipped = TF.hflip(image)
    return flipped, (width - x, y)
# Vertical flip (the caller decides randomly whether to apply it)
def random_vflip(image,label):
    """Mirror ``image`` top-to-bottom and reflect the label's y coordinate.

    Args:
        image: PIL image.
        label: ``(x, y)`` point in pixel coordinates of ``image``.

    Returns:
        ``(image, label)`` after the flip; x is unchanged.
    """
    _width, height = image.size
    x, y = label
    flipped = TF.vflip(image)
    return flipped, (x, height - y)
np.random.seed(1)


# Random translation
def random_shift(image, label, max_translate=(0.2,0.2)):
    """Translate ``image`` by a random offset and move ``label`` with it.

    Args:
        image: PIL image.
        label: ``(x, y)`` point in pixel coordinates of ``image``.
        max_translate: Largest shift along x and y, as fractions of the
            image width and height.

    Returns:
        ``(image, label)`` after the translation.
    """
    width, height = image.size
    max_frac_w, max_frac_h = max_translate
    cx, cy = label
    # A single coefficient in [-1, 1) scales both axes, so the horizontal and
    # vertical offsets always share the same sign and relative magnitude.
    coef = np.random.rand() * 2 - 1
    dx = int(coef * max_frac_w * width)
    dy = int(coef * max_frac_h * height)
    shifted = TF.affine(image, angle=0, translate=(dx, dy), scale=1, shear=0)
    return shifted, (cx + dx, cy + dy)
# Normalise a label by the image size
def scale_label(a,b):
    """Divide ``a`` by ``b`` element-wise.

    Args:
        a: Sequence of numerators (e.g. pixel coordinates).
        b: Sequence of denominators (e.g. image extents).

    Returns:
        List of quotients, as long as the shorter of the two inputs.
    """
    return [coord / extent for coord, extent in zip(a, b)]
# Map a normalised label back to pixel coordinates
def rescale_label(a,b):
    """Multiply ``a`` by ``b`` element-wise.

    Args:
        a: Sequence of factors (e.g. normalised coordinates).
        b: Sequence of multipliers (e.g. image extents).

    Returns:
        List of products, as long as the shorter of the two inputs.
    """
    return [fraction * extent for fraction, extent in zip(a, b)]
# Load and resize a sample image first so the examples below have an input
# (img_r is otherwise not defined at this point of the file)
img, label = load_img_label(labels_df, 1)
img_r, label_r = resize_img_label(img, label)
# Contrast adjustment: called directly, leaves image size and label unchanged
img_t = TF.adjust_contrast(img_r, contrast_factor=0.4)
# Gamma adjustment: likewise leaves image size and label unchanged
img_t = TF.adjust_gamma(img_r, gamma=1.4)
数据增强几个样例(有的增强,label需要相关操作)
- 大小
# Load sample 1 and compare it with its resized version
img, label=load_img_label(labels_df,1)
print(img.size,label)
img_r,label_r=resize_img_label(img,label)
print(img_r.size,label_r)
# Left: original image with a 150x150 box and a thick outline
plt.subplot(1,2,1)
show_img_label(img,label,w_h=(150,150),thickness=20)
# Right: resized image with the default box
plt.subplot(1,2,2)
show_img_label(img_r,label_r)
- 随机移动 random_shift
img, label=load_img_label(labels_df,1)
# Resize
img_r,label_r=resize_img_label(img,label)
# Random shift of up to half the image size along each axis
img_t,label_t=random_shift(img_r,label_r,max_translate=(.5,.5))
# Left: resized image; right: shifted image
plt.subplot(1,2,1)
show_img_label(img_r,label_r)
plt.subplot(1,2,2)
show_img_label(img_t,label_t)
- 垂直翻转 random_vflip
img, label=load_img_label(labels_df,7)
# Resize
img_r,label_r=resize_img_label(img,label)
# Vertical flip
img_fv,label_fv=random_vflip(img_r,label_r)
# Left: resized image; right: vertically flipped image
plt.subplot(1,2,1)
show_img_label(img_r,label_r)
plt.subplot(1,2,2)
show_img_label(img_fv,label_fv)
创建Dataset,DataLoader
def _random_factor(max_delta):
    """Return a random factor drawn from [1 - max_delta, 1 + max_delta)."""
    return 1 + (np.random.rand() * 2 - 1) * max_delta


# Transform applied to every sample
def normal_transformer(image, label, params):
    """Resize a sample, apply the enabled random augmentations, make a tensor.

    Each augmentation runs when ``random.random()`` falls below its
    probability, so a probability of 0 disables it and the matching strength
    key is then never read.

    Args:
        image: PIL image.
        label: ``(x, y)`` point in pixel coordinates of ``image``.
        params: Dict with "target_size", "p_hflip", "p_vflip", "p_shift",
            "p_brightness", "p_contrast", "p_gamma" and "scale_label";
            "max_translate", "brightness_factor", "contrast_factor" and
            "gamma" are read only when the matching augmentation fires.

    Returns:
        ``(image, label)``: the image converted with ``TF.to_tensor`` and the
        transformed label, divided by "target_size" when "scale_label" is
        truthy.
    """
    image, label = resize_img_label(image, label, params["target_size"])
    # Geometric augmentations: these move the label as well
    if random.random() < params["p_hflip"]:
        image, label = random_hflip(image, label)
    if random.random() < params["p_vflip"]:
        image, label = random_vflip(image, label)
    if random.random() < params["p_shift"]:
        image, label = random_shift(image, label, params["max_translate"])
    # Photometric augmentations: the label is unaffected
    if random.random() < params["p_brightness"]:
        image = TF.adjust_brightness(
            image, _random_factor(params["brightness_factor"])
        )
    if random.random() < params["p_contrast"]:
        image = TF.adjust_contrast(
            image, _random_factor(params["contrast_factor"])
        )
    if random.random() < params["p_gamma"]:
        image = TF.adjust_gamma(image, _random_factor(params["gamma"]))
    if params["scale_label"]:
        label = scale_label(label, params["target_size"])
    image = TF.to_tensor(image)
    return image, label
# Dataset definition
class AMDDataset(Dataset):
    """Dataset of fundus images paired with their fovea centre coordinates.

    Args:
        data_path: Directory containing the "Training400" folder.
        transform: Callable ``transform(image, label, trans_params)`` that
            returns the ``(image, label)`` pair for a sample.
        trans_params: Parameters forwarded to ``transform`` for every sample.
    """

    def __init__(self, data_path, transform, trans_params):
        # Location of the label spreadsheet
        labels_csv_path = os.path.join(data_path, "Training400", "Fovea_location.xlsx")
        # Read and parse the labels
        labels_df = pd.read_excel(labels_csv_path, index_col="ID")
        self.labels = labels_df[["Fovea_X", "Fovea_Y"]].values
        # Image names and IDs
        self.img_name = labels_df["imgName"]
        self.ids = labels_df.index
        # Build the paths in row order so full_img_path[i] always belongs to
        # labels[i], whatever the ID values are. Names starting with "A" are
        # stored in the AMD folder, all others in Non-AMD.
        self.full_img_path = [
            os.path.join(
                data_path,
                "Training400",
                "AMD" if name[0] == "A" else "Non-AMD",
                name,
            )
            for name in self.img_name
        ]
        self.transform = transform
        self.trans_params = trans_params

    def __len__(self):
        """Return the number of labelled samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return the transformed ``(image, label)``."""
        label = self.labels[idx]
        # Transform while the file is open, then close it so handles do not
        # pile up over an epoch.
        with Image.open(self.full_img_path[idx]) as image:
            image, label = self.transform(image, label, self.trans_params)
        return image, label
# Transform parameters for training: every augmentation fires with
# probability 0.5
trans_params_train = dict(
    target_size=(256, 256),
    p_hflip=0.5,
    p_vflip=0.5,
    p_shift=0.5,
    max_translate=(0.2, 0.2),
    p_brightness=0.5,
    brightness_factor=0.2,
    p_contrast=0.5,
    contrast_factor=0.2,
    p_gamma=0.5,
    gamma=0.2,
    scale_label=True,
)
# Transform parameters for validation: all augmentation probabilities are
# zero, so only resizing and label scaling are applied
trans_params_val = dict(
    target_size=(256, 256),
    p_hflip=0.0,
    p_vflip=0.0,
    p_shift=0.0,
    p_brightness=0.0,
    p_contrast=0.0,
    p_gamma=0.0,
    gamma=0.0,
    scale_label=True,
)
# The transform function defined in this file is normal_transformer
train_ds = AMDDataset(data_path, normal_transformer, trans_params_train)
val_ds = AMDDataset(data_path, normal_transformer, trans_params_val)
# Split the samples into training and validation subsets
sss = ShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
indices = range(len(train_ds))
# n_splits=1 yields exactly one split, so take it directly instead of
# re-wrapping train_ds inside a loop
train_index, val_index = next(sss.split(indices))
train_ds = Subset(train_ds, train_index)
print(len(train_ds))
val_ds = Subset(val_ds, val_index)
print(len(val_ds))
查看下经过转换后的图片
# Display a transformed sample
def show(img,label=None):
    """Plot an image tensor and, optionally, mark its label with a blue cross.

    Args:
        img: Image tensor in channel-first layout; it is transposed to
            channel-last for ``plt.imshow``.
        label: Optional ``(x, y)`` point given as fractions of the image
            width and height.
    """
    npimg = img.numpy().transpose((1,2,0))
    plt.imshow(npimg)
    if label is not None:
        # The tensor is (channels, height, width) but the label is (x, y),
        # so x must be scaled by the width and y by the height.
        _, height, width = img.shape
        x, y = rescale_label(label, (width, height))
        plt.plot(x, y, 'b+', markersize=20)


plt.figure(figsize=(5,5))
# Only the first training sample is shown
for img, label in train_ds:
    show(img, label)
    break
定义dataloader
因为标签值返回的是list结构,所以在后续我们需要将其转为tensor方式
# Data loaders: shuffled batches of 8 for training, ordered batches of 16
# for validation
train_dl = DataLoader(train_ds, batch_size=8, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=16, shuffle=False)
"""
for img_b, label_b in train_dl:
print(img_b.shape,img_b.dtype)
print(label_b)
break
###################
torch.Size([8, 3, 256, 256]) torch.float32
[tensor([0.4825, 0.4530, 0.6596, 0.5515, 0.5801, 0.5192, 0.4439, 0.5710],
dtype=torch.float64), tensor([0.5454, 0.4841, 0.6527, 0.5510, 0.5205, 0.5636, 0.4656, 0.7672],
dtype=torch.float64)]
##################
"""
创建模型
# Model definition
class Net(nn.Module):
    """CNN regressor whose conv stages also receive a pooled copy of their input.

    Before each of the first four convolutions the current features are
    average-pooled to the size the stage will output, then concatenated to
    the stage's result along the channel axis.

    Args:
        params: Dict with "input_shape" (three values, of which only the
            channel count is used), "initial_filters" and "num_outputs".
    """

    def __init__(self, params):
        super().__init__()
        in_channels, _, _ = params["input_shape"]
        base = params["initial_filters"]
        n_out = params["num_outputs"]
        # Each stage's input channels = previous conv output + skip channels
        self.conv1 = nn.Conv2d(in_channels, base, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(base + in_channels, 2 * base, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(3 * base + in_channels, 4 * base, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(7 * base + in_channels, 8 * base, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(15 * base + in_channels, 16 * base, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(16 * base, n_out)

    def forward(self, x):
        """Return the ``num_outputs`` predictions for a batch of images."""
        # Stage 1: the stride-2 conv and the max-pool each halve the
        # resolution, so the skip copy is average-pooled by 4 to match.
        skip = F.avg_pool2d(x, 4, 4)
        x = F.max_pool2d(F.relu(self.conv1(x)), 2, 2)
        # Concatenated tensors must agree in every dimension but the
        # channel one.
        x = torch.cat((x, skip), dim=1)
        # Stages 2-4: only the max-pool halves the resolution.
        for conv in (self.conv2, self.conv3, self.conv4):
            skip = F.avg_pool2d(x, 2, 2)
            x = F.max_pool2d(F.relu(conv(x)), 2, 2)
            x = torch.cat((x, skip), dim=1)
        x = F.relu(self.conv5(x))
        # Global average pooling, flatten, then the linear head
        x = F.adaptive_avg_pool2d(x, 1)
        return self.fc1(x.reshape(x.size(0), -1))
# Model hyper-parameters
params_model = dict(
    input_shape=(3, 256, 256),
    initial_filters=16,
    num_outputs=2,
)
# Build the network on the selected device and print its layers
model = Net(params_model).to(device)
print(model)
"""
Net(
(conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
(conv2): Conv2d(19, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(conv3): Conv2d(51, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(conv4): Conv2d(115, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(conv5): Conv2d(243, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(fc1): Linear(in_features=256, out_features=2, bias=True)
)
"""
一些中间函数
# Read the current learning rate
def get_lr(opt):
    """Return the learning rate of the optimizer's first parameter group.

    Args:
        opt: Object exposing ``param_groups``, an iterable of dicts that
            hold an 'lr' entry.

    Returns:
        The first group's 'lr', or ``None`` when there are no groups.
    """
    rates = (group['lr'] for group in opt.param_groups)
    return next(rates, None)
def cxcy_to_bbox(cxcy, w=50./256, h=50./256):
    """Turn centre points into fixed-size corner-format boxes.

    Args:
        cxcy: 2-D tensor whose first two columns are the centre x and y.
        w: Box width, in the same units as the centres.
        h: Box height, in the same units as the centres.

    Returns:
        Tensor with one ``(xmin, ymin, xmax, ymax)`` row per input row.
    """
    n_boxes = cxcy.shape[0]
    # Constant width/height columns on the same device as the centres
    widths = torch.ones(n_boxes, 1, device=cxcy.device) * w
    heights = torch.ones(n_boxes, 1, device=cxcy.device) * h
    half_extent = torch.cat((widths, heights), dim=1) / 2
    centres = torch.stack((cxcy[:, 0], cxcy[:, 1]), dim=1)
    # Corners are the centre minus/plus half the extent
    return torch.cat((centres - half_extent, centres + half_extent), dim=1)
def metrics_batch(output, target):
    """Sum the IoU between each predicted box and its same-row target box.

    Both inputs are centre points that ``cxcy_to_bbox`` expands into boxes
    of its default size.

    Args:
        output: Predicted centres, one row per sample.
        target: Ground-truth centres, one row per sample.

    Returns:
        The IoU summed over the batch, as a Python float.
    """
    pred_boxes = cxcy_to_bbox(output)
    true_boxes = cxcy_to_bbox(target)
    pairwise_iou = torchvision.ops.box_iou(pred_boxes, true_boxes)
    # The diagonal holds the IoU of prediction i with target i
    return pairwise_iou.diagonal().sum().item()
def loss_batch(loss_func, output, target, opt=None):
    """Compute loss and metric for one batch, optionally taking a training step.

    Args:
        loss_func: Callable returning the loss tensor for (output, target).
        output: Model predictions for the batch.
        target: Ground truth for the batch.
        opt: Optimizer; when given, gradients are reset, the loss is
            back-propagated and one optimisation step is taken.

    Returns:
        ``(loss_value, metric_b)``: the loss as a Python number and the
        batch metric from ``metrics_batch``.
    """
    loss = loss_func(output, target)
    # Performance metric for the batch
    metric_b = metrics_batch(output, target)
    if opt is None:
        # Evaluation only: no parameter update
        return loss.item(), metric_b
    opt.zero_grad()
    loss.backward()
    opt.step()
    return loss.item(), metric_b
def loss_epoch(model,loss_func,dataset_dl,sanity_check=False,opt=None):
    """Run one pass over ``dataset_dl`` and return the mean loss and metric.

    Args:
        model: Network called on every batch.
        loss_func: Loss function forwarded to ``loss_batch``.
        dataset_dl: Data loader yielding ``(images, labels)`` batches; its
            ``dataset`` length is the divisor of both averages.
        sanity_check: When true, stop after the first batch so the pipeline
            can be exercised quickly.
        opt: Optimizer forwarded to ``loss_batch``; ``None`` means the
            parameters are not updated.

    Returns:
        ``(loss, metric)``: accumulated loss and metric divided by the number
        of samples in the dataset. In sanity-check mode the divisor is still
        the full dataset size.
    """
    running_loss = 0.0
    running_metric = 0.0
    len_data = len(dataset_dl.dataset)
    for xb, yb in dataset_dl:
        # The labels arrive as a list of per-coordinate tensors; stack them
        # into one float32 tensor on the compute device.
        yb = torch.stack(yb, 1)
        yb = yb.type(torch.float32).to(device)
        # Model predictions
        output = model(xb.to(device))
        # Loss and metric for this batch
        loss_b, metric_b = loss_batch(loss_func, output, yb, opt)
        running_loss += loss_b
        if metric_b is not None:
            running_metric += metric_b
        # Previously accepted but ignored: one batch is enough for a
        # sanity check.
        if sanity_check:
            break
    # Average over the dataset
    loss = running_loss / float(len_data)
    metric = running_metric / float(len_data)
    return loss, metric
训练验证模型主函数
# Main training / validation routine
def train_val(model, params):
    """Train ``model`` for several epochs, validating after each one.

    Whenever the validation loss improves, the weights are kept in memory and
    saved to "path2weights". When the scheduler changes the learning rate,
    the best weights so far are reloaded before training continues.

    Args:
        model: Network to optimise; it is updated in place.
        params: Dict with the keys "num_epochs", "loss_func", "optimizer",
            "train_dl", "val_dl", "sanity_check", "lr_scheduler" and
            "path2weights".

    Returns:
        ``(model, loss_history, metric_history)``: the model loaded with the
        best weights, and two dicts with per-epoch "train" and "val" lists.
    """
    # Unpack the settings
    num_epochs = params["num_epochs"]
    loss_func = params["loss_func"]
    opt = params["optimizer"]
    train_dl = params["train_dl"]
    val_dl = params["val_dl"]
    sanity_check = params["sanity_check"]
    lr_scheduler = params["lr_scheduler"]
    path2weights = params["path2weights"]

    # Per-epoch loss and metric values
    loss_history = {"train": [], "val": []}
    metric_history = {"train": [], "val": []}

    def record(split, loss, metric):
        loss_history[split].append(loss)
        metric_history[split].append(metric)

    # Best weights seen so far and the validation loss they achieved
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')

    for epoch in range(num_epochs):
        current_lr = get_lr(opt)
        print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs - 1, current_lr))

        # Training pass
        model.train()
        train_loss, train_metric = loss_epoch(
            model, loss_func, train_dl, sanity_check, opt
        )
        record("train", train_loss, train_metric)

        # Validation pass, without gradient tracking
        model.eval()
        with torch.no_grad():
            val_loss, val_metric = loss_epoch(
                model, loss_func, val_dl, sanity_check
            )
        record("val", val_loss, val_metric)

        # Keep the weights whenever the validation loss improves
        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            # Also store them in a local file
            torch.save(model.state_dict(), path2weights)
            print("Copied best model weights!")

        # Let the scheduler react to the validation loss; if it changed the
        # learning rate, resume from the best weights
        lr_scheduler.step(val_loss)
        if current_lr != get_lr(opt):
            print("Loading best model weights!")
            model.load_state_dict(best_model_wts)

        print("train loss: %.6f, accuracy: %.2f" %(train_loss,100*train_metric))
        print("val loss: %.6f, accuracy: %.2f" %(val_loss,100*val_metric))
        print("-"*10)

    # Finish with the best weights loaded
    model.load_state_dict(best_model_wts)
    return model, loss_history, metric_history
模型训练
# Loss, optimizer and learning-rate schedule
loss_func = nn.SmoothL1Loss(reduction="sum")
opt = optim.Adam(model.parameters(), lr=1e-4)
lr_scheduler = ReduceLROnPlateau(opt, mode='min',factor=0.5, patience=20,verbose=1)
path2models = "./models/sod/"
# makedirs also creates the missing parent "./models" (os.mkdir cannot) and
# exist_ok makes re-running this cell safe
os.makedirs(path2models, exist_ok=True)
params_train = {
    "num_epochs": 10,
    "optimizer": opt,
    "loss_func": loss_func,
    "train_dl": train_dl,
    "val_dl": val_dl,
    "sanity_check": False,
    "lr_scheduler": lr_scheduler,
    "path2weights": os.path.join(path2models, "weights_smoothl1.pt"),
}
# Train and validate the model
model, loss_hist, metric_hist = train_val(model,params_train)
"""
Epoch 0/9, current lr=0.0001
Copied best model weights!
train loss: 0.014286, accuracy: 27.88
val loss: 0.011519, accuracy: 50.27
----------
Epoch 1/9, current lr=0.0001
Copied best model weights!
train loss: 0.010053, accuracy: 36.32
val loss: 0.009709, accuracy: 54.58
----------
Epoch 2/9, current lr=0.0001
Copied best model weights!
train loss: 0.008984, accuracy: 37.51
val loss: 0.009206, accuracy: 59.38
----------
Epoch 3/9, current lr=0.0001
train loss: 0.009700, accuracy: 36.43
val loss: 0.009328, accuracy: 59.98
----------
Epoch 4/9, current lr=0.0001
train loss: 0.008283, accuracy: 37.85
val loss: 0.010192, accuracy: 50.60
----------
Epoch 5/9, current lr=0.0001
train loss: 0.007235, accuracy: 42.44
val loss: 0.009638, accuracy: 48.86
----------
Epoch 6/9, current lr=0.0001
train loss: 0.005808, accuracy: 44.42
val loss: 0.010148, accuracy: 57.19
----------
Epoch 7/9, current lr=0.0001
train loss: 0.006283, accuracy: 44.23
val loss: 0.010079, accuracy: 40.51
----------
Epoch 8/9, current lr=0.0001
train loss: 0.005408, accuracy: 47.86
val loss: 0.011009, accuracy: 34.36
----------
Epoch 9/9, current lr=0.0001
train loss: 0.005920, accuracy: 43.82
val loss: 0.011682, accuracy: 30.28
----------
"""
结果数据可视化
# Plot the loss and metric curves
def show_loss_acc(num_epochs, loss_hist, metric_hist):
    """Plot train/validation loss and metric against the epoch number.

    Two figures are shown one after the other: first the loss, then the
    metric (labelled "Accuracy").

    Args:
        num_epochs: Number of epochs; the x axis runs from 1 to this value.
        loss_hist: Dict with "train" and "val" lists of loss values.
        metric_hist: Dict with "train" and "val" lists of metric values.
    """
    epochs = range(1, num_epochs + 1)
    panels = (
        ("Train-Val Loss", loss_hist, "Loss"),
        ("Train-Val Accuracy", metric_hist, "Accuracy"),
    )
    for title, history, y_label in panels:
        plt.title(title)
        for split in ("train", "val"):
            plt.plot(epochs, history[split], label=split)
        plt.ylabel(y_label)
        plt.xlabel("Training Epochs")
        plt.legend()
        plt.show()


show_loss_acc(params_train['num_epochs'], loss_hist, metric_hist)
释放资源
# Print CUDA memory statistics; skipped on CPU-only machines, where there is
# no CUDA device to query and the call would raise
if torch.cuda.is_available():
    print(torch.cuda.memory_summary())
# Drop the model reference, then release cached GPU memory
if model is not None:
    del model
torch.cuda.empty_cache()