因为DeepSORT模型内容比较复杂,涉及到背景只是也是非常复杂,最近也是再研究目标跟踪,这里通过写简书的形式进行知识点梳理也方便自己的回忆。
首先贴出论文地址以及对应的代码:
论文:SIMPLE ONLINE AND REALTIME TRACKING WITH A DEEP ASSOCIATION METRIC
代码:Yolov5_DeepSort_Pytorch
一、 背景知识
1. 1 卡尔曼滤波(Kalman Filter)
这里我们可以参考这篇文章【MOT】详解SORT与卡尔曼滤波算法, 这篇文章讲解相对较易理
解, 其对应的原版英文地址 How a Kalman filter works, in pictures。如果想具体了解卡尔曼滤波推导公式可以参考卡尔曼滤波详解。
卡尔曼滤波算法分为两个过程分别是预测与更新。
这里关于预测涉及两个方面即卡尔曼滤波的结果以及传感器的探测结果的过程结合。因为两者之间都有一定的误差但是结合两者的结论进行分析,结果会有一定的提升。卡尔曼滤波可以基于目标前一时刻的位置,来预测当前时刻的位置,并且可以比传感器(在目标跟踪中即目标检测器,比如Yolo等)更准确的估计目标的位置。
如上图所示,假设我们要跟踪小车的位置变化,蓝色的分布表示的是卡尔曼滤波的预测值,棕色的分布表示的是传感器的测量值,灰色的部分就是预测值基于测量值更新后的最优估计值。
根据上一个状态(如在该情境下状态表示的是目标的位置及目标的速度)预测现在的状态(假设物体运动的状态按照匀速行驶),我们即可的到现在预测的状态。
【1】均值(状态)
【2】协方差矩阵
因为状态元素之间的因素是相互影响的,举一个很简单的🌰,速度越快我们得到的位移也会越大。因此我们可以用协方差矩阵来表现两者元素之间的影响关系。
【3】传感器误差
当然我们需要考虑到引入传感器的带来的误差,该误差的范围可以用协方差表示。
1.2 SORT中的卡尔曼滤波
在目标跟踪的过程中我们主要估计track的两个重要的状态:均值, 方差。
均值:主要表示目标的位置信息,由bbox的中心坐标,宽高比, 高, 以及各自的速度变化值组成,即, 各个速度初始化为0。
协方差:表示目标位置信息的不确定性,由上述的8个因素对应组成的8*8的对角矩阵,矩阵中数字越大则表明不确定性越大,可以任意初始化。
这里关于卡尔曼滤波可以分为6个步骤:
【1】类初始化 init
【2】初始化状态(mean)与状态协方差(convariance)的函数的initiate
【3】预测阶段函数predict
【4】分布转换函数project
【5】更新阶段函数update
【6】计算状态分布和测量(检测框)之间距离函数gating_distance。
1.2.1 类初始化
class KalmanFilter(object):
"""
A simple Kalman filter for tracking bounding boxes in image space.
The 8-dimensional state space
x, y, a, h, vx, vy, va, vh
contains the bounding box center position (x, y), aspect ratio a, height h,
and their respective velocities.
Object motion follows a constant velocity model. The bounding box location
(x, y, a, h) is taken as direct observation of the state space (linear
observation model).
"""
def __init__(self):
ndim, dt = 4, 1.
# Create Kalman filter model matrices.
# 卡尔曼滤波的转移矩阵
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt
# 卡尔曼滤波投射矩阵
self._update_mat = np.eye(ndim, 2 * ndim)
# Motion and observation uncertainty are chosen relative to the current
# state estimate. These weights control the amount of uncertainty in
# the model. This is a bit hacky.
self._std_weight_position = 1. / 20
self._std_weight_velocity = 1. / 160
self._motion_mat
:
self._update_mat
:1.2.2 初始化参数
def _initiate_track(self, detection):
mean, covariance = self.kf.initiate(detection.to_xyah())
def initiate(self, measurement):
"""Create track from unassociated measurement.
Parameters
----------
measurement : ndarray
Bounding box coordinates (x, y, a, h) with center position (x, y),
aspect ratio a, and height h.
Returns
-------
(ndarray, ndarray)
Returns the mean vector (8 dimensional) and covariance matrix (8x8
dimensional) of the new track. Unobserved velocities are initialized
to 0 mean.
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[0], # the center point x
2 * self._std_weight_position * measurement[1], # the center point y
1 * measurement[2], # the ratio of width/height
2 * self._std_weight_position * measurement[3], # the height
10 * self._std_weight_velocity * measurement[0],
10 * self._std_weight_velocity * measurement[1],
0.1 * measurement[2],
10 * self._std_weight_velocity * measurement[3]]
covariance = np.diag(np.square(std))
return mean, covariance
mean
: eg:
covariance
: eg 1.2.3 预测
⭐️ (这里可以理解为加速度)。
🌲预测公式:
(这里的噪声矩阵,代表整个系统的可靠程度,一般初始化很小的值)
我们这里的称作为状态转移矩阵,该公式预测t时刻的均值。
def predict(self, mean, covariance):
"""Run Kalman filter prediction step.
Parameters
----------
mean : ndarray
The 8 dimensional mean vector of the object state at the previous
time step.
covariance : ndarray
The 8x8 dimensional covariance matrix of the object state at the
previous time step.
Returns
-------
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted
state. Unobserved velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[0],
self._std_weight_position * mean[1],
1 * mean[2],
self._std_weight_position * mean[3]]
std_vel = [
self._std_weight_velocity * mean[0],
self._std_weight_velocity * mean[1],
0.1 * mean[2],
self._std_weight_velocity * mean[3]]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(self._motion_mat, mean)
covariance = np.linalg.multi_dot((
self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
motion_conv
代表的是 ➡ motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
根据公式:
mean
➡ mean = np.dot(self._motion_mat, mean)
, eg:根据公式:
covariance
➡ covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
, eg:
1.2.4 投射(分布矩阵)
跟踪的空间向量与传感器向量空间不一致,需要借助投射矩阵将其转换在同一个向量空间。例如在我们跟踪的向量空间不仅包含目标的位置也包含其速度变化,但是在传感器向量空间是没有的(即目标检测仅仅输出目标的位置,没有输出目标运动的速度信息),需要进行转换将速度信息忽略。
根据如下公式:
代码如下:
def project(self, mean, covariance):
"""Project state distribution to measurement space.
Parameters
----------
mean : ndarray
The state's mean vector (8 dimensional array).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
Returns
-------
(ndarray, ndarray)
Returns the projected mean and covariance matrix of the given state
estimate.
"""
std = [
self._std_weight_position * mean[0],
self._std_weight_position * mean[1],
0.1 * mean[2],
self._std_weight_position * mean[3]]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean) # 将均值向量映射到检测空间,由于测量矩阵H是一个4X8的单位矩阵,所以就相当于提取前4个位置向量[cx, cy, r, h] 8 位变4位
covariance = np.linalg.multi_dot((
self._update_mat, covariance, self._update_mat.T)) # 8*8变成4*4
return mean, covariance + innovation_cov
我们再回忆下投射矩阵
self._update_mat
:
就很容易理解如何忽略速度信息。
➡
mean = np.dot(self._update_mat, mean)
➡
covariance = np.linalg.multi_dot(( self._update_mat, covariance, self._update_mat.T)) # 8*8变成4*4
这里需要强调一点即一开始强调的传感器噪声
➡covariance + innovation_cov
1.2.5 更新
首先贴出更新公式:
这里的指的是卡尔曼增益,读数的均值分布, 。
代码如下:
def update(self, mean, covariance, measurement):
"""Run Kalman filter correction step.
Parameters
----------
mean : ndarray
The predicted state's mean vector (8 dimensional).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
measurement : ndarray
The 4 dimensional measurement vector (x, y, a, h), where (x, y)
is the center position, a the aspect ratio, and h the height of the
bounding box.
Returns
-------
(ndarray, ndarray)
Returns the measurement-corrected state distribution.
"""
projected_mean, projected_cov = self.project(mean, covariance)
chol_factor, lower = scipy.linalg.cho_factor(
projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
check_finite=False).T
innovation = measurement - projected_mean # measurement(观测值) 只有坐标没有速度, 所以projected_mean 做映射只选取位置信息去掉速度信息
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((
kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
① 首先将mean
, covariance
参数通过投射变换转入到传感器(目标检测器)空间向量中去projected_mean, projected_cov = self.project(mean, covariance)
。
② 接着计算卡尔曼增益
, 因上面在计算协防方差矩阵的时候已经考虑到检测器的误差所以这里的不需要考虑。
chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve((chol_factor, lower), np.dot(covariance, self._update_mat.T).T,check_finite=False).T
我们的到更新后参数,该参数作用于后期预测下一个的卡尔曼滤波的状态值。
③ 计算更新后新的mean
及covariance
innovation = measurement - projected_mean # measurement
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
1.2.6 计算状态分布和测量检测框之间距离函数
def gating_distance(self, mean, covariance, measurements,
only_position=False):
"""Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If
`only_position` is False, the chi-square distribution has 4 degrees of
freedom, otherwise 2.
Parameters
----------
mean : ndarray
Mean vector over the state distribution (8 dimensional).
covariance : ndarray
Covariance of the state distribution (8x8 dimensional).
measurements : ndarray
An Nx4 dimensional matrix of N measurements, each in
format (x, y, a, h) where (x, y) is the bounding box center
position, a the aspect ratio, and h the height.
only_position : Optional[bool]
If True, distance computation is done with respect to the bounding
box center position only.
Returns
-------
ndarray
Returns an array of length N, where the i-th element contains the
squared Mahalanobis distance between (mean, covariance) and
`measurements[i]`.
"""
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]
cholesky_factor = np.linalg.cholesky(covariance)
d = measurements - mean
z = scipy.linalg.solve_triangular(
cholesky_factor, d.T, lower=True, check_finite=False,
overwrite_b=True)
squared_maha = np.sum(z * z, axis=0)
return squared_maha
这里根据器检测的均值,卡尔曼滤波均值以及卡尔曼滤波协方差矩阵计算卡尔曼滤波得出的估计值(mean, variance)以及检测器得出的估计值(mean1,variance)的马氏距离(Mahalanobis)。
至于为什么要使用马氏距离可以参考这篇博客 马氏距离通俗理解。
也可以参考下面这段话
添加马氏距离与余弦距离:实际上是针对运动信息与外观信息的计算。两个名词听着较为陌生,而实际上换句话解释,马氏距离就是“加强版的欧氏距离”。它实际上是规避了欧氏距离中对于数据特征方差不同的风险,在计算中添加了协方差矩阵,其目的就是进行方差归一化,从而使所谓的“距离”更加符合数据特征以及实际意义。马氏距离是对于差异度的衡量中,的一种距离度量方式,而不同于马氏距离,余弦距离则是一种相似度度量方式。前者是针对于位置进行区分,而后者则是针对于方向。换句话说,我们使用余弦距离的时候,可以用来衡量不同个体在维度之间的差异,而一个个体中,维度与维度的差异我们却不好判断,此时我们可以使用马氏距离进行弥补,从而在整体上可以达到一个相对于全面的差异性衡量。而我们之所以要进行差异性衡量,根本目的也是想比较检测器与跟踪器的相似程度,优化度量方式,也可以更好地完成匹配。我们通过该方法可以很好的估算出卡尔曼滤波与检测器的相似度,继而方便后续得出其代价矩阵。
说了这么多咱们终于理解卡尔曼滤波也对其中的代码有了一定的熟悉,下面还有一个大的背景知识点就是匈牙利匹配算法。
1.3 匈牙利匹配算法(Hungarian Matching Algorithm)
为什么要涉及到匈牙利匹配算法呢?这主要原因是我们得出的卡尔曼滤波的出的结果如何与每一个目标id很好的匹配上,匈牙利算法可以告诉我们当前帧的某个目标,是否与前一帧的某个目标相同。
为了能更好的方便解释匈牙利匹配算法,这篇文章 目标跟踪初探(DeepSORT)的示例非常好的进行了诠释。
首先,先介绍一下什么是分配问题(Assignment Problem):假设有N个人和N个任务,每个任务可以任意分配给不同的人,已知每个人完成每个任务要花费的代价不尽相同,那么如何分配可以使得总的代价最小。举个例子,假设现在有3个任务,要分别分配给3个人,每个人完成各个任务所需代价矩阵(cost matrix)如下所示(这个代价可以是金钱、时间等等):
怎样才能找到一个最优分配,使得完成所有任务花费的代价最小呢?
匈牙利算法(又叫KM算法)就是用来解决分配问题的一种方法,它基于定理:
如果代价矩阵的某一行或某一列同时加上或减去某个数,则这个新的代价矩阵的最优分配仍然是原代价矩阵的最优分配。
算法步骤(假设矩阵为NxN方阵):
【1】对于矩阵的每一行,减去其中最小的元素
【2】对于矩阵的每一列,减去其中最小的元素
【3】用最少的水平线或垂直线覆盖矩阵中所有的0
【4】如果线的数量等于N,则找到了最优分配,算法结束,否则进入步骤5
【5】找到没有被任何线覆盖的最小元素,每个没被线覆盖的行减去这个元素,每个被线覆【6】盖的列加上这个元素,返回步骤3
继续拿上面的例子做演示
sklearn里的linear_assignment()函数以及scipy里的linear_sum_assignment()函数都实现了匈牙利算法,两者的返回值的形式不同:
import numpy as np
from sklearn.utils.linear_assignment_ import linear_assignment
from scipy.optimize import linear_sum_assignment
cost_matrix = np.array([
[15,40,45],
[20,60,35],
[20,40,25]
])
matches = linear_assignment(cost_matrix)
print('sklearn API result:\n', matches)
matches = linear_sum_assignment(cost_matrix)
print('scipy API result:\n', matches)
"""Outputs
sklearn API result:
[[0 1]
[1 0]
[2 2]]
scipy API result:
(array([0, 1, 2], dtype=int64), array([1, 0, 2], dtype=int64))
"""
二、 流程
介绍了上述的背景知识,我们可以开始我们跟踪算法的流程梳理了,其实以往的SORT算法,就是仅仅根据上述的背景知识就可以进行目标跟踪了。这篇文章中(多目标跟踪(MOT)入门)的很多有关于介绍deepsort流程有很多很棒的图片,通过图片可以很好的去理解整个流程,当然我也会结合代码去梳理里面的逻辑。
2.1 SORT算法流程图
关键步骤:轨迹卡尔曼滤波预测→ 使用匈牙利算法将预测后的tracks和当前帧中的detecions进行匹配(IOU匹配) → 卡尔曼滤波更新。 对于没有匹配上的轨迹,也不是马上就删掉了,有个T_lost的保存时间,但SORT里把这个时间阈值设置的是1,也就是说对于没匹配上的轨迹相当于直接删了。
2.2 DEEPSORT关键点梳理
相对于SORT算法比,该算法多了级联匹配(Matching Cascade)和新轨迹的确认(confirmed)。
2.2.1 跟踪器(Tracker)
在介绍每个零件的之前,我们先介绍每个跟踪器(tracker)的类。
# vim: expandtab:ts=4:sw=4
class TrackState:
"""
Enumeration type for the single target track state. Newly created tracks are
classified as `tentative` until enough evidence has been collected. Then,
the track state is changed to `confirmed`. Tracks that are no longer alive
are classified as `deleted` to mark them for removal from the set of active
tracks.
"""
Tentative = 1
Confirmed = 2
Deleted = 3
class Track:
"""
A single target track with state space `(x, y, a, h)` and associated
velocities, where `(x, y)` is the center of the bounding box, `a` is the
aspect ratio and `h` is the height.
Parameters
----------
mean : ndarray
Mean vector of the initial state distribution.
covariance : ndarray
Covariance matrix of the initial state distribution.
track_id : int
A unique track identifier.
n_init : int
Number of consecutive detections before the track is confirmed. The
track state is set to `Deleted` if a miss occurs within the first
`n_init` frames.
max_age : int
The maximum number of consecutive misses before the track state is
set to `Deleted`.
feature : Optional[ndarray]
Feature vector of the detection this track originates from. If not None,
this feature is added to the `features` cache.
Attributes
----------
mean : ndarray
Mean vector of the initial state distribution.
covariance : ndarray
Covariance matrix of the initial state distribution.
track_id : int
A unique track identifier.
hits : int
Total number of measurement updates.
age : int
Total number of frames since first occurance.
time_since_update : int
Total number of frames since last measurement update.
state : TrackState
The current track state.
features : List[ndarray]
A cache of features. On each measurement update, the associated feature
vector is added to this list.
"""
def __init__(self, mean, covariance, track_id, class_id, n_init, max_age,
feature=None):
self.mean = mean
self.covariance = covariance
self.track_id = track_id
self.class_id = class_id
self.hits = 1 # 连续匹配多少次
self.age = 1
self.time_since_update = 0 # 有多少帧track还没有匹配上,更新+1, 匹配清0
self.yolo_bbox = [0, 0, 0, 0]
self.state = TrackState.Tentative
self.features = []
if feature is not None:
self.features.append(feature)
self._n_init = n_init
self._max_age = max_age
def to_tlwh(self):
"""Get current position in bounding box format `(top left x, top left y,
width, height)`.
Returns
-------
ndarray
The bounding box.
"""
ret = self.mean[:4].copy()
ret[2] *= ret[3]
ret[:2] -= ret[2:] / 2
return ret
def to_tlbr(self):
"""Get kf estimated current position in bounding box format `(min x, miny, max x,
max y)`.
Returns
-------
ndarray
The predicted kf bounding box.
"""
ret = self.to_tlwh()
ret[2:] = ret[:2] + ret[2:]
return ret
def get_yolo_pred(self):
"""Get yolo prediction`.
Returns
-------
ndarray
The yolo bounding box.
"""
return self.yolo_bbox
def increment_age(self):
self.age += 1
self.time_since_update += 1
def predict(self, kf):
"""Propagate the state distribution to the current time step using a
Kalman filter prediction step.
Parameters
----------
kf : kalman_filter.KalmanFilter
The Kalman filter.
"""
self.mean, self.covariance = kf.predict(self.mean, self.covariance)
self.increment_age()
def update(self, kf, detection, class_id):
"""Perform Kalman filter measurement update step and update the feature
cache.
Parameters
----------
kf : kalman_filter.KalmanFilter
The Kalman filter.
detection : Detection
The associated detection.
"""
self.yolo_bbox = detection
self.mean, self.covariance = kf.update(
self.mean, self.covariance, detection.to_xyah())
self.features.append(detection.feature)
self.class_id = class_id
self.hits += 1
self.time_since_update = 0
# 匹配的时候(update)time_since_update +需要置0,predict加1
if self.state == TrackState.Tentative and self.hits >= self._n_init:
self.state = TrackState.Confirmed
def mark_missed(self):
"""Mark this track as missed (no association at the current time step).
"""
if self.state == TrackState.Tentative:
self.state = TrackState.Deleted
elif self.time_since_update > self._max_age:
self.state = TrackState.Deleted
def is_tentative(self):
"""Returns True if this track is tentative (unconfirmed).
"""
return self.state == TrackState.Tentative
def is_confirmed(self):
"""Returns True if this track is confirmed."""
return self.state == TrackState.Confirmed
def is_deleted(self):
"""Returns True if this track is dead and should be deleted."""
return self.state == TrackState.Deleted
2.2.2 目标检测器(Detections)
这里很好理解利用检测器检测出目标在图片中的位置(通常由4个坐标形成),对应的检测器有centernet, yolov系列等。这里使用的是yolov5
pred = model(img, augment=opt.augment, visualize=visualize)
# Apply NMS
pred = non_max_suppression(pred, opt.conf_thres, opt.iou_thres, opt.classes, opt.agnostic_nms, max_det=opt.max_det)
# Process detections
for i, det in enumerate(pred): # detections per image
seen += 1
if webcam: # batch_size >= 1
p, im0, frame = path[i], im0s[i].copy(), dataset.count
s += f'{i}: '
else:
p, im0, frame = path, im0s.copy(), getattr(dataset, 'frame', 0)
s += '%gx%g ' % img.shape[2:] # print string
save_path = str(Path(out) / Path(p).name)
annotator = Annotator(im0, line_width=2, pil=not ascii)
if det is not None and len(det):
# Rescale boxes from img_size to im0 size
det[:, :4] = scale_coords(
img.shape[2:], det[:, :4], im0.shape).round()
# Print results
for c in det[:, -1].unique():
n = (det[:, -1] == c).sum() # detections per class
s += f"{n} {names[int(c)]}{'s' * (n > 1)}, " # add to string
xywhs = xyxy2xywh(det[:, 0:4])
confs = det[:, 4]
clss = det[:, 5]
2.2.3 卡尔曼滤波器(Kalman Filter Predict)
我们基于卡尔曼滤波通过前面帧的信息来预测下一帧目标的位置。这里关于卡尔曼滤波的代码上面介绍了就不再说明了。
2.2.4 确认状态设定(Confirmed)
首先需要说的一点就是图中的tracks对应不同的目标,对应每一个目标则属于一个track。刚开始的时候我们所有的track都属于未确定的状态,如果我们的目标检测器检测到的目标与卡尔曼滤波预测目标匹配的时候我们认为其已经匹配,在这里如果连续3次
的时候此时我们设定该track(目标)为确认状态。
2.2.5 IOU匹配(IOU Matched)
这里关于IOU匹配的阈值设定为0.7
,如果大于0.7
我们认为其是匹配的,否则我们认为是不匹配的, 将该tracker删除。
2.2.6 级联匹配(Matching Cascade)
这里使用级联匹配的目的是两个目标遮挡的情况,刚刚被遮挡的目标的track无法匹配detection,且暂时 从图像中消失,之后背遮挡目标再次出现的时候,应该尽量让被遮挡的目标分配的ID不发生变动,减少ID交换的情况,因此我们就需要用到级联匹配。关于级联匹配的过程我们可以如下的图来理解:
【1】首先能进入到级联匹配的预设条件:
连续匹配3次,即该tracker的状态必须为confirmed
【2】并且进入该状态的tracker有一个vip待遇:
即如果遇到检测器级联匹配失败同时IOU匹配失败的情况(缺失一次,missing_age + 1),仍然可以容忍max_age=70帧。如果missing_age超过这个帧数(70)则将该track删除。
【3】级联匹配的代价函数:
级联匹配的代价函数是由卡尔曼滤波的均值(目标坐标)以及协方差与检测器的均值(目标坐标)的马式距离(Mahalanobis distance)
与检测器目标的ReId特征与卡尔曼滤波ReId特征余弦相似度(Cosine Distance)(⭐️注意这里我们取相似度最大及距离最小的相似度)
相加。这里有一个参数,关于这个参数我们可以根据相机的运动抖动情况判断,当其运动明显的时候,我们设置该参数为0是一个合理的选择。【4】级联匹配的优先级:
级联匹配优先匹配丢失匹配帧最少的目标,也就是说没有丢失过的轨迹赋予优先匹配的权利,而丢失的最久的轨迹最后匹配
。【5】门矩阵(Gate Matrix)
当cost matrix
的某些值超出一定范围,我们设定其上限,具体我们可以根据代码细说。
代码如下:
# linear_assignment.py
def matching_cascade(
distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
"""Run matching cascade.
Parameters
----------
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
disregarded.
cascade_depth: int
The cascade depth, should be se to the maximum track age.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : Optional[List[int]]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above). Defaults to all tracks.
detection_indices : Optional[List[int]]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above). Defaults to all
detections.
Returns
-------
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
"""
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
for level in range(cascade_depth):
if len(unmatched_detections) == 0: # No detections left
break
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
]
if len(track_indices_l) == 0: # Nothing to match at this level
continue
matches_l, _, unmatched_detections = \
min_cost_matching(
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
# tracker.py
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
2.2.7 特征提取
这里我们可以使用简单的reid网络或普通的分类网络即可,只要我们通过该网络可以提取每个目标的特征即可,这里我们通过网络得到128维度的特征。在输入该模型之前我们需要将图片resize(64,128)大小的图片整理成一个batch然后再进入模型。
# 特征提取器
class Extractor(object):
def __init__(self, model_path, use_cuda=True):
self.net = Net(reid=True)
self.device = "cuda" if torch.cuda.is_available() and use_cuda else "cpu"
state_dict = torch.load(model_path, map_location=torch.device(self.device))[
'net_dict']
self.net.load_state_dict(state_dict)
logger = logging.getLogger("root.tracker")
logger.info("Loading weights from {}... Done!".format(model_path))
self.net.to(self.device)
self.size = (64, 128)
self.norm = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
# 图片预处理
def _preprocess(self, im_crops):
"""
TODO:
1. to float with scale from 0 to 1
2. resize to (64, 128) as Market1501 dataset did
3. concatenate to a numpy array
3. to torch Tensor
4. normalize
"""
def _resize(im, size):
return cv2.resize(im.astype(np.float32)/255., size)
im_batch = torch.cat([self.norm(_resize(im, self.size)).unsqueeze(
0) for im in im_crops], dim=0).float()
return im_batch
# 输入模型
def __call__(self, im_crops):
im_batch = self._preprocess(im_crops)
with torch.no_grad():
im_batch = im_batch.to(self.device)
features = self.net(im_batch)
return features.cpu().numpy()
特征提取网络结构如下,然后得到128维的特征:
# 特征提取网络
class Net(nn.Module):
def __init__(self, num_classes=751, reid=False):
super(Net, self).__init__()
# 3 128 64
self.conv = nn.Sequential(
nn.Conv2d(3, 64, 3, stride=1, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
# nn.Conv2d(32,32,3,stride=1,padding=1),
# nn.BatchNorm2d(32),
# nn.ReLU(inplace=True),
nn.MaxPool2d(3, 2, padding=1),
)
# 32 64 32
self.layer1 = make_layers(64, 64, 2, False)
# 32 64 32
self.layer2 = make_layers(64, 128, 2, True)
# 64 32 16
self.layer3 = make_layers(128, 256, 2, True)
# 128 16 8
self.layer4 = make_layers(256, 512, 2, True)
# 256 8 4
self.avgpool = nn.AvgPool2d((8, 4), 1)
# 256 1 1
self.reid = reid
self.classifier = nn.Sequential(
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Linear(256, num_classes),
)
def forward(self, x):
x = self.conv(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1)
# B x 128
if self.reid:
x = x.div(x.norm(p=2, dim=1, keepdim=True))
return x
# classifier
x = self.classifier(x)
return x
2.2.8 ⭐️参数更新(Filter)
从图中可以看到无论是IOU匹配还是级联匹配,只要匹配上我们就需要更新我们的卡尔曼参数。
2.3 DEEPSORT流程
【1】首先我们输入的图片经过目标检测器2.2.2得到目标框的坐标位置,目标的置信度以及目标的类别
【2】将上述的信息输入到deepsort环节中去
import numpy as np
import torch
from .deep.feature_extractor import Extractor
from .sort.nn_matching import NearestNeighborDistanceMetric
from .sort.detection import Detection
from .sort.tracker import Tracker
__all__ = ['DeepSort']
class DeepSort(object):
def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
self.min_confidence = min_confidence
self.extractor = Extractor(model_path, use_cuda=use_cuda)
max_cosine_distance = max_dist
metric = NearestNeighborDistanceMetric(
"cosine", max_cosine_distance, nn_budget)
self.tracker = Tracker(
metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)
def update(self, bbox_xywh, confidences, classes, ori_img, use_yolo_preds=True):
self.height, self.width = ori_img.shape[:2]
# generate detections
features = self._get_features(bbox_xywh, ori_img)
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(
confidences) if conf > self.min_confidence]
# run on non-maximum supression
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
# update tracker
self.tracker.predict()
self.tracker.update(detections, classes)
# output bbox identities
outputs = []
for track in self.tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
if use_yolo_preds:
det = track.get_yolo_pred()
x1, y1, x2, y2 = self._tlwh_to_xyxy(det.tlwh)
else:
box = track.to_tlwh()
x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
track_id = track.track_id
class_id = track.class_id
outputs.append(np.array([x1, y1, x2, y2, track_id, class_id], dtype=np.int))
if len(outputs) > 0:
outputs = np.stack(outputs, axis=0)
return outputs
"""
TODO:
Convert bbox from xc_yc_w_h to xtl_ytl_w_h
Thanks JieChen91@github.com for reporting this bug!
"""
@staticmethod
def _xywh_to_tlwh(bbox_xywh):
if isinstance(bbox_xywh, np.ndarray):
bbox_tlwh = bbox_xywh.copy()
elif isinstance(bbox_xywh, torch.Tensor):
bbox_tlwh = bbox_xywh.clone()
bbox_tlwh[:, 0] = bbox_xywh[:, 0] - bbox_xywh[:, 2] / 2.
bbox_tlwh[:, 1] = bbox_xywh[:, 1] - bbox_xywh[:, 3] / 2.
return bbox_tlwh
def _xywh_to_xyxy(self, bbox_xywh):
x, y, w, h = bbox_xywh
x1 = max(int(x - w / 2), 0)
x2 = min(int(x + w / 2), self.width - 1)
y1 = max(int(y - h / 2), 0)
y2 = min(int(y + h / 2), self.height - 1)
return x1, y1, x2, y2
def _tlwh_to_xyxy(self, bbox_tlwh):
"""
TODO:
Convert bbox from xtl_ytl_w_h to xc_yc_w_h
Thanks JieChen91@github.com for reporting this bug!
"""
x, y, w, h = bbox_tlwh
x1 = max(int(x), 0)
x2 = min(int(x+w), self.width - 1)
y1 = max(int(y), 0)
y2 = min(int(y+h), self.height - 1)
return x1, y1, x2, y2
def increment_ages(self):
self.tracker.increment_ages()
def _xyxy_to_tlwh(self, bbox_xyxy):
x1, y1, x2, y2 = bbox_xyxy
t = x1
l = y1
w = int(x2 - x1)
h = int(y2 - y1)
return t, l, w, h
def _get_features(self, bbox_xywh, ori_img):
im_crops = []
for box in bbox_xywh:
x1, y1, x2, y2 = self._xywh_to_xyxy(box)
im = ori_img[y1:y2, x1:x2]
im_crops.append(im)
if im_crops:
features = self.extractor(im_crops)
else:
features = np.array([])
return features
(1) update
函数
a. 通过目标的位置以及原图通过特征提取器提取128维度的目标特征,接着过滤掉置信度低于0.3
的目标存储到detections类。
class Detection(object):
"""
This class represents a bounding box detection in a single image.
Parameters
----------
tlwh : array_like
Bounding box in format `(x, y, w, h)`.
confidence : float
Detector confidence score.
feature : array_like
A feature vector that describes the object contained in this image.
Attributes
----------
tlwh : ndarray
Bounding box in format `(top left x, top left y, width, height)`.
confidence : ndarray
Detector confidence score.
feature : ndarray | NoneType
A feature vector that describes the object contained in this image.
"""
def __init__(self, tlwh, confidence, feature):
self.tlwh = np.asarray(tlwh, dtype=np.float)
self.confidence = float(confidence)
self.feature = np.asarray(feature, dtype=np.float32)
def to_tlbr(self):
"""Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
`(top left, bottom right)`.
"""
ret = self.tlwh.copy()
ret[2:] += ret[:2]
return ret
def to_xyah(self):
"""Convert bounding box to format `(center x, center y, aspect ratio,
height)`, where the aspect ratio is `width / height`.
"""
ret = self.tlwh.copy()
ret[:2] += ret[2:] / 2
ret[2] /= ret[3]
return ret
features = self._get_features(bbox_xywh, ori_img)
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(confidences) if conf > self.min_confidence]
b. 卡尔曼滤波更新与预测
self.tracker.predict()
self.tracker.update(detections, classes)
预测的时候, 输入mean以及convariance,输出新的mean, 新的convariance,🚩同时age以及,time_since_update 加1。
def increment_age(self):
self.age += 1 # 该track自出现以来的总帧数加1
self.time_since_update += 1 # # 该track自最近一次更新以来的总帧数加1
def predict(self, kf):
"""Propagate the state distribution to the current time step using a
Kalman filter prediction step.
Parameters
----------
kf : kalman_filter.KalmanFilter
The Kalman filter.
"""
self.mean, self.covariance = kf.predict(self.mean, self.covariance)
self.increment_age()
更新的时候是一个相对复杂的过程。
def update(self, detections, classes):
"""Perform measurement update and track management.
Parameters
----------
detections : List[deep_sort.detection.Detection]
A list of detections at the current time step.
"""
# Run matching cascade.
matches, unmatched_tracks, unmatched_detections = \
self._match(detections)
# Update track set.
for track_idx, detection_idx in matches: # 对匹配上的进行参数更新
self.tracks[track_idx].update(
self.kf, detections[detection_idx], classes[detection_idx])
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx], classes[detection_idx].item())
self.tracks = [t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
continue
features += track.features
targets += [track.track_id for _ in track.features]
track.features = []
self.metric.partial_fit(
np.asarray(features), np.asarray(targets), active_targets)
def _match(self, detections):
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
# Split track set into confirmed and unconfirmed tracks.
confirmed_tracks = [
i for i, t in enumerate(self.tracks) if t.is_confirmed()]
unconfirmed_tracks = [
i for i, t in enumerate(self.tracks) if not t.is_confirmed()]
# Associate confirmed tracks using appearance features.
# 对confirmd tracks进行级联匹配
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)
# Associate remaining tracks together with unconfirmed tracks using IOU.
# 将所有状态为未确定态的轨迹和刚刚没有匹配上的轨迹组合为iou_track_candidates,进行IoU的匹配
iou_track_candidates = unconfirmed_tracks + [ # a 是级联匹配失败剩下的
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1] # 刚刚没有匹配上
# 未匹配
unmatched_tracks_a = [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1] # 已经很久没有匹配上
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
matches = matches_a + matches_b
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
# linear_assignment.py
def matching_cascade(
distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
"""Run matching cascade.
Parameters
----------
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
disregarded.
cascade_depth: int
The cascade depth, should be se to the maximum track age.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : Optional[List[int]]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above). Defaults to all tracks.
detection_indices : Optional[List[int]]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above). Defaults to all
detections.
Returns
-------
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
"""
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
# 由小到大依次对每个level的tracks做匹配, 最近丢失匹配帧越小的目标越先进行匹配
for level in range(cascade_depth):
if len(unmatched_detections) == 0: # No detections left
break
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
]
if len(track_indices_l) == 0: # Nothing to match at this level
continue
matches_l, _, unmatched_detections = \
min_cost_matching(
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
首先通过_self.match
函数来判断哪些目标匹配,哪些卡尔曼滤波没有匹配,哪些目标检测没有匹配。接着由小到大依次对每个level的tracks做匹配, 最近丢失匹配帧越小的目标越先进行匈牙利匹配。
matches, unmatched_tracks, unmatched_detections = \
self._match(detections)
匈牙利匹配如下代码,这部分代码中还有一个核心的函数min_cost_matching,这个函数可以接收不同的distance_metric,在级联匹配和IoU匹配中都有用到。
如:
# Associate confirmed tracks using appearance features.
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
def min_cost_matching(
distance_metric, max_distance, tracks, detections, track_indices=None,
detection_indices=None):
"""Solve linear assignment problem.
Parameters
----------
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection_indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
disregarded.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : List[int]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above).
detection_indices : List[int]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above).
Returns
-------
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
"""
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
if len(detection_indices) == 0 or len(track_indices) == 0:
return [], track_indices, detection_indices # Nothing to match.
# -----------------------------------------
# Gated_distance——>
# 1. cosine distance
# 2. 马氏距离
# 得到代价矩阵
# -----------------------------------------
# iou_cost——>
# 仅仅计算track和detection之间的iou距离
# -----------------------------------------
cost_matrix = distance_metric(
tracks, detections, track_indices, detection_indices)
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
# -----------------------------------------
# gated_distance中设置距离中最高上限,
# 这里最远距离实际是在deep sort类中的max_dist参数设置的
# 默认max_dist=0.2, 距离越小越好
# -----------------------------------------
# iou_cost情况下,max_distance的设置对应tracker中的max_iou_distance,
# 默认值为max_iou_distance=0.7
# 注意结果是1-iou,所以越小越好
# -----------------------------------------
row_indices, col_indices = linear_assignment(cost_matrix)
matches, unmatched_tracks, unmatched_detections = [], [], []
for col, detection_idx in enumerate(detection_indices): # 没匹配的tdetection有哪些
if col not in col_indices: # 查看检测和匈牙利匹配是否对应 对应上了则匹配,对应不上不匹配
unmatched_detections.append(detection_idx)
for row, track_idx in enumerate(track_indices): # 没匹配的track有哪些
if row not in row_indices:
unmatched_tracks.append(track_idx)
for row, col in zip(row_indices, col_indices):查看卡尔曼匹配和匈牙利匹配是否对应 对应上了则匹配,对应不上不匹配
track_idx = track_indices[row]
detection_idx = detection_indices[col]
if cost_matrix[row, col] > max_distance: # 就算很匹配,但是当其距离大于0.7仍放到没配对中
unmatched_tracks.append(track_idx)
unmatched_detections.append(detection_idx)
else:
matches.append((track_idx, detection_idx))
return matches, unmatched_tracks, unmatched_detections
首先通过如下代码计算mean,covarinace马氏距离以及特征的余弦距离。
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(self.kf,cost_matrix, tracks, dets, track_indices,detection_indices)
return cost_matrix
cost_matrix = distance_metric(tracks, detections, track_indices, detection_indices)
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
这里linear_assignment.gate_cost_matrix
函数如下.
门控矩阵的作用就是通过计算卡尔曼滤波的状态分布和测量值之间的距离对代价矩阵进行限制。代价矩阵中的距离是Track和Detection之间的表观相似度,假如一个轨迹要去匹配两个表观特征非常相似的Detection,这样就很容易出错,但是这个时候分别让两个Detection计算与这个轨迹的马氏距离,并使用一个阈值gating_threshold进行限制,所以就可以将马氏距离较远的那个Detection区分开,可以降低错误的匹配。
def gate_cost_matrix(
kf, cost_matrix, tracks, detections, track_indices, detection_indices,
gated_cost=INFTY_COST, only_position=False):
"""Invalidate infeasible entries in cost matrix based on the state
distributions obtained by Kalman filtering.
Parameters
----------
kf : The Kalman filter.
cost_matrix : ndarray
The NxM dimensional cost matrix, where N is the number of track indices
and M is the number of detection indices, such that entry (i, j) is the
association cost between `tracks[track_indices[i]]` and
`detections[detection_indices[j]]`.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : List[int]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above).
detection_indices : List[int]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above).
gated_cost : Optional[float]
Entries in the cost matrix corresponding to infeasible associations are
set this value. Defaults to a very large value.
only_position : Optional[bool]
If True, only the x, y position of the state distribution is considered
during gating. Defaults to False.
Returns
-------
ndarray
Returns the modified cost matrix.
"""
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray(
[detections[i].to_xyah() for i in detection_indices])
for row, track_idx in enumerate(track_indices):
track = tracks[track_idx]
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position)
cost_matrix[row, gating_distance > gating_threshold] = gated_cost
return cost_matrix
如果级联匹配失败的我们将进行IOU匹配。
# Associate remaining tracks together with unconfirmed tracks using IOU.
# 对级联匹配中未匹配的tracks和unconfirmed tracks中time_since_update为1(大于1说明一直没有匹配以前做过iou匹配了, 这里就不需要了)的tracks进行IOU匹配
iou_track_candidates = unconfirmed_tracks + [ # a 是级联匹配失败剩下的
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1] # 刚刚没有匹配上
# 未匹配
unmatched_tracks_a = [k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1] # 已经很久没有匹配上,说明做过iou匹配了
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
# iou_matching.py
def iou(bbox, candidates):
"""Computer intersection over union.
Parameters
----------
bbox : ndarray
A bounding box in format `(top left x, top left y, width, height)`.
candidates : ndarray
A matrix of candidate bounding boxes (one per row) in the same format
as `bbox`.
Returns
-------
ndarray
The intersection over union in [0, 1] between the `bbox` and each
candidate. A higher score means a larger fraction of the `bbox` is
occluded by the candidate.
"""
bbox_tl, bbox_br = bbox[:2], bbox[:2] + bbox[2:]
candidates_tl = candidates[:, :2]
candidates_br = candidates[:, :2] + candidates[:, 2:]
tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
wh = np.maximum(0., br - tl)
area_intersection = wh.prod(axis=1)
area_bbox = bbox[2:].prod()
area_candidates = candidates[:, 2:].prod(axis=1)
return area_intersection / (area_bbox + area_candidates - area_intersection)
def iou_cost(tracks, detections, track_indices=None,
detection_indices=None):
"""An intersection over union distance metric.
Parameters
----------
tracks : List[deep_sort.track.Track]
A list of tracks.
detections : List[deep_sort.detection.Detection]
A list of detections.
track_indices : Optional[List[int]]
A list of indices to tracks that should be matched. Defaults to
all `tracks`.
detection_indices : Optional[List[int]]
A list of indices to detections that should be matched. Defaults
to all `detections`.
Returns
-------
ndarray
Returns a cost matrix of shape
len(track_indices), len(detection_indices) where entry (i, j) is
`1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`.
"""
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
for row, track_idx in enumerate(track_indices):
if tracks[track_idx].time_since_update > 1:
cost_matrix[row, :] = linear_assignment.INFTY_COST
continue
bbox = tracks[track_idx].to_tlwh()
candidates = np.asarray(
[detections[i].tlwh for i in detection_indices])
cost_matrix[row, :] = 1. - iou(bbox, candidates)
return cost_matrix
接着我们需要完成update余下的工作
def mark_missed(self):
"""Mark this track as missed (no association at the current time step).
"""
if self.state == TrackState.Tentative:
self.state = TrackState.Deleted
elif self.time_since_update > self._max_age:
self.state = TrackState.Deleted
def _initiate_track(self, detection, class_id):
mean, covariance = self.kf.initiate(detection.to_xyah())
self.tracks.append(Track(
mean, covariance, self._next_id, class_id, self.n_init, self.max_age,
detection.feature))
self._next_id += 1
# 进行测量的更新和轨迹管理
# Run matching cascade.
def update(self,detections):
matches,unmatched_tracks,unmatched_detections=self._match(detections)
# Update track set.
# 1.针对匹配上的结果
for track_idx,detection_idx in matches:
# track更新对应的detection
self.tracks[track_idx].update(self.kf, detections[detection_idx])
# 2.针对未匹配的tracker,调用mark_missed标记
# track失配,若待定(unconfirmed)则删除,若update时间很久也删除
# max age是一个存活期限,默认为70帧
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
# 3.针对未匹配的detection,detection失配,进行初始化
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx])
# 得到最新的tracks列表,保存的是标记为confirmed和Tentative的track
self.tracks=[t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets=[t.track_id for t in self.tracks if t.is_confirmed()]
# active_targets框可以在画面展示
# 获取所有confirmed状态的track id
features,targets=[],[]
for track in self.tracks:
if not track.is_confirmed():
continue
features+=track.features # 将tracks列表拼接到features列表
# 获取每个feature对应的track id
targets+=[track.track_id for _ in track.features]
track.features=[]
# 距离度量中的特征集更新
self.metric.partial_fit(np.asarray(features),np.asarray(targets),
active_targets)
class NearestNeighborDistanceMetric(object):
# 对于每个目标,返回一个最近的距离
def __init__(self, metric, matching_threshold, budget=None):
# 默认matching_threshold = 0.2 budge = 100
if metric == "euclidean":
# 使用最近邻欧氏距离
self._metric = _nn_euclidean_distance
elif metric == "cosine":
# 使用最近邻余弦距离
self._metric = _nn_cosine_distance
else:
raise ValueError("Invalid metric; must be either 'euclidean' or 'cosine'")
self.matching_threshold = matching_threshold
# 在级联匹配的函数中调用
self.budget = budget
# budge 预算,控制feature的多少
self.samples = {}
# samples是一个字典{id->feature list}
def partial_fit(self, features, targets, active_targets):
# 作用:部分拟合,用新的数据更新测量距离
# 调用:在特征集更新模块部分调用,tracker.update()中
for feature, target in zip(features, targets):
self.samples.setdefault(target, []).append(feature)
# 对应目标下添加新的feature,更新feature集合
# 目标id : feature list
if self.budget is not None:
self.samples[target] = self.samples[target][-self.budget:]
# 设置预算,每个类最多多少个目标,超过直接忽略
# 筛选激活的目标
self.samples = {k: self.samples[k] for k in active_targets}
def distance(self, features, targets):
# 作用:比较feature和targets之间的距离,返回一个代价矩阵
# 调用:在匹配阶段,将distance封装为gated_metric,
# 进行外观信息(reid得到的深度特征)+
# 运动信息(马氏距离用于度量两个分布相似程度)
cost_matrix = np.zeros((len(targets), len(features)))
for i, target in enumerate(targets):
cost_matrix[i, :] = self._metric(self.samples[target], features)
return cost_matrix
★上述中应该是每一个目标tracker每一帧保留一个feature, 每次都与其自己的feature作相似度匹配,选最小的,同时随着时间推移不断更新feature budget(这里设置为100), 即每个activate的追踪器保留最近的self.budget条特征,就是自己检测器检测的特征与以往检测出来的feature作比较,选取最接近的feature作为判断,代价函数越小就越属于那一个id。
整体流程可以参考改图( 来源:Deep SORT多目标跟踪算法代码解析)
参考
[1] 【MOT】详解SORT与卡尔曼滤波算法
[2] 图说卡尔曼滤波,一份通俗易懂的教程
[3] 目标跟踪初探(DeepSORT)
[4] 马氏距离通俗理解
[5] 关于 Deep Sort 的一些理解
[6] 多目标跟踪(MOT)入门
[7] Deep SORT多目标跟踪算法代码解析