keywords: One-shot; Distance Metric Learning; 图像验证; ICML2015
最近在 kaggle 上参加了一个鲸鱼识别的比赛,通过鲸鱼尾巴辨别鲸鱼ID。共有5005个类别,大多数类仅有一张图片,于是找到了这篇 One shot Image Recognition。孪生神经网络是在人脸识别和图片验证领域常用的度量学习方法,通过卷积神经网络学习图像的特征向量表示,之后通过衡量两个输入之间的特征向量相似度,进行判定。
度量学习
- 学习一个低维嵌入空间,使得同类的物体距离更近,不同类的物体距离更远。
- 应用场景:具有大量类别的分类任务,或者每一类训练样本较少的情况。
对于人脸识别等特殊的图像识别任务,由于目标种类多且不固定,训练数据少,使得传统的图像分类网络难以胜任,例如一个公司的人脸识别系统,在旧员工离职或者新员工入职时,传统网络必须重新训练网络,而基于度量学习方法的孪生神经网络仅需要改变公司的人脸图像数据库。
孪生神经网络
孪生神经网络有两个重要组成部分,特征提取子网和距离计算子网,对应于上图的 Hidden layer 和 Distance layer,其中, Hidden layer 提取输入图像的特征表示, Distance layer 产生两个输入的特征表示的距离尺度,最后将距离作为特征,输出两张图像的匹配度。
特征提取子网
对于图像类的任务,提取子网一般可以使用 ResNet50、VGG19 等网络实现,通常载入其预训练权重可以给 Siamese Net 带来更好的泛化性能。
from keras.applications.resnet50 import ResNet50
def res50_sub(in_shape, model_path=None):
    """Build the feature-extraction sub-network: a ResNet50 backbone followed
    by a 256-d sigmoid embedding layer.

    @param in_shape   shape of one input image, e.g. (H, W, 3)
    @param model_path optional path to previously saved weights for this
                      sub-network; loaded when given
    @return a Model mapping an image tensor to a 256-d embedding vector
    """
    in_x = Input(shape=in_shape)
    # include_top=False drops ResNet50's ImageNet classifier head;
    # pooling='max' global-pools the feature map into a flat vector.
    res_out = ResNet50(include_top=False, pooling='max')(in_x)
    embed = Dense(256, activation='sigmoid')(res_out)
    res_vector = Model(inputs=in_x, outputs=embed)
    # Bug fix: model_path was accepted but never used.
    if model_path:
        res_vector.load_weights(model_path)
    return res_vector
距离计算子网
距离计算子网包含两个输入,计算由两个特征表示之间的距离尺度并得到相似度。
def header_model(input_shape):
    """Distance head: turn two feature vectors into a match probability.

    Takes a pair of embeddings, computes their element-wise squared
    difference, and maps it through a single sigmoid unit to produce the
    similarity of the two inputs.
    """
    feat_a = Input(shape=input_shape)
    feat_b = Input(shape=input_shape)
    # Element-wise squared difference acts as the distance feature.
    dist = Lambda(lambda pair: K.square(pair[0] - pair[1]))([feat_a, feat_b])
    # One sigmoid unit converts the distance features into a similarity score.
    similarity = Dense(1, use_bias=True, activation='sigmoid')(dist)
    return Model([feat_a, feat_b], similarity)
Siamese Net
结合特征计算子网与距离计算子网,得到两张输入图像的相似度。
def siamese_net(sub_net, header, input_shape, lr):
    """Assemble and compile the full Siamese network.

    Both inputs are embedded by the SAME sub-network instance (shared
    weights); the header then scores the pair.

    @param sub_net     feature-extraction sub-network (shared between inputs)
    @param header      distance/similarity head taking two embeddings
    @param input_shape shape of one input image
    @param lr          Adam learning rate
    @return a compiled Model: [image_a, image_b] -> similarity
    """
    img_left = Input(shape=input_shape)
    img_right = Input(shape=input_shape)
    similarity = header([sub_net(img_left), sub_net(img_right)])
    model = Model([img_left, img_right], similarity)
    model.compile(
        optimizer=optimizers.Adam(lr=lr),
        loss='binary_crossentropy',
        metrics=['binary_crossentropy', 'acc'],
    )
    return model
数据生成器
孪生神经网络将图像识别问题转化为二分类的图像匹配问题。对于包含 N 张图像的数据集来说,共有 N*(N-1) 个有序图像对,遍历输入所有图像对会浪费计算资源,而且反复训练易于分类的图像对不利于模型的收敛。
因此采用特殊的训练策略,使用lapjv包根据图像之间的相似度矩阵score,筛选难分类的图像对作为模型输入。
import datetime
import os
import random
import time

import numpy as np
from tqdm import tqdm
from lapjv import lapjv

import keras
from keras import backend as K
from keras.preprocessing import image
from keras.utils import np_utils, Sequence
class Train_generator(Sequence):
    """Keras Sequence yielding interleaved matched / unmatched image pairs.

    Matched pairs (label 1) come from a random derangement inside each class;
    unmatched pairs (label 0) come from solving a linear assignment problem
    (lapjv) over the negated similarity matrix, which selects the hardest
    (most similar) cross-class pairs for the current model.
    """
    def __init__(self, score, id_group, image_list, img2ind, image_dim, data_folder, image_loader,
                 steps=1000, batch_size=32):
        """
        @param score        similarity matrix between all training images
                            (higher = more similar); used as the LAP cost
        @param id_group     mapping class id -> images belonging to that class
        @param image_list   image names indexed consistently with `score`
        @param img2ind      mapping image name -> row/column index in `score`
        @param image_dim    shape of one image, e.g. (H, W, C)
        @param data_folder  directory containing the image files
        @param image_loader callable (path, image_dim) -> image array
        @param steps        the number of epochs we are planning with this score matrix
        @param batch_size   samples per batch (pairs are emitted two at a time,
                            so an even batch size is expected)
        """
        super(Train_generator, self).__init__()
        # The LAP solver minimizes cost; maximizing the score is the same as
        # minimizing -score.
        self.score = -score
        self.steps = steps
        self.batch_size = batch_size
        self.image_dim = image_dim
        self.id_group = id_group  # group up the images by class id
        self.base_path = data_folder
        self.image_loader = image_loader
        self.image_list = image_list
        self.img2ind = img2ind  # map the image name to score index
        # Make every same-class pairing prohibitively expensive so the LAP
        # solution never proposes it as an "unmatched" pair.
        for ts in self.id_group.values():
            idxs = [self.img2ind[t] for t in ts]
            for i in idxs:
                for j in idxs:
                    self.score[i, j] = 10000.0  # eliminates this potential pairing
        self.on_epoch_end()

    def read_image(self, image_name):
        """Load one image from the data folder via the injected loader."""
        path = os.path.join(self.base_path, image_name)
        return self.image_loader(path, self.image_dim)

    def __getitem__(self, index):
        """Return batch `index` as ([images_a, images_b], labels).

        Rows alternate: even rows are matched pairs (label 1), odd rows are
        unmatched pairs (label 0).
        """
        start = self.batch_size * index
        end = min(start + self.batch_size, len(self.match) + len(self.unmatch))
        size = end - start
        assert size > 0
        a = np.zeros((size,) + self.image_dim, dtype=K.floatx())
        b = np.zeros((size,) + self.image_dim, dtype=K.floatx())
        c = np.zeros((size, 1), dtype=K.floatx())
        # Each output row pair consumes one entry from match and one from
        # unmatch, hence pair index = start // 2.
        j = start // 2
        for i in range(0, size, 2):
            a[i, :, :, :] = self.read_image(self.match[j][0])
            b[i, :, :, :] = self.read_image(self.match[j][1])
            c[i, 0] = 1  # This is a match
            a[i + 1, :, :, :] = self.read_image(self.unmatch[j][0])
            b[i + 1, :, :, :] = self.read_image(self.unmatch[j][1])
            c[i + 1, 0] = 0  # Different whales
            j += 1
        return [a, b], c

    def on_epoch_end(self):
        """Recompute the match / unmatch pair lists for the next epoch."""
        if self.steps <= 0: return  # Skip this on the last epoch.
        self.steps -= 1
        self.match = []
        self.unmatch = []
        # NOTE(review): `time` (and the reporting below) requires
        # `import time` at module level — confirm it is imported.
        st_time = time.time()
        x, _, _ = lapjv(self.score)  # Solve the linear assignment problem
        seconds = time.time() - st_time
        minute = (seconds / 60) % 60
        hour = seconds / 3600
        second = seconds % 60
        print('lapjv duration: %2d:%2d:%2d' % (hour, minute, second))
        y = np.arange(len(x), dtype=np.int32)
        # Matched pairs: a random derangement inside each class so that no
        # image is paired with itself.
        # NOTE(review): `np.any(ts == d)` is element-wise only when `ts` is a
        # numpy array; with plain Python lists it compares the whole lists.
        # Confirm id_group values are arrays.
        for ts in self.id_group.values():
            d = ts.copy()
            while True:
                random.shuffle(d)
                if not np.any(ts == d): break
            for a, b in zip(ts, d): self.match.append((a, b))
        # Unmatched pairs: read off the LAP solution.
        for i, j in zip(x, y):
            if i == j:
                # Debug dump: the LAP solution paired an image with itself,
                # which the same-class penalties should have prevented.
                print(self.score)
                print(x)
                print(y)
                print(i, j)
            assert i != j
            self.unmatch.append((self.image_list[i], self.image_list[j]))
        # Penalize the chosen pairs so the next epoch is forced to pick
        # different ones.
        self.score[x, y] = 10000.0
        self.score[y, x] = 10000.0
        random.shuffle(self.match)
        random.shuffle(self.unmatch)

    def __len__(self):
        """Number of batches per epoch (ceiling division)."""
        return (len(self.match) + len(self.unmatch) + self.batch_size - 1) // self.batch_size
模型训练
定义 train step, 执行一次训练步骤。
def train_step(model, scores, ampl, step, train_df, image_dim, lr):
    """Run one training stage of `step` epochs using an LAP-paired generator.

    @param model     compiled Siamese model
    @param scores    similarity matrix over the training images
    @param ampl      amplitude of uniform noise added to `scores`, which
                     randomizes the pairing between stages
    @param step      number of epochs to train in this stage
    @param train_df  dataframe listing the training images and labels
    @param image_dim shape of one input image, e.g. (H, W, C)
    @param lr        learning rate applied for this stage
    """
    global steps  # running epoch counter shared across stages
    train_folder = 'input/crop_train'
    id_group, image_list, img2ind = get_whale_dict(train_df)
    print('get train generator')
    # Bug fix: print('%s', now) passed '%s' as a separate argument and
    # printed the literal "%s"; format the timestamp properly.
    print('%s' % datetime.datetime.now())
    # Adding noise keeps the pairings from repeating identically.
    train_gene = Train_generator(scores + ampl * np.random.random_sample(size=scores.shape),
                                 id_group, image_list, img2ind, image_dim, train_folder,
                                 image_loader, steps=step, batch_size=16)
    print('start training')
    print('%s' % datetime.datetime.now())
    set_learningrate(model, lr)
    model.fit_generator(train_gene, initial_epoch=steps, epochs=steps + step,
                        max_queue_size=12, workers=6, verbose=1)
    steps += step
随机产生相似度矩阵Score,开始第一次训练,在模型具有一定识别能力后,基于模型预测结果产生相似度矩阵 Score,并降低 learning rate,进一步提高模型性能。代码已经占用了太多篇幅,计算相似度使用的两个输入生成器就不提供了……
def train_siamese(res_sub_net, header, image_dim, batch_size, lr=64e-5, epochs=16, model_path=None, GPU_num=1):
    """Three-stage Siamese training driver.

    Stage 1 trains on a random similarity matrix; stages 2 and 3 recompute
    the matrix from model predictions and train with decreasing learning
    rates. When `model_path` is given, saved weights are loaded and training
    is skipped.

    @param res_sub_net feature-extraction sub-network
    @param header      distance/similarity head
    @param image_dim   shape of one input image
    @param batch_size  NOTE(review): currently unused — Train_generator inside
                       train_step hard-codes batch_size=16
    @param lr          learning rate for stage 1 (later stages step it down)
    @param epochs      epochs per stage
    @param model_path  optional directory with saved sub-net / header weights
    @param GPU_num     number of GPUs; >1 wraps the model with multi_gpu_model
    @return the (trained or weight-loaded) Siamese model
    """
    model = siamese_net(res_sub_net, header, image_dim, lr)
    if model_path:
        # Resume from saved weights and skip training entirely.
        print('load subnet and header from %s' % model_path)
        res_sub_net.load_weights('%s/siamese_res50.h5' % model_path)
        header.load_weights('%s/siamese_header.h5' % model_path)
        return model
    if GPU_num > 1:
        model = multi_gpu_model(model, gpus=GPU_num)
        model.compile(optimizer=optimizers.Adam(lr=lr), loss='binary_crossentropy',
                      metrics=['binary_crossentropy', 'acc'])
    train = pd.read_csv('input/aug_train.csv')
    train_num = train.values.shape[0]
    # Stage 1: random scores — the untrained model cannot rank pairs yet.
    # Bug fix: the `epochs` and `lr` parameters were ignored in favor of
    # hard-coded 16 / 64e-5 (their default values); use the parameters.
    ampl = 100.0
    scores = np.random.random_sample(size=(train_num, train_num))
    train_step(model, scores, ampl, epochs, train, image_dim, lr)
    res_sub_net.save('model/siamese/stage1/siamese_res50-1.h5')
    header.save('model/siamese/stage1/siamese_header-1.h5')
    # Stage 2: pair images by model-predicted similarity, lower the lr.
    ampl = max(1.0, 100 ** -0.1 * ampl)  # shrink the random distortion
    scores = compute_score(res_sub_net, header, train, image_dim)  # compute score of train image pairs
    train_step(model, scores, ampl, epochs, train, image_dim, 16e-5)
    res_sub_net.save('model/siamese/stage2/siamese_res50-2.h5')
    header.save('model/siamese/stage2/siamese_header-2.h5')
    # Stage 3: refresh the scores again and fine-tune at the lowest lr.
    ampl = max(1.0, 100 ** -0.1 * ampl)
    scores = compute_score(res_sub_net, header, train, image_dim)
    train_step(model, scores, ampl, epochs, train, image_dim, 1e-5)
    res_sub_net.save('model/siamese/stage3/siamese_res50-3.h5')
    header.save('model/siamese/stage3/siamese_header-3.h5')
    return model
def compute_score(subnet, header, train_df, image_dim):
    """Predict the full pairwise similarity matrix for the training set.

    Embeds every training image once with `subnet`, scores every unordered
    pair of embeddings with `header`, and mirrors the results into a
    symmetric matrix (the diagonal stays 0).
    """
    train_folder = 'input/crop_train'
    img_gene = ImageGenerator(train_df.Image.values.tolist(), train_folder,
                              batch_size=16,
                              image_loader=image_loader,
                              dim=image_dim,
                              shuffle=False)
    # One forward pass per image: feature vectors for the whole train set.
    known = subnet.predict_generator(img_gene, max_queue_size=20, workers=10, verbose=0)
    # Score every feature pair with the distance head.
    pair_scores = header.predict_generator(FeaturePairGenerator(known),
                                           max_queue_size=20, workers=10, verbose=0)
    n = known.shape[0]
    predict_res = np.zeros((n, n), dtype=K.floatx())
    # Fill the strict upper triangle, then mirror it to make the matrix symmetric.
    predict_res[np.triu_indices(n, 1)] = pair_scores.squeeze()
    predict_res += predict_res.transpose()
    return predict_res
模型预测
不同于传统的图像分类任务,在实际应用中,孪生网络用于预测主要有两种方式。
- 与训练集依次匹配
对于新的输入,遍历整个训练集,相似度最高的图像类别就是最终的类别输出。
- 结合机器学习方法
将特征提取子网的输出作为特征,结合传统机器学习算法(KNN,SVM或Softmax)训练一个分类模型。