目标跟踪(一)DeepSORT论文及代码分析

因为DeepSORT模型内容比较复杂,涉及到背景只是也是非常复杂,最近也是再研究目标跟踪,这里通过写简书的形式进行知识点梳理也方便自己的回忆。
首先贴出论文地址以及对应的代码:
论文:SIMPLE ONLINE AND REALTIME TRACKING WITH A DEEP ASSOCIATION METRIC
代码:Yolov5_DeepSort_Pytorch


一、 背景知识

1. 1 卡尔曼滤波(Kalman Filter)

这里我们可以参考这篇文章【MOT】详解SORT与卡尔曼滤波算法, 这篇文章讲解相对较易理
解, 其对应的原版英文地址 How a Kalman filter works, in pictures。如果想具体了解卡尔曼滤波推导公式可以参考卡尔曼滤波详解
卡尔曼滤波算法分为两个过程分别是预测更新
这里关于预测涉及两个方面即卡尔曼滤波的结果以及传感器的探测结果的过程结合。因为两者之间都有一定的误差但是结合两者的结论进行分析,结果会有一定的提升。卡尔曼滤波可以基于目标前一时刻的位置,来预测当前时刻的位置,并且可以比传感器(在目标跟踪中即目标检测器,比如Yolo等)更准确的估计目标的位置。


如上图所示,假设我们要跟踪小车的位置变化,蓝色的分布表示的是卡尔曼滤波的预测值,棕色的分布表示的是传感器的测量值,灰色的部分就是预测值基于测量值更新后的最优估计值。
根据上一个状态(如在该情境下状态表示的是目标的位置及目标的速度)预测现在的状态(假设物体运动的状态按照匀速行驶),我们即可的到现在预测的状态。
【1】均值(状态)
{\hat{x}_{k-1} = \left\{ \begin{aligned} \hat{p}_{k-1}&&上一帧位置状态 \\ \hat{v}_{k-1}&&上一帧速度状态\\ \end{aligned} \right. }

{\hat{x}_{k} = \left\{ \begin{aligned} \hat{p}_{k} = \hat{p}_{k-1} + \hat{v}_{k-1} * t&&此刻位置状态 \\ \hat{v}_{k} = \hat{v}_{k-1}&&此刻速度状态\\ \end{aligned} \right. }

【2】协方差矩阵
因为状态元素之间的因素是相互影响的,举一个很简单的🌰,速度v越快我们得到的位移也会越大。因此我们可以用协方差矩阵P_{k-1}来表现两者元素之间的影响关系。
【3】传感器误差
当然我们需要考虑到引入传感器的带来的误差,该误差的范围可以用协方差\color{red}{R_k}表示。

1.2 SORT中的卡尔曼滤波

在目标跟踪的过程中我们主要估计track的两个重要的状态:均值方差
均值:主要表示目标的位置信息,由bbox的中心坐标(c_x, c_y),宽高比r, 高h, 以及各自的速度变化值组成,即[c_x, c_y, r, h, v_x,v_y,v_r, v_h], 各个速度初始化为0
协方差:表示目标位置信息的不确定性,由上述的8个因素对应组成的8*8的对角矩阵,矩阵中数字越大则表明不确定性越大,可以任意初始化。

这里关于卡尔曼滤波可以分为6个步骤:
【1】类初始化 init
【2】初始化状态(mean)与状态协方差(convariance)的函数的initiate
【3】预测阶段函数predict
【4】分布转换函数project
【5】更新阶段函数update
【6】计算状态分布和测量(检测框)之间距离函数gating_distance。

1.2.1 类初始化

class KalmanFilter(object):
    """
    A simple Kalman filter for tracking bounding boxes in image space.

    The 8-dimensional state space

        x, y, a, h, vx, vy, va, vh

    contains the bounding box center position (x, y), aspect ratio a, height h,
    and their respective velocities.

    Object motion follows a constant velocity model. The bounding box location
    (x, y, a, h) is taken as direct observation of the state space (linear
    observation model).

    """

    def __init__(self):
        ndim, dt = 4, 1.

        # Create Kalman filter model matrices.
        # 卡尔曼滤波的转移矩阵
        self._motion_mat = np.eye(2 * ndim, 2 * ndim)
        for i in range(ndim):
            self._motion_mat[i, ndim + i] = dt
        # 卡尔曼滤波投射矩阵
        self._update_mat = np.eye(ndim, 2 * ndim)

        # Motion and observation uncertainty are chosen relative to the current
        # state estimate. These weights control the amount of uncertainty in
        # the model. This is a bit hacky.
        self._std_weight_position = 1. / 20
        self._std_weight_velocity = 1. / 160

self._motion_mat:


self._update_mat:

1.2.2 初始化参数

def _initiate_track(self, detection):
        mean, covariance = self.kf.initiate(detection.to_xyah())
    def initiate(self, measurement):
        """Create track from unassociated measurement.

        Parameters
        ----------
        measurement : ndarray
            Bounding box coordinates (x, y, a, h) with center position (x, y),
            aspect ratio a, and height h.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector (8 dimensional) and covariance matrix (8x8
            dimensional) of the new track. Unobserved velocities are initialized
            to 0 mean.

        """
        mean_pos = measurement
        mean_vel = np.zeros_like(mean_pos)
        mean = np.r_[mean_pos, mean_vel]

        std = [
            2 * self._std_weight_position * measurement[0],   # the center point x
            2 * self._std_weight_position * measurement[1],   # the center point y
            1 * measurement[2],                               # the ratio of width/height
            2 * self._std_weight_position * measurement[3],   # the height
            10 * self._std_weight_velocity * measurement[0],
            10 * self._std_weight_velocity * measurement[1],
            0.1 * measurement[2],
            10 * self._std_weight_velocity * measurement[3]]
        covariance = np.diag(np.square(std))
        return mean, covariance

mean: eg:

mean=[x,y,a,h,0,0,0,0]

covariance: eg
covariance

1.2.3 预测


⭐️ \color{red}{因为我们在进行目标检测的时候按照一帧一帧检测,所以是匀速模型(constant velocity model),所以在这里我们对应的上述的公式}\color{orange}{\vec{u}}为0(这里可以理解为加速度)。

🌲预测公式:
\color{red}{\hat{x}_k} = F_{k}\color{blue}{\hat{x}_{k-1}}
\color{red}{\hat{P}_k} = F_{k}\color{blue}{P_{k-1}}F_{k}^{T} + Q_{k}(这里的噪声矩阵,代表整个系统的可靠程度,一般初始化很小的值)
我们这里的F称作为状态转移矩阵,该公式预测t时刻的均值。

    def predict(self, mean, covariance):
        """Run Kalman filter prediction step.

        Parameters
        ----------
        mean : ndarray
            The 8 dimensional mean vector of the object state at the previous
            time step.
        covariance : ndarray
            The 8x8 dimensional covariance matrix of the object state at the
            previous time step.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state. Unobserved velocities are initialized to 0 mean.

        """
        std_pos = [
            self._std_weight_position * mean[0],
            self._std_weight_position * mean[1],
            1 * mean[2],
            self._std_weight_position * mean[3]]
        std_vel = [
            self._std_weight_velocity * mean[0],
            self._std_weight_velocity * mean[1],
            0.1 * mean[2],
            self._std_weight_velocity * mean[3]]

        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
        mean = np.dot(self._motion_mat, mean)
        covariance = np.linalg.multi_dot((
            self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
        return mean, covariance

motion_conv 代表的是Q_{k}motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))


根据公式:\color{red}{\hat{x}_k} = F_{k}\color{blue}{\hat{x}_{k-1}}
meanmean = np.dot(self._motion_mat, mean), eg:

根据公式:\color{red}{\hat{P}_k} = F_{k}\color{blue}{P_{k-1}}F_{k}^{T} + Q_{k}
covariancecovariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov, eg:

1.2.4 投射(分布矩阵)

跟踪的空间向量与传感器向量空间不一致,需要借助投射矩阵将其转换在同一个向量空间。例如在我们跟踪的向量空间不仅包含目标的位置也包含其速度变化,但是在传感器向量空间是没有的(即目标检测仅仅输出目标的位置,没有输出目标运动的速度信息),需要进行转换将速度信息忽略。


根据如下公式:



代码如下:

    def project(self, mean, covariance):
        """Project state distribution to measurement space.

        Parameters
        ----------
        mean : ndarray
            The state's mean vector (8 dimensional array).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).

        Returns
        -------
        (ndarray, ndarray)
            Returns the projected mean and covariance matrix of the given state
            estimate.

        """
        std = [
            self._std_weight_position * mean[0],
            self._std_weight_position * mean[1],
            0.1 * mean[2],
            self._std_weight_position * mean[3]]
        innovation_cov = np.diag(np.square(std))
        mean = np.dot(self._update_mat, mean) # 将均值向量映射到检测空间,由于测量矩阵H是一个4X8的单位矩阵,所以就相当于提取前4个位置向量[cx, cy, r, h] 8 位变4位
        covariance = np.linalg.multi_dot((
            self._update_mat, covariance, self._update_mat.T)) # 8*8变成4*4
        return mean, covariance + innovation_cov

我们再回忆下投射矩阵
self._update_mat:


就很容易理解如何忽略速度信息。
\vec\mu_{expected} = H_{k}\hat{x}_{k}mean = np.dot(self._update_mat, mean)
\Sigma_{expected} = H_{k}P_{k}H_{K}^{T}covariance = np.linalg.multi_dot(( self._update_mat, covariance, self._update_mat.T)) # 8*8变成4*4

这里需要强调一点即一开始强调的传感器噪声 \color{red}{R_{k}}covariance + innovation_cov

1.2.5 更新

首先贴出更新公式:


这里的K指的是卡尔曼增益,\color{green}{\vec{z}}读数的均值分布, \color{blue}{\hat{x}^{'}_k}是我们最佳估计值,我们可以继续放进去做另一轮预测
代码如下:

    def update(self, mean, covariance, measurement):
        """Run Kalman filter correction step.

        Parameters
        ----------
        mean : ndarray
            The predicted state's mean vector (8 dimensional).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).
        measurement : ndarray
            The 4 dimensional measurement vector (x, y, a, h), where (x, y)
            is the center position, a the aspect ratio, and h the height of the
            bounding box.

        Returns
        -------
        (ndarray, ndarray)
            Returns the measurement-corrected state distribution.

        """
        projected_mean, projected_cov = self.project(mean, covariance)

        chol_factor, lower = scipy.linalg.cho_factor(
            projected_cov, lower=True, check_finite=False)
        kalman_gain = scipy.linalg.cho_solve(
            (chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
            check_finite=False).T
        innovation = measurement - projected_mean # measurement(观测值) 只有坐标没有速度, 所以projected_mean 做映射只选取位置信息去掉速度信息

        new_mean = mean + np.dot(innovation, kalman_gain.T)
        new_covariance = covariance - np.linalg.multi_dot((
            kalman_gain, projected_cov, kalman_gain.T))
        return new_mean, new_covariance

① 首先将mean, covariance参数通过投射变换转入到传感器(目标检测器)空间向量中去projected_mean, projected_cov = self.project(mean, covariance)
② 接着计算卡尔曼增益, 因上面在计算协防方差矩阵的时候已经考虑到检测器的误差所以这里的\color{green}{R_k}不需要考虑。

chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve((chol_factor, lower), np.dot(covariance, self._update_mat.T).T,check_finite=False).T

我们的到更新后参数,该参数作用于后期预测下一个的卡尔曼滤波的状态值。
③ 计算更新后新的meancovariance

innovation = measurement - projected_mean # measurement
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))

1.2.6 计算状态分布和测量检测框之间距离函数

    def gating_distance(self, mean, covariance, measurements,
                        only_position=False):
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`.

        """
        mean, covariance = self.project(mean, covariance)
        if only_position:
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        cholesky_factor = np.linalg.cholesky(covariance)
        d = measurements - mean
        z = scipy.linalg.solve_triangular(
            cholesky_factor, d.T, lower=True, check_finite=False,
            overwrite_b=True)
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha

这里根据器检测的均值,卡尔曼滤波均值以及卡尔曼滤波协方差矩阵计算卡尔曼滤波得出的估计值(mean, variance)以及检测器得出的估计值(mean1,variance)的马氏距离(Mahalanobis)。
至于为什么要使用马氏距离可以参考这篇博客 马氏距离通俗理解
也可以参考下面这段话

添加马氏距离与余弦距离:实际上是针对运动信息与外观信息的计算。两个名词听着较为陌生,而实际上换句话解释,马氏距离就是“加强版的欧氏距离”。它实际上是规避了欧氏距离中对于数据特征方差不同的风险,在计算中添加了协方差矩阵,其目的就是进行方差归一化,从而使所谓的“距离”更加符合数据特征以及实际意义。马氏距离是对于差异度的衡量中,的一种距离度量方式,而不同于马氏距离,余弦距离则是一种相似度度量方式。前者是针对于位置进行区分,而后者则是针对于方向。换句话说,我们使用余弦距离的时候,可以用来衡量不同个体在维度之间的差异,而一个个体中,维度与维度的差异我们却不好判断,此时我们可以使用马氏距离进行弥补,从而在整体上可以达到一个相对于全面的差异性衡量。而我们之所以要进行差异性衡量,根本目的也是想比较检测器与跟踪器的相似程度,优化度量方式,也可以更好地完成匹配。我们通过该方法可以很好的估算出卡尔曼滤波与检测器的相似度,继而方便后续得出其代价矩阵。

说了这么多咱们终于理解卡尔曼滤波也对其中的代码有了一定的熟悉,下面还有一个大的背景知识点就是匈牙利匹配算法。

1.3 匈牙利匹配算法(Hungarian Matching Algorithm)

为什么要涉及到匈牙利匹配算法呢?这主要原因是我们得出的卡尔曼滤波的出的结果如何与每一个目标id很好的匹配上,匈牙利算法可以告诉我们当前帧的某个目标,是否与前一帧的某个目标相同。

为了能更好的方便解释匈牙利匹配算法,这篇文章 目标跟踪初探(DeepSORT)的示例非常好的进行了诠释。

首先,先介绍一下什么是分配问题(Assignment Problem):假设有N个人和N个任务,每个任务可以任意分配给不同的人,已知每个人完成每个任务要花费的代价不尽相同,那么如何分配可以使得总的代价最小。举个例子,假设现在有3个任务,要分别分配给3个人,每个人完成各个任务所需代价矩阵(cost matrix)如下所示(这个代价可以是金钱、时间等等):

怎样才能找到一个最优分配,使得完成所有任务花费的代价最小呢?
匈牙利算法(又叫KM算法)就是用来解决分配问题的一种方法,它基于定理:
如果代价矩阵的某一行或某一列同时加上或减去某个数,则这个新的代价矩阵的最优分配仍然是原代价矩阵的最优分配。

算法步骤(假设矩阵为NxN方阵):
【1】对于矩阵的每一行,减去其中最小的元素
【2】对于矩阵的每一列,减去其中最小的元素
【3】用最少的水平线或垂直线覆盖矩阵中所有的0
【4】如果线的数量等于N,则找到了最优分配,算法结束,否则进入步骤5
【5】找到没有被任何线覆盖的最小元素,每个没被线覆盖的行减去这个元素,每个被线覆【6】盖的列加上这个元素,返回步骤3

继续拿上面的例子做演示


sklearn里的linear_assignment()函数以及scipy里的linear_sum_assignment()函数都实现了匈牙利算法,两者的返回值的形式不同:

import numpy as np 
from sklearn.utils.linear_assignment_ import linear_assignment
from scipy.optimize import linear_sum_assignment
 

cost_matrix = np.array([
    [15,40,45],
    [20,60,35],
    [20,40,25]
])
 
matches = linear_assignment(cost_matrix)
print('sklearn API result:\n', matches)
matches = linear_sum_assignment(cost_matrix)
print('scipy API result:\n', matches)
 

"""Outputs
sklearn API result:
 [[0 1]
  [1 0]
  [2 2]]
scipy API result:
 (array([0, 1, 2], dtype=int64), array([1, 0, 2], dtype=int64))
"""

二、 流程

介绍了上述的背景知识,我们可以开始我们跟踪算法的流程梳理了,其实以往的SORT算法,就是仅仅根据上述的背景知识就可以进行目标跟踪了。这篇文章中(多目标跟踪(MOT)入门)的很多有关于介绍deepsort流程有很多很棒的图片,通过图片可以很好的去理解整个流程,当然我也会结合代码去梳理里面的逻辑。

2.1 SORT算法流程图

SORT

关键步骤:轨迹卡尔曼滤波预测→ 使用匈牙利算法将预测后的tracks和当前帧中的detecions进行匹配(IOU匹配) → 卡尔曼滤波更新。 对于没有匹配上的轨迹,也不是马上就删掉了,有个T_lost的保存时间,但SORT里把这个时间阈值设置的是1,也就是说对于没匹配上的轨迹相当于直接删了。

2.2 DEEPSORT关键点梳理

DEEPSORT

相对于SORT算法比,该算法多了级联匹配(Matching Cascade)和新轨迹的确认(confirmed)。

2.2.1 跟踪器(Tracker)

在介绍每个零件的之前,我们先介绍每个跟踪器(tracker)的类。

# vim: expandtab:ts=4:sw=4


class TrackState:
    """
    Enumeration type for the single target track state. Newly created tracks are
    classified as `tentative` until enough evidence has been collected. Then,
    the track state is changed to `confirmed`. Tracks that are no longer alive
    are classified as `deleted` to mark them for removal from the set of active
    tracks.

    """

    Tentative = 1
    Confirmed = 2
    Deleted = 3


class Track:
    """
    A single target track with state space `(x, y, a, h)` and associated
    velocities, where `(x, y)` is the center of the bounding box, `a` is the
    aspect ratio and `h` is the height.

    Parameters
    ----------
    mean : ndarray
        Mean vector of the initial state distribution.
    covariance : ndarray
        Covariance matrix of the initial state distribution.
    track_id : int
        A unique track identifier.
    n_init : int
        Number of consecutive detections before the track is confirmed. The
        track state is set to `Deleted` if a miss occurs within the first
        `n_init` frames.
    max_age : int
        The maximum number of consecutive misses before the track state is
        set to `Deleted`.
    feature : Optional[ndarray]
        Feature vector of the detection this track originates from. If not None,
        this feature is added to the `features` cache.

    Attributes
    ----------
    mean : ndarray
        Mean vector of the initial state distribution.
    covariance : ndarray
        Covariance matrix of the initial state distribution.
    track_id : int
        A unique track identifier.
    hits : int
        Total number of measurement updates.
    age : int
        Total number of frames since first occurance.
    time_since_update : int
        Total number of frames since last measurement update.
    state : TrackState
        The current track state.
    features : List[ndarray]
        A cache of features. On each measurement update, the associated feature
        vector is added to this list.

    """

    def __init__(self, mean, covariance, track_id, class_id, n_init, max_age,
                 feature=None):
        self.mean = mean
        self.covariance = covariance
        self.track_id = track_id
        self.class_id = class_id
        self.hits = 1 # 连续匹配多少次
        self.age = 1 
        self.time_since_update = 0 # 有多少帧track还没有匹配上,更新+1, 匹配清0
        self.yolo_bbox = [0, 0, 0, 0]

        self.state = TrackState.Tentative
        self.features = []
        if feature is not None:
            self.features.append(feature)

        self._n_init = n_init
        self._max_age = max_age

    def to_tlwh(self):
        """Get current position in bounding box format `(top left x, top left y,
        width, height)`.

        Returns
        -------
        ndarray
            The bounding box.

        """
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    def to_tlbr(self):
        """Get kf estimated current position in bounding box format `(min x, miny, max x,
        max y)`.

        Returns
        -------
        ndarray
            The predicted kf bounding box.

        """
        ret = self.to_tlwh()
        ret[2:] = ret[:2] + ret[2:]
        return ret

    def get_yolo_pred(self):
        """Get yolo prediction`.

        Returns
        -------
        ndarray
            The yolo bounding box.

        """
        return self.yolo_bbox

    def increment_age(self):
        self.age += 1
        self.time_since_update += 1

    def predict(self, kf):
        """Propagate the state distribution to the current time step using a
        Kalman filter prediction step.

        Parameters
        ----------
        kf : kalman_filter.KalmanFilter
            The Kalman filter.

        """
        self.mean, self.covariance = kf.predict(self.mean, self.covariance)
        self.increment_age()

    def update(self, kf, detection, class_id):
        """Perform Kalman filter measurement update step and update the feature
        cache.

        Parameters
        ----------
        kf : kalman_filter.KalmanFilter
            The Kalman filter.
        detection : Detection
            The associated detection.

        """
        self.yolo_bbox = detection
        self.mean, self.covariance = kf.update(
            self.mean, self.covariance, detection.to_xyah())
        self.features.append(detection.feature)
        self.class_id = class_id

        self.hits += 1
        self.time_since_update = 0
        # 匹配的时候(update)time_since_update +需要置0,predict加1
        if self.state == TrackState.Tentative and self.hits >= self._n_init:
            self.state = TrackState.Confirmed

    def mark_missed(self):
        """Mark this track as missed (no association at the current time step).
        """
        if self.state == TrackState.Tentative:
            self.state = TrackState.Deleted
        elif self.time_since_update > self._max_age:
            self.state = TrackState.Deleted

    def is_tentative(self):
        """Returns True if this track is tentative (unconfirmed).
        """
        return self.state == TrackState.Tentative

    def is_confirmed(self):
        """Returns True if this track is confirmed."""
        return self.state == TrackState.Confirmed

    def is_deleted(self):
        """Returns True if this track is dead and should be deleted."""
        return self.state == TrackState.Deleted

2.2.2 目标检测器(Detections)

这里很好理解利用检测器检测出目标在图片中的位置(通常由4个坐标形成),对应的检测器有centernet, yolov系列等。这里使用的是yolov5

        pred = model(img, augment=opt.augment, visualize=visualize)

        # Apply NMS
        pred = non_max_suppression(pred, opt.conf_thres, opt.iou_thres, opt.classes, opt.agnostic_nms, max_det=opt.max_det)

        # Process detections
        for i, det in enumerate(pred):  # detections per image
            seen += 1
            if webcam:  # batch_size >= 1
                p, im0, frame = path[i], im0s[i].copy(), dataset.count
                s += f'{i}: '
            else:
                p, im0, frame = path, im0s.copy(), getattr(dataset, 'frame', 0)

            s += '%gx%g ' % img.shape[2:]  # print string
            save_path = str(Path(out) / Path(p).name)

            annotator = Annotator(im0, line_width=2, pil=not ascii)

            if det is not None and len(det):
                # Rescale boxes from img_size to im0 size
                det[:, :4] = scale_coords(
                    img.shape[2:], det[:, :4], im0.shape).round()

                # Print results
                for c in det[:, -1].unique():
                    n = (det[:, -1] == c).sum()  # detections per class
                    s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "  # add to string

                xywhs = xyxy2xywh(det[:, 0:4])
                confs = det[:, 4]
                clss = det[:, 5]

2.2.3 卡尔曼滤波器(Kalman Filter Predict)

我们基于卡尔曼滤波通过前面帧的信息来预测下一帧目标的位置。这里关于卡尔曼滤波的代码上面介绍了就不再说明了。

2.2.4 确认状态设定(Confirmed)

首先需要说的一点就是图中的tracks对应不同的目标,对应每一个目标则属于一个track。刚开始的时候我们所有的track都属于未确定的状态,如果我们的目标检测器检测到的目标与卡尔曼滤波预测目标匹配的时候我们认为其已经匹配,在这里如果连续3次的时候此时我们设定该track(目标)为确认状态。

2.2.5 IOU匹配(IOU Matched)

这里关于IOU匹配的阈值设定为0.7,如果大于0.7我们认为其是匹配的,否则我们认为是不匹配的, 将该tracker删除。

2.2.6 级联匹配(Matching Cascade)

这里使用级联匹配的目的是两个目标遮挡的情况,刚刚被遮挡的目标的track无法匹配detection,且暂时 从图像中消失,之后背遮挡目标再次出现的时候,应该尽量让被遮挡的目标分配的ID不发生变动,减少ID交换的情况,因此我们就需要用到级联匹配。关于级联匹配的过程我们可以如下的图来理解:

Matching Cascade

【1】首先能进入到级联匹配的预设条件:
连续匹配3次,即该tracker的状态必须为confirmed
【2】并且进入该状态的tracker有一个vip待遇:
即如果遇到检测器级联匹配失败同时IOU匹配失败的情况(缺失一次,missing_age + 1),仍然可以容忍max_age=70帧。如果missing_age超过这个帧数(70)则将该track删除。
【3】级联匹配的代价函数:
级联匹配的代价函数是由卡尔曼滤波的均值(目标坐标)以及协方差与检测器的均值(目标坐标)的马式距离(Mahalanobis distance)检测器目标的ReId特征与卡尔曼滤波ReId特征余弦相似度(Cosine Distance)(⭐️注意这里我们取相似度最大及距离最小的相似度)相加。这里有一个\lambda参数,关于这个参数我们可以根据相机的运动抖动情况判断,当其运动明显的时候,我们设置该参数为0是一个合理的选择。
【4】级联匹配的优先级:
级联匹配优先匹配丢失匹配帧最少的目标,也就是说没有丢失过的轨迹赋予优先匹配的权利,而丢失的最久的轨迹最后匹配
【5】门矩阵(Gate Matrix)
当cost matrix的某些值超出一定范围,我们设定其上限,具体我们可以根据代码细说。

代码如下:

# linear_assignment.py
def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run matching cascade.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    cascade_depth: int
        The cascade depth, should be se to the maximum track age.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all
        detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices
    matches = []
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break

        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue

        matches_l, _, unmatched_detections = \
            min_cost_matching(
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections
# tracker.py
def gated_metric(tracks, dets, track_indices, detection_indices):
    features = np.array([dets[i].feature for i in detection_indices])
    targets = np.array([tracks[i].track_id for i in track_indices])
    cost_matrix = self.metric.distance(features, targets)
    cost_matrix = linear_assignment.gate_cost_matrix(
    self.kf, cost_matrix, tracks, dets, track_indices,
    detection_indices)
    return cost_matrix

2.2.7 特征提取

这里我们可以使用简单的reid网络或普通的分类网络即可,只要我们通过该网络可以提取每个目标的特征即可,这里我们通过网络得到128维度的特征。在输入该模型之前我们需要将图片resize(64,128)大小的图片整理成一个batch然后再进入模型。

# 特征提取器
class Extractor(object):
    def __init__(self, model_path, use_cuda=True):
        self.net = Net(reid=True)
        self.device = "cuda" if torch.cuda.is_available() and use_cuda else "cpu"
        state_dict = torch.load(model_path, map_location=torch.device(self.device))[
            'net_dict']
        self.net.load_state_dict(state_dict)
        logger = logging.getLogger("root.tracker")
        logger.info("Loading weights from {}... Done!".format(model_path))
        self.net.to(self.device)
        self.size = (64, 128)
        self.norm = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])

    # 图片预处理
    def _preprocess(self, im_crops):
        """
        TODO:
            1. to float with scale from 0 to 1
            2. resize to (64, 128) as Market1501 dataset did
            3. concatenate to a numpy array
            3. to torch Tensor
            4. normalize
        """
        def _resize(im, size):
            return cv2.resize(im.astype(np.float32)/255., size)

        im_batch = torch.cat([self.norm(_resize(im, self.size)).unsqueeze(
            0) for im in im_crops], dim=0).float()
        return im_batch
  
    # 输入模型
    def __call__(self, im_crops):
        im_batch = self._preprocess(im_crops)
        with torch.no_grad():
            im_batch = im_batch.to(self.device)
            features = self.net(im_batch)
        return features.cpu().numpy()

特征提取网络结构如下,然后得到128维的特征:

# 特征提取网络
class Net(nn.Module):
    def __init__(self, num_classes=751, reid=False):
        super(Net, self).__init__()
        # 3 128 64
        self.conv = nn.Sequential(
            nn.Conv2d(3, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            # nn.Conv2d(32,32,3,stride=1,padding=1),
            # nn.BatchNorm2d(32),
            # nn.ReLU(inplace=True),
            nn.MaxPool2d(3, 2, padding=1),
        )
        # 32 64 32
        self.layer1 = make_layers(64, 64, 2, False)
        # 32 64 32
        self.layer2 = make_layers(64, 128, 2, True)
        # 64 32 16
        self.layer3 = make_layers(128, 256, 2, True)
        # 128 16 8
        self.layer4 = make_layers(256, 512, 2, True)
        # 256 8 4
        self.avgpool = nn.AvgPool2d((8, 4), 1)
        # 256 1 1
        self.reid = reid
        self.classifier = nn.Sequential(
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        x = self.conv(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        # B x 128
        if self.reid:
            x = x.div(x.norm(p=2, dim=1, keepdim=True))
            return x
        # classifier
        x = self.classifier(x)
        return x

2.2.8 ⭐️参数更新(Filter)

从图中可以看到无论是IOU匹配还是级联匹配,只要匹配上我们就需要更新我们的卡尔曼参数。

2.3 DEEPSORT流程

【1】首先我们输入的图片经过目标检测器2.2.2得到目标框的坐标位置,目标的置信度以及目标的类别
【2】将上述的信息输入到deepsort环节中去

import numpy as np
import torch

from .deep.feature_extractor import Extractor
from .sort.nn_matching import NearestNeighborDistanceMetric
from .sort.detection import Detection
from .sort.tracker import Tracker


__all__ = ['DeepSort']


class DeepSort(object):
    def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
        self.min_confidence = min_confidence

        self.extractor = Extractor(model_path, use_cuda=use_cuda)

        max_cosine_distance = max_dist
        metric = NearestNeighborDistanceMetric(
            "cosine", max_cosine_distance, nn_budget)
        self.tracker = Tracker(
            metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)

    def update(self, bbox_xywh, confidences, classes, ori_img, use_yolo_preds=True):
        self.height, self.width = ori_img.shape[:2]
        # generate detections
        features = self._get_features(bbox_xywh, ori_img)
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
        detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(
            confidences) if conf > self.min_confidence]

        # run on non-maximum supression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])

        # update tracker
        self.tracker.predict()
        self.tracker.update(detections, classes)

        # output bbox identities
        outputs = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            if use_yolo_preds:
                det = track.get_yolo_pred()
                x1, y1, x2, y2 = self._tlwh_to_xyxy(det.tlwh)
            else:
                box = track.to_tlwh()
                x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            class_id = track.class_id
            outputs.append(np.array([x1, y1, x2, y2, track_id, class_id], dtype=np.int))
        if len(outputs) > 0:
            outputs = np.stack(outputs, axis=0)
        return outputs

    """
    TODO:
        Convert bbox from xc_yc_w_h to xtl_ytl_w_h
    Thanks JieChen91@github.com for reporting this bug!
    """
    @staticmethod
    def _xywh_to_tlwh(bbox_xywh):
        if isinstance(bbox_xywh, np.ndarray):
            bbox_tlwh = bbox_xywh.copy()
        elif isinstance(bbox_xywh, torch.Tensor):
            bbox_tlwh = bbox_xywh.clone()
        bbox_tlwh[:, 0] = bbox_xywh[:, 0] - bbox_xywh[:, 2] / 2.
        bbox_tlwh[:, 1] = bbox_xywh[:, 1] - bbox_xywh[:, 3] / 2.
        return bbox_tlwh

    def _xywh_to_xyxy(self, bbox_xywh):
        x, y, w, h = bbox_xywh
        x1 = max(int(x - w / 2), 0)
        x2 = min(int(x + w / 2), self.width - 1)
        y1 = max(int(y - h / 2), 0)
        y2 = min(int(y + h / 2), self.height - 1)
        return x1, y1, x2, y2

    def _tlwh_to_xyxy(self, bbox_tlwh):
        """
        TODO:
            Convert bbox from xtl_ytl_w_h to xc_yc_w_h
        Thanks JieChen91@github.com for reporting this bug!
        """
        x, y, w, h = bbox_tlwh
        x1 = max(int(x), 0)
        x2 = min(int(x+w), self.width - 1)
        y1 = max(int(y), 0)
        y2 = min(int(y+h), self.height - 1)
        return x1, y1, x2, y2

    def increment_ages(self):
        self.tracker.increment_ages()

    def _xyxy_to_tlwh(self, bbox_xyxy):
        x1, y1, x2, y2 = bbox_xyxy

        t = x1
        l = y1
        w = int(x2 - x1)
        h = int(y2 - y1)
        return t, l, w, h

    def _get_features(self, bbox_xywh, ori_img):
        im_crops = []
        for box in bbox_xywh:
            x1, y1, x2, y2 = self._xywh_to_xyxy(box)
            im = ori_img[y1:y2, x1:x2]
            im_crops.append(im)
        if im_crops:
            features = self.extractor(im_crops)
        else:
            features = np.array([])
        return features

(1) update函数
a. 通过目标的位置以及原图通过特征提取器提取128维度的目标特征,接着过滤掉置信度低于0.3的目标存储到detections类。

class Detection(object):
    """
    This class represents a bounding box detection in a single image.

    Parameters
    ----------
    tlwh : array_like
        Bounding box in format `(x, y, w, h)`.
    confidence : float
        Detector confidence score.
    feature : array_like
        A feature vector that describes the object contained in this image.

    Attributes
    ----------
    tlwh : ndarray
        Bounding box in format `(top left x, top left y, width, height)`.
    confidence : ndarray
        Detector confidence score.
    feature : ndarray | NoneType
        A feature vector that describes the object contained in this image.

    """

    def __init__(self, tlwh, confidence, feature):
        self.tlwh = np.asarray(tlwh, dtype=np.float)
        self.confidence = float(confidence)
        self.feature = np.asarray(feature, dtype=np.float32)

    def to_tlbr(self):
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    def to_xyah(self):
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = self.tlwh.copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret
features = self._get_features(bbox_xywh, ori_img)
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(confidences) if conf > self.min_confidence]

b. 卡尔曼滤波更新与预测

self.tracker.predict()
self.tracker.update(detections, classes)

预测的时候, 输入mean以及convariance,输出新的mean, 新的convariance,🚩同时age以及,time_since_update 加1。

    def increment_age(self):
        self.age += 1  # 该track自出现以来的总帧数加1
        self.time_since_update += 1 #  # 该track自最近一次更新以来的总帧数加1

    def predict(self, kf):
        """Propagate the state distribution to the current time step using a
        Kalman filter prediction step.

        Parameters
        ----------
        kf : kalman_filter.KalmanFilter
            The Kalman filter.

        """
        self.mean, self.covariance = kf.predict(self.mean, self.covariance)
        self.increment_age()

更新的时候是一个相对复杂的过程。

    def update(self, detections, classes):
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches: # 对匹配上的进行参数更新
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx], classes[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx], classes[detection_idx].item())
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)

    def _match(self, detections):

        def gated_metric(tracks, dets, track_indices, detection_indices):
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

        # Associate confirmed tracks using appearance features.
       # 对confirmd tracks进行级联匹配
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        # 将所有状态为未确定态的轨迹和刚刚没有匹配上的轨迹组合为iou_track_candidates,进行IoU的匹配
        iou_track_candidates = unconfirmed_tracks + [ # a 是级联匹配失败剩下的
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update == 1] # 刚刚没有匹配上
        # 未匹配
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]  # 已经很久没有匹配上
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)

        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections
# linear_assignment.py
def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run matching cascade.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    cascade_depth: int
        The cascade depth, should be se to the maximum track age.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all
        detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices
    matches = []
   # 由小到大依次对每个level的tracks做匹配, 最近丢失匹配帧越小的目标越先进行匹配
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break

        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue

        matches_l, _, unmatched_detections = \
            min_cost_matching(
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections

首先通过_self.match函数来判断哪些目标匹配,哪些卡尔曼滤波没有匹配,哪些目标检测没有匹配。接着由小到大依次对每个level的tracks做匹配, 最近丢失匹配帧越小的目标越先进行匈牙利匹配。

matches, unmatched_tracks, unmatched_detections = \
self._match(detections)

匈牙利匹配如下代码,这部分代码中还有一个核心的函数min_cost_matching,这个函数可以接收不同的distance_metric,在级联匹配和IoU匹配中都有用到。
如:

        # Associate confirmed tracks using appearance features.
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)


        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)
def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    """Solve linear assignment problem.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection_indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    if len(detection_indices) == 0 or len(track_indices) == 0:
        return [], track_indices, detection_indices  # Nothing to match.
   # -----------------------------------------
    # Gated_distance——>
    #       1. cosine distance
    #       2. 马氏距离
    # 得到代价矩阵
    # -----------------------------------------
    # iou_cost——>
    #       仅仅计算track和detection之间的iou距离
    # -----------------------------------------

    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
    # -----------------------------------------
    # gated_distance中设置距离中最高上限,
    # 这里最远距离实际是在deep sort类中的max_dist参数设置的
    # 默认max_dist=0.2, 距离越小越好
    # -----------------------------------------
    # iou_cost情况下,max_distance的设置对应tracker中的max_iou_distance,
    # 默认值为max_iou_distance=0.7
    # 注意结果是1-iou,所以越小越好
    # -----------------------------------------

    row_indices, col_indices = linear_assignment(cost_matrix)

    matches, unmatched_tracks, unmatched_detections = [], [], []
    for col, detection_idx in enumerate(detection_indices): # 没匹配的tdetection有哪些
        if col not in col_indices: # 查看检测和匈牙利匹配是否对应 对应上了则匹配,对应不上不匹配
            unmatched_detections.append(detection_idx)
    for row, track_idx in enumerate(track_indices): # 没匹配的track有哪些
        if row not in row_indices:
            unmatched_tracks.append(track_idx)
    for row, col in zip(row_indices, col_indices):查看卡尔曼匹配和匈牙利匹配是否对应 对应上了则匹配,对应不上不匹配
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance: # 就算很匹配,但是当其距离大于0.7仍放到没配对中
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    return matches, unmatched_tracks, unmatched_detections

首先通过如下代码计算mean,covarinace马氏距离以及特征的余弦距离。

def gated_metric(tracks, dets, track_indices, detection_indices):
      features = np.array([dets[i].feature for i in detection_indices])
      targets = np.array([tracks[i].track_id for i in track_indices])
      cost_matrix = self.metric.distance(features, targets)
      cost_matrix = linear_assignment.gate_cost_matrix(self.kf,cost_matrix, tracks, dets, track_indices,detection_indices)
      return cost_matrix
    cost_matrix = distance_metric(tracks, detections, track_indices, detection_indices)
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5

这里linear_assignment.gate_cost_matrix函数如下.

门控矩阵的作用就是通过计算卡尔曼滤波的状态分布和测量值之间的距离对代价矩阵进行限制。代价矩阵中的距离是Track和Detection之间的表观相似度,假如一个轨迹要去匹配两个表观特征非常相似的Detection,这样就很容易出错,但是这个时候分别让两个Detection计算与这个轨迹的马氏距离,并使用一个阈值gating_threshold进行限制,所以就可以将马氏距离较远的那个Detection区分开,可以降低错误的匹配。

def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

    Returns
    -------
    ndarray
        Returns the modified cost matrix.

    """
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost
    return cost_matrix

如果级联匹配失败的我们将进行IOU匹配。

# Associate remaining tracks together with unconfirmed tracks using IOU.
#  对级联匹配中未匹配的tracks和unconfirmed tracks中time_since_update为1(大于1说明一直没有匹配以前做过iou匹配了, 这里就不需要了)的tracks进行IOU匹配
iou_track_candidates = unconfirmed_tracks + [ # a 是级联匹配失败剩下的
    k for k in unmatched_tracks_a if
    self.tracks[k].time_since_update == 1] # 刚刚没有匹配上
    # 未匹配
    unmatched_tracks_a = [k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]   # 已经很久没有匹配上,说明做过iou匹配了
   matches_b, unmatched_tracks_b, unmatched_detections = \
   linear_assignment.min_cost_matching(
   iou_matching.iou_cost, self.max_iou_distance, self.tracks,
   detections, iou_track_candidates, unmatched_detections)
# iou_matching.py

def iou(bbox, candidates):
    """Computer intersection over union.

    Parameters
    ----------
    bbox : ndarray
        A bounding box in format `(top left x, top left y, width, height)`.
    candidates : ndarray
        A matrix of candidate bounding boxes (one per row) in the same format
        as `bbox`.

    Returns
    -------
    ndarray
        The intersection over union in [0, 1] between the `bbox` and each
        candidate. A higher score means a larger fraction of the `bbox` is
        occluded by the candidate.

    """
    bbox_tl, bbox_br = bbox[:2], bbox[:2] + bbox[2:]
    candidates_tl = candidates[:, :2]
    candidates_br = candidates[:, :2] + candidates[:, 2:]

    tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
               np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
    br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
               np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
    wh = np.maximum(0., br - tl)

    area_intersection = wh.prod(axis=1)
    area_bbox = bbox[2:].prod()
    area_candidates = candidates[:, 2:].prod(axis=1)
    return area_intersection / (area_bbox + area_candidates - area_intersection)


def iou_cost(tracks, detections, track_indices=None,
             detection_indices=None):
    """An intersection over union distance metric.

    Parameters
    ----------
    tracks : List[deep_sort.track.Track]
        A list of tracks.
    detections : List[deep_sort.detection.Detection]
        A list of detections.
    track_indices : Optional[List[int]]
        A list of indices to tracks that should be matched. Defaults to
        all `tracks`.
    detection_indices : Optional[List[int]]
        A list of indices to detections that should be matched. Defaults
        to all `detections`.

    Returns
    -------
    ndarray
        Returns a cost matrix of shape
        len(track_indices), len(detection_indices) where entry (i, j) is
        `1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
    for row, track_idx in enumerate(track_indices):
        if tracks[track_idx].time_since_update > 1:
            cost_matrix[row, :] = linear_assignment.INFTY_COST
            continue

        bbox = tracks[track_idx].to_tlwh()
        candidates = np.asarray(
            [detections[i].tlwh for i in detection_indices])
        cost_matrix[row, :] = 1. - iou(bbox, candidates)
    return cost_matrix

接着我们需要完成update余下的工作

    def mark_missed(self):
        """Mark this track as missed (no association at the current time step).
        """
        if self.state == TrackState.Tentative:
            self.state = TrackState.Deleted
        elif self.time_since_update > self._max_age:
            self.state = TrackState.Deleted
    def _initiate_track(self, detection, class_id):
        mean, covariance = self.kf.initiate(detection.to_xyah())
        self.tracks.append(Track(
            mean, covariance, self._next_id, class_id, self.n_init, self.max_age,
            detection.feature))
        self._next_id += 1


    # 进行测量的更新和轨迹管理
    # Run matching cascade.
    def update(self,detections):
        matches,unmatched_tracks,unmatched_detections=self._match(detections)

        # Update track set.
        # 1.针对匹配上的结果
        for track_idx,detection_idx in matches:
            # track更新对应的detection
            self.tracks[track_idx].update(self.kf, detections[detection_idx])

        # 2.针对未匹配的tracker,调用mark_missed标记
        # track失配,若待定(unconfirmed)则删除,若update时间很久也删除
        # max age是一个存活期限,默认为70帧
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        # 3.针对未匹配的detection,detection失配,进行初始化
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        # 得到最新的tracks列表,保存的是标记为confirmed和Tentative的track
        self.tracks=[t for t in self.tracks if not t.is_deleted()]
        # Update distance metric.
        active_targets=[t.track_id for t in self.tracks if t.is_confirmed()]
# active_targets框可以在画面展示
        # 获取所有confirmed状态的track id
        features,targets=[],[]
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features+=track.features  # 将tracks列表拼接到features列表
            # 获取每个feature对应的track id
            targets+=[track.track_id for _ in track.features]
            track.features=[]
        # 距离度量中的特征集更新
        self.metric.partial_fit(np.asarray(features),np.asarray(targets),
                                                          active_targets)
class NearestNeighborDistanceMetric(object):
    # 对于每个目标,返回一个最近的距离
    def __init__(self, metric, matching_threshold, budget=None):
        # 默认matching_threshold = 0.2 budge = 100
        if metric == "euclidean":
            # 使用最近邻欧氏距离
            self._metric = _nn_euclidean_distance
        elif metric == "cosine":
            # 使用最近邻余弦距离
            self._metric = _nn_cosine_distance
        else:
            raise ValueError("Invalid metric; must be either 'euclidean' or 'cosine'")

        self.matching_threshold = matching_threshold
        # 在级联匹配的函数中调用
        self.budget = budget
        # budge 预算,控制feature的多少
        self.samples = {}
        # samples是一个字典{id->feature list}

    def partial_fit(self, features, targets, active_targets):
        # 作用:部分拟合,用新的数据更新测量距离
        # 调用:在特征集更新模块部分调用,tracker.update()中
        for feature, target in zip(features, targets):
            self.samples.setdefault(target, []).append(feature)
            # 对应目标下添加新的feature,更新feature集合
            # 目标id  :  feature list
            if self.budget is not None:
                self.samples[target] = self.samples[target][-self.budget:]
            # 设置预算,每个类最多多少个目标,超过直接忽略

        # 筛选激活的目标
        self.samples = {k: self.samples[k] for k in active_targets}

    def distance(self, features, targets):
        # 作用:比较feature和targets之间的距离,返回一个代价矩阵
        # 调用:在匹配阶段,将distance封装为gated_metric,
        #       进行外观信息(reid得到的深度特征)+
        #       运动信息(马氏距离用于度量两个分布相似程度)
        cost_matrix = np.zeros((len(targets), len(features)))
        for i, target in enumerate(targets):
            cost_matrix[i, :] = self._metric(self.samples[target], features)
        return cost_matrix

★上述中应该是每一个目标tracker每一帧保留一个feature, 每次都与其自己的feature作相似度匹配,选最小的,同时随着时间推移不断更新feature budget(这里设置为100), 即每个activate的追踪器保留最近的self.budget条特征,就是自己检测器检测的特征与以往检测出来的feature作比较,选取最接近的feature作为判断,代价函数越小就越属于那一个id。

整体流程可以参考改图( 来源:Deep SORT多目标跟踪算法代码解析

image.png

参考

[1] 【MOT】详解SORT与卡尔曼滤波算法
[2] 图说卡尔曼滤波,一份通俗易懂的教程
[3] 目标跟踪初探(DeepSORT)
[4] 马氏距离通俗理解
[5] 关于 Deep Sort 的一些理解
[6] 多目标跟踪(MOT)入门
[7] Deep SORT多目标跟踪算法代码解析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容