一、 背景知识
1. 1 卡尔曼滤波(Kalman Filter)
这里我们可以参考这篇文章【MOT】详解SORT与卡尔曼滤波算法, 这篇文章讲解相对较易理
解, 其对应的原版英文地址 How a Kalman filter works, in pictures。如果想具体了解卡尔曼滤波推导公式可以参考卡尔曼滤波详解。
1.2 SORT中的卡尔曼滤波
在目标跟踪的过程中我们主要估计track的两个重要的状态:均值, 方差。
, 高
, 以及各自的速度变化值组成,即
, 各个速度初始化为0。
【1】类初始化 init
1.2.1 类初始化
class KalmanFilter(object):
A simple Kalman filter for tracking bounding boxes in image space.
The 8-dimensional state space
x, y, a, h, vx, vy, va, vh
contains the bounding box center position (x, y), aspect ratio a, height h,
and their respective velocities.
Object motion follows a constant velocity model. The bounding box location
(x, y, a, h) is taken as direct observation of the state space (linear
observation model).
def __init__(self):
ndim, dt = 4, 1.
# Create Kalman filter model matrices.
# 卡尔曼滤波的转移矩阵
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt
# 卡尔曼滤波投射矩阵
self._update_mat = np.eye(ndim, 2 * ndim)
# Motion and observation uncertainty are chosen relative to the current
# state estimate. These weights control the amount of uncertainty in
# the model. This is a bit hacky.
self._std_weight_position = 1. / 20
self._std_weight_velocity = 1. / 160
:1.2.2 初始化参数
def _initiate_track(self, detection):
mean, covariance = self.kf.initiate(detection.to_xyah())
def initiate(self, measurement):
"""Create track from unassociated measurement.
measurement : ndarray
Bounding box coordinates (x, y, a, h) with center position (x, y),
aspect ratio a, and height h.
(ndarray, ndarray)
Returns the mean vector (8 dimensional) and covariance matrix (8x8
dimensional) of the new track. Unobserved velocities are initialized
to 0 mean.
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[0], # the center point x
2 * self._std_weight_position * measurement[1], # the center point y
1 * measurement[2], # the ratio of width/height
2 * self._std_weight_position * measurement[3], # the height
10 * self._std_weight_velocity * measurement[0],
10 * self._std_weight_velocity * measurement[1],
0.1 * measurement[2],
10 * self._std_weight_velocity * measurement[3]]
covariance = np.diag(np.square(std))
return mean, covariance
: eg:
: eg 1.2.3 预测
def predict(self, mean, covariance):
"""Run Kalman filter prediction step.
mean : ndarray
The 8 dimensional mean vector of the object state at the previous
time step.
covariance : ndarray
The 8x8 dimensional covariance matrix of the object state at the
previous time step.
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted
state. Unobserved velocities are initialized to 0 mean.
std_pos = [
self._std_weight_position * mean[0],
self._std_weight_position * mean[1],
1 * mean[2],
self._std_weight_position * mean[3]]
std_vel = [
self._std_weight_velocity * mean[0],
self._std_weight_velocity * mean[1],
0.1 * mean[2],
self._std_weight_velocity * mean[3]]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(self._motion_mat, mean)
covariance = np.linalg.multi_dot((
self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
代表的是 ➡
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
➡ mean = np.dot(self._motion_mat, mean)
, eg:根据公式:
➡ covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
, eg:
1.2.4 投射(分布矩阵)
def project(self, mean, covariance):
"""Project state distribution to measurement space.
mean : ndarray
The state's mean vector (8 dimensional array).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
(ndarray, ndarray)
Returns the projected mean and covariance matrix of the given state
std = [
self._std_weight_position * mean[0],
self._std_weight_position * mean[1],
0.1 * mean[2],
self._std_weight_position * mean[3]]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean) # 将均值向量映射到检测空间,由于测量矩阵H是一个4X8的单位矩阵,所以就相当于提取前4个位置向量[cx, cy, r, h] 8 位变4位
covariance = np.linalg.multi_dot((
self._update_mat, covariance, self._update_mat.T)) # 8*8变成4*4
return mean, covariance + innovation_cov
mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot(( self._update_mat, covariance, self._update_mat.T)) # 8*8变成4*4
covariance + innovation_cov
1.2.5 更新
def update(self, mean, covariance, measurement):
"""Run Kalman filter correction step.
mean : ndarray
The predicted state's mean vector (8 dimensional).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
measurement : ndarray
The 4 dimensional measurement vector (x, y, a, h), where (x, y)
is the center position, a the aspect ratio, and h the height of the
bounding box.
(ndarray, ndarray)
Returns the measurement-corrected state distribution.
projected_mean, projected_cov = self.project(mean, covariance)
chol_factor, lower = scipy.linalg.cho_factor(
projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
innovation = measurement - projected_mean # measurement(观测值) 只有坐标没有速度, 所以projected_mean 做映射只选取位置信息去掉速度信息
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((
kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
① 首先将mean
, covariance
参数通过投射变换转入到传感器(目标检测器)空间向量中去projected_mean, projected_cov = self.project(mean, covariance)
② 接着计算卡尔曼增益
, 因上面在计算协防方差矩阵的时候已经考虑到检测器的误差所以这里的不需要考虑。
chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve((chol_factor, lower), np.dot(covariance, self._update_mat.T).T,check_finite=False).T
③ 计算更新后新的mean
innovation = measurement - projected_mean # measurement
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
1.2.6 计算状态分布和测量检测框之间距离函数
def gating_distance(self, mean, covariance, measurements,
"""Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If
`only_position` is False, the chi-square distribution has 4 degrees of
freedom, otherwise 2.
mean : ndarray
Mean vector over the state distribution (8 dimensional).
covariance : ndarray
Covariance of the state distribution (8x8 dimensional).
measurements : ndarray
An Nx4 dimensional matrix of N measurements, each in
format (x, y, a, h) where (x, y) is the bounding box center
position, a the aspect ratio, and h the height.
only_position : Optional[bool]
If True, distance computation is done with respect to the bounding
box center position only.
Returns an array of length N, where the i-th element contains the
squared Mahalanobis distance between (mean, covariance) and
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]
cholesky_factor = np.linalg.cholesky(covariance)
d = measurements - mean
z = scipy.linalg.solve_triangular(
cholesky_factor, d.T, lower=True, check_finite=False,
squared_maha = np.sum(z * z, axis=0)
return squared_maha
这里根据器检测的均值,卡尔曼滤波均值以及卡尔曼滤波协方差矩阵计算卡尔曼滤波得出的估计值(mean, variance)以及检测器得出的估计值(mean1,variance)的马氏距离(Mahalanobis)。
至于为什么要使用马氏距离可以参考这篇博客 马氏距离通俗理解。
1.3 匈牙利匹配算法(Hungarian Matching Algorithm)
为了能更好的方便解释匈牙利匹配算法,这篇文章 目标跟踪初探(DeepSORT)的示例非常好的进行了诠释。
首先,先介绍一下什么是分配问题(Assignment Problem):假设有N个人和N个任务,每个任务可以任意分配给不同的人,已知每个人完成每个任务要花费的代价不尽相同,那么如何分配可以使得总的代价最小。举个例子,假设现在有3个任务,要分别分配给3个人,每个人完成各个任务所需代价矩阵(cost matrix)如下所示(这个代价可以是金钱、时间等等):
import numpy as np
from sklearn.utils.linear_assignment_ import linear_assignment
from scipy.optimize import linear_sum_assignment
cost_matrix = np.array([
matches = linear_assignment(cost_matrix)
print('sklearn API result:\n', matches)
matches = linear_sum_assignment(cost_matrix)
print('scipy API result:\n', matches)
sklearn API result:
[[0 1]
[1 0]
[2 2]]
scipy API result:
(array([0, 1, 2], dtype=int64), array([1, 0, 2], dtype=int64))
二、 流程
2.1 SORT算法流程图
关键步骤:轨迹卡尔曼滤波预测→ 使用匈牙利算法将预测后的tracks和当前帧中的detecions进行匹配(IOU匹配) → 卡尔曼滤波更新。 对于没有匹配上的轨迹,也不是马上就删掉了,有个T_lost的保存时间,但SORT里把这个时间阈值设置的是1,也就是说对于没匹配上的轨迹相当于直接删了。
2.2 DEEPSORT关键点梳理
相对于SORT算法比,该算法多了级联匹配(Matching Cascade)和新轨迹的确认(confirmed)。
2.2.1 跟踪器(Tracker)
# vim: expandtab:ts=4:sw=4
class TrackState:
Enumeration type for the single target track state. Newly created tracks are
classified as `tentative` until enough evidence has been collected. Then,
the track state is changed to `confirmed`. Tracks that are no longer alive
are classified as `deleted` to mark them for removal from the set of active
Tentative = 1
Confirmed = 2
Deleted = 3
class Track:
A single target track with state space `(x, y, a, h)` and associated
velocities, where `(x, y)` is the center of the bounding box, `a` is the
aspect ratio and `h` is the height.
mean : ndarray
Mean vector of the initial state distribution.
covariance : ndarray
Covariance matrix of the initial state distribution.
track_id : int
A unique track identifier.
n_init : int
Number of consecutive detections before the track is confirmed. The
track state is set to `Deleted` if a miss occurs within the first
`n_init` frames.
max_age : int
The maximum number of consecutive misses before the track state is
set to `Deleted`.
feature : Optional[ndarray]
Feature vector of the detection this track originates from. If not None,
this feature is added to the `features` cache.
mean : ndarray
Mean vector of the initial state distribution.
covariance : ndarray
Covariance matrix of the initial state distribution.
track_id : int
A unique track identifier.
hits : int
Total number of measurement updates.
age : int
Total number of frames since first occurance.
time_since_update : int
Total number of frames since last measurement update.
state : TrackState
The current track state.
features : List[ndarray]
A cache of features. On each measurement update, the associated feature
vector is added to this list.
def __init__(self, mean, covariance, track_id, class_id, n_init, max_age,
self.mean = mean
self.covariance = covariance
self.track_id = track_id
self.class_id = class_id
self.hits = 1 # 连续匹配多少次
self.age = 1
self.time_since_update = 0 # 有多少帧track还没有匹配上,更新+1, 匹配清0
self.yolo_bbox = [0, 0, 0, 0]
self.state = TrackState.Tentative
self.features = []
if feature is not None:
self._n_init = n_init
self._max_age = max_age
def to_tlwh(self):
"""Get current position in bounding box format `(top left x, top left y,
width, height)`.
The bounding box.
ret = self.mean[:4].copy()
ret[2] *= ret[3]
ret[:2] -= ret[2:] / 2
return ret
def to_tlbr(self):
"""Get kf estimated current position in bounding box format `(min x, miny, max x,
max y)`.
The predicted kf bounding box.
ret = self.to_tlwh()
ret[2:] = ret[:2] + ret[2:]
return ret
def get_yolo_pred(self):
"""Get yolo prediction`.
The yolo bounding box.
return self.yolo_bbox
def increment_age(self):
self.age += 1
self.time_since_update += 1
def predict(self, kf):
"""Propagate the state distribution to the current time step using a
Kalman filter prediction step.
kf : kalman_filter.KalmanFilter
The Kalman filter.
self.mean, self.covariance = kf.predict(self.mean, self.covariance)
def update(self, kf, detection, class_id):
"""Perform Kalman filter measurement update step and update the feature
kf : kalman_filter.KalmanFilter
The Kalman filter.
detection : Detection
The associated detection.
self.yolo_bbox = detection
self.mean, self.covariance = kf.update(
self.mean, self.covariance, detection.to_xyah())
self.class_id = class_id
self.hits += 1
self.time_since_update = 0
# 匹配的时候(update)time_since_update +需要置0,predict加1
if self.state == TrackState.Tentative and self.hits >= self._n_init:
self.state = TrackState.Confirmed
def mark_missed(self):
"""Mark this track as missed (no association at the current time step).
if self.state == TrackState.Tentative:
self.state = TrackState.Deleted
elif self.time_since_update > self._max_age:
self.state = TrackState.Deleted
def is_tentative(self):
"""Returns True if this track is tentative (unconfirmed).
return self.state == TrackState.Tentative
def is_confirmed(self):
"""Returns True if this track is confirmed."""
return self.state == TrackState.Confirmed
def is_deleted(self):
"""Returns True if this track is dead and should be deleted."""
return self.state == TrackState.Deleted
2.2.2 目标检测器(Detections)
这里很好理解利用检测器检测出目标在图片中的位置(通常由4个坐标形成),对应的检测器有centernet, yolov系列等。这里使用的是yolov5
pred = model(img, augment=opt.augment, visualize=visualize)
# Apply NMS
pred = non_max_suppression(pred, opt.conf_thres, opt.iou_thres, opt.classes, opt.agnostic_nms, max_det=opt.max_det)
# Process detections
for i, det in enumerate(pred): # detections per image
seen += 1
if webcam: # batch_size >= 1
p, im0, frame = path[i], im0s[i].copy(), dataset.count
s += f'{i}: '
p, im0, frame = path, im0s.copy(), getattr(dataset, 'frame', 0)
s += '%gx%g ' % img.shape[2:] # print string
save_path = str(Path(out) / Path(p).name)
annotator = Annotator(im0, line_width=2, pil=not ascii)
if det is not None and len(det):
# Rescale boxes from img_size to im0 size
det[:, :4] = scale_coords(
img.shape[2:], det[:, :4], im0.shape).round()
# Print results
for c in det[:, -1].unique():
n = (det[:, -1] == c).sum() # detections per class
s += f"{n} {names[int(c)]}{'s' * (n > 1)}, " # add to string
xywhs = xyxy2xywh(det[:, 0:4])
confs = det[:, 4]
clss = det[:, 5]
2.2.3 卡尔曼滤波器(Kalman Filter Predict)
2.2.4 确认状态设定(Confirmed)
2.2.5 IOU匹配(IOU Matched)
我们认为其是匹配的,否则我们认为是不匹配的, 将该tracker删除。
2.2.6 级联匹配(Matching Cascade)
这里使用级联匹配的目的是两个目标遮挡的情况,刚刚被遮挡的目标的track无法匹配detection,且暂时 从图像中消失,之后背遮挡目标再次出现的时候,应该尽量让被遮挡的目标分配的ID不发生变动,减少ID交换的情况,因此我们就需要用到级联匹配。关于级联匹配的过程我们可以如下的图来理解:
即如果遇到检测器级联匹配失败同时IOU匹配失败的情况(缺失一次,missing_age + 1),仍然可以容忍max_age=70帧。如果missing_age超过这个帧数(70)则将该track删除。
级联匹配的代价函数是由卡尔曼滤波的均值(目标坐标)以及协方差与检测器的均值(目标坐标)的马式距离(Mahalanobis distance)
与检测器目标的ReId特征与卡尔曼滤波ReId特征余弦相似度(Cosine Distance)(⭐️注意这里我们取相似度最大及距离最小的相似度)
。【5】门矩阵(Gate Matrix)
当cost matrix
# linear_assignment.py
def matching_cascade(
distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
"""Run matching cascade.
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
cascade_depth: int
The cascade depth, should be se to the maximum track age.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : Optional[List[int]]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above). Defaults to all tracks.
detection_indices : Optional[List[int]]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above). Defaults to all
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
for level in range(cascade_depth):
if len(unmatched_detections) == 0: # No detections left
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
if len(track_indices_l) == 0: # Nothing to match at this level
matches_l, _, unmatched_detections = \
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
# tracker.py
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
return cost_matrix
2.2.7 特征提取
# 特征提取器
class Extractor(object):
def __init__(self, model_path, use_cuda=True):
self.net = Net(reid=True)
self.device = "cuda" if torch.cuda.is_available() and use_cuda else "cpu"
state_dict = torch.load(model_path, map_location=torch.device(self.device))[
logger = logging.getLogger("root.tracker")
logger.info("Loading weights from {}... Done!".format(model_path))
self.size = (64, 128)
self.norm = transforms.Compose([
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
# 图片预处理
def _preprocess(self, im_crops):
1. to float with scale from 0 to 1
2. resize to (64, 128) as Market1501 dataset did
3. concatenate to a numpy array
3. to torch Tensor
4. normalize
def _resize(im, size):
return cv2.resize(im.astype(np.float32)/255., size)
im_batch = torch.cat([self.norm(_resize(im, self.size)).unsqueeze(
0) for im in im_crops], dim=0).float()
return im_batch
# 输入模型
def __call__(self, im_crops):
im_batch = self._preprocess(im_crops)
with torch.no_grad():
im_batch = im_batch.to(self.device)
features = self.net(im_batch)
return features.cpu().numpy()
# 特征提取网络
class Net(nn.Module):
def __init__(self, num_classes=751, reid=False):
super(Net, self).__init__()
# 3 128 64
self.conv = nn.Sequential(
nn.Conv2d(3, 64, 3, stride=1, padding=1),
# nn.Conv2d(32,32,3,stride=1,padding=1),
# nn.BatchNorm2d(32),
# nn.ReLU(inplace=True),
nn.MaxPool2d(3, 2, padding=1),
# 32 64 32
self.layer1 = make_layers(64, 64, 2, False)
# 32 64 32
self.layer2 = make_layers(64, 128, 2, True)
# 64 32 16
self.layer3 = make_layers(128, 256, 2, True)
# 128 16 8
self.layer4 = make_layers(256, 512, 2, True)
# 256 8 4
self.avgpool = nn.AvgPool2d((8, 4), 1)
# 256 1 1
self.reid = reid
self.classifier = nn.Sequential(
nn.Linear(512, 256),
nn.Linear(256, num_classes),
def forward(self, x):
x = self.conv(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1)
# B x 128
if self.reid:
x = x.div(x.norm(p=2, dim=1, keepdim=True))
return x
# classifier
x = self.classifier(x)
return x
2.2.8 ⭐️参数更新(Filter)
import numpy as np
import torch
from .deep.feature_extractor import Extractor
from .sort.nn_matching import NearestNeighborDistanceMetric
from .sort.detection import Detection
from .sort.tracker import Tracker
__all__ = ['DeepSort']
class DeepSort(object):
def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
self.min_confidence = min_confidence
self.extractor = Extractor(model_path, use_cuda=use_cuda)
max_cosine_distance = max_dist
metric = NearestNeighborDistanceMetric(
"cosine", max_cosine_distance, nn_budget)
self.tracker = Tracker(
metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)
def update(self, bbox_xywh, confidences, classes, ori_img, use_yolo_preds=True):
self.height, self.width = ori_img.shape[:2]
# generate detections
features = self._get_features(bbox_xywh, ori_img)
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(
confidences) if conf > self.min_confidence]
# run on non-maximum supression
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
# update tracker
self.tracker.update(detections, classes)
# output bbox identities
outputs = []
for track in self.tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
if use_yolo_preds:
det = track.get_yolo_pred()
x1, y1, x2, y2 = self._tlwh_to_xyxy(det.tlwh)
box = track.to_tlwh()
x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
track_id = track.track_id
class_id = track.class_id
outputs.append(np.array([x1, y1, x2, y2, track_id, class_id], dtype=np.int))
if len(outputs) > 0:
outputs = np.stack(outputs, axis=0)
return outputs
Convert bbox from xc_yc_w_h to xtl_ytl_w_h
Thanks JieChen91@github.com for reporting this bug!
def _xywh_to_tlwh(bbox_xywh):
if isinstance(bbox_xywh, np.ndarray):
bbox_tlwh = bbox_xywh.copy()
elif isinstance(bbox_xywh, torch.Tensor):
bbox_tlwh = bbox_xywh.clone()
bbox_tlwh[:, 0] = bbox_xywh[:, 0] - bbox_xywh[:, 2] / 2.
bbox_tlwh[:, 1] = bbox_xywh[:, 1] - bbox_xywh[:, 3] / 2.
return bbox_tlwh
def _xywh_to_xyxy(self, bbox_xywh):
x, y, w, h = bbox_xywh
x1 = max(int(x - w / 2), 0)
x2 = min(int(x + w / 2), self.width - 1)
y1 = max(int(y - h / 2), 0)
y2 = min(int(y + h / 2), self.height - 1)
return x1, y1, x2, y2
def _tlwh_to_xyxy(self, bbox_tlwh):
Convert bbox from xtl_ytl_w_h to xc_yc_w_h
Thanks JieChen91@github.com for reporting this bug!
x, y, w, h = bbox_tlwh
x1 = max(int(x), 0)
x2 = min(int(x+w), self.width - 1)
y1 = max(int(y), 0)
y2 = min(int(y+h), self.height - 1)
return x1, y1, x2, y2
def increment_ages(self):
def _xyxy_to_tlwh(self, bbox_xyxy):
x1, y1, x2, y2 = bbox_xyxy
t = x1
l = y1
w = int(x2 - x1)
h = int(y2 - y1)
return t, l, w, h
def _get_features(self, bbox_xywh, ori_img):
im_crops = []
for box in bbox_xywh:
x1, y1, x2, y2 = self._xywh_to_xyxy(box)
im = ori_img[y1:y2, x1:x2]
if im_crops:
features = self.extractor(im_crops)
features = np.array([])
return features
(1) update
a. 通过目标的位置以及原图通过特征提取器提取128维度的目标特征,接着过滤掉置信度低于0.3
class Detection(object):
This class represents a bounding box detection in a single image.
tlwh : array_like
Bounding box in format `(x, y, w, h)`.
confidence : float
Detector confidence score.
feature : array_like
A feature vector that describes the object contained in this image.
tlwh : ndarray
Bounding box in format `(top left x, top left y, width, height)`.
confidence : ndarray
Detector confidence score.
feature : ndarray | NoneType
A feature vector that describes the object contained in this image.
def __init__(self, tlwh, confidence, feature):
self.tlwh = np.asarray(tlwh, dtype=np.float)
self.confidence = float(confidence)
self.feature = np.asarray(feature, dtype=np.float32)
def to_tlbr(self):
"""Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
`(top left, bottom right)`.
ret = self.tlwh.copy()
ret[2:] += ret[:2]
return ret
def to_xyah(self):
"""Convert bounding box to format `(center x, center y, aspect ratio,
height)`, where the aspect ratio is `width / height`.
ret = self.tlwh.copy()
ret[:2] += ret[2:] / 2
ret[2] /= ret[3]
return ret
features = self._get_features(bbox_xywh, ori_img)
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(confidences) if conf > self.min_confidence]
b. 卡尔曼滤波更新与预测
self.tracker.update(detections, classes)
预测的时候, 输入mean以及convariance,输出新的mean, 新的convariance,🚩同时age以及,time_since_update 加1。
def increment_age(self):
self.age += 1 # 该track自出现以来的总帧数加1
self.time_since_update += 1 # # 该track自最近一次更新以来的总帧数加1
def predict(self, kf):
"""Propagate the state distribution to the current time step using a
Kalman filter prediction step.
kf : kalman_filter.KalmanFilter
The Kalman filter.
self.mean, self.covariance = kf.predict(self.mean, self.covariance)
def update(self, detections, classes):
"""Perform measurement update and track management.
detections : List[deep_sort.detection.Detection]
A list of detections at the current time step.
# Run matching cascade.
matches, unmatched_tracks, unmatched_detections = \
# Update track set.
for track_idx, detection_idx in matches: # 对匹配上的进行参数更新
self.kf, detections[detection_idx], classes[detection_idx])
for track_idx in unmatched_tracks:
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx], classes[detection_idx].item())
self.tracks = [t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
features += track.features
targets += [track.track_id for _ in track.features]
track.features = []
np.asarray(features), np.asarray(targets), active_targets)
def _match(self, detections):
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
return cost_matrix
# Split track set into confirmed and unconfirmed tracks.
confirmed_tracks = [
i for i, t in enumerate(self.tracks) if t.is_confirmed()]
unconfirmed_tracks = [
i for i, t in enumerate(self.tracks) if not t.is_confirmed()]
# Associate confirmed tracks using appearance features.
# 对confirmd tracks进行级联匹配
matches_a, unmatched_tracks_a, unmatched_detections = \
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)
# Associate remaining tracks together with unconfirmed tracks using IOU.
# 将所有状态为未确定态的轨迹和刚刚没有匹配上的轨迹组合为iou_track_candidates,进行IoU的匹配
iou_track_candidates = unconfirmed_tracks + [ # a 是级联匹配失败剩下的
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1] # 刚刚没有匹配上
# 未匹配
unmatched_tracks_a = [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1] # 已经很久没有匹配上
matches_b, unmatched_tracks_b, unmatched_detections = \
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
matches = matches_a + matches_b
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
# linear_assignment.py
def matching_cascade(
distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
"""Run matching cascade.
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
cascade_depth: int
The cascade depth, should be se to the maximum track age.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : Optional[List[int]]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above). Defaults to all tracks.
detection_indices : Optional[List[int]]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above). Defaults to all
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
# 由小到大依次对每个level的tracks做匹配, 最近丢失匹配帧越小的目标越先进行匹配
for level in range(cascade_depth):
if len(unmatched_detections) == 0: # No detections left
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
if len(track_indices_l) == 0: # Nothing to match at this level
matches_l, _, unmatched_detections = \
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
函数来判断哪些目标匹配,哪些卡尔曼滤波没有匹配,哪些目标检测没有匹配。接着由小到大依次对每个level的tracks做匹配, 最近丢失匹配帧越小的目标越先进行匈牙利匹配。
matches, unmatched_tracks, unmatched_detections = \
# Associate confirmed tracks using appearance features.
matches_a, unmatched_tracks_a, unmatched_detections = \
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)
matches_b, unmatched_tracks_b, unmatched_detections = \
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
def min_cost_matching(
distance_metric, max_distance, tracks, detections, track_indices=None,
"""Solve linear assignment problem.
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection_indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : List[int]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above).
detection_indices : List[int]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above).
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
if len(detection_indices) == 0 or len(track_indices) == 0:
return [], track_indices, detection_indices # Nothing to match.
# -----------------------------------------
# Gated_distance——>
# 1. cosine distance
# 2. 马氏距离
# 得到代价矩阵
# -----------------------------------------
# iou_cost——>
# 仅仅计算track和detection之间的iou距离
# -----------------------------------------
cost_matrix = distance_metric(
tracks, detections, track_indices, detection_indices)
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
# -----------------------------------------
# gated_distance中设置距离中最高上限,
# 这里最远距离实际是在deep sort类中的max_dist参数设置的
# 默认max_dist=0.2, 距离越小越好
# -----------------------------------------
# iou_cost情况下,max_distance的设置对应tracker中的max_iou_distance,
# 默认值为max_iou_distance=0.7
# 注意结果是1-iou,所以越小越好
# -----------------------------------------
row_indices, col_indices = linear_assignment(cost_matrix)
matches, unmatched_tracks, unmatched_detections = [], [], []
for col, detection_idx in enumerate(detection_indices): # 没匹配的tdetection有哪些
if col not in col_indices: # 查看检测和匈牙利匹配是否对应 对应上了则匹配,对应不上不匹配
for row, track_idx in enumerate(track_indices): # 没匹配的track有哪些
if row not in row_indices:
for row, col in zip(row_indices, col_indices):查看卡尔曼匹配和匈牙利匹配是否对应 对应上了则匹配,对应不上不匹配
track_idx = track_indices[row]
detection_idx = detection_indices[col]
if cost_matrix[row, col] > max_distance: # 就算很匹配,但是当其距离大于0.7仍放到没配对中
matches.append((track_idx, detection_idx))
return matches, unmatched_tracks, unmatched_detections
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(self.kf,cost_matrix, tracks, dets, track_indices,detection_indices)
return cost_matrix
cost_matrix = distance_metric(tracks, detections, track_indices, detection_indices)
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
def gate_cost_matrix(
kf, cost_matrix, tracks, detections, track_indices, detection_indices,
gated_cost=INFTY_COST, only_position=False):
"""Invalidate infeasible entries in cost matrix based on the state
distributions obtained by Kalman filtering.
kf : The Kalman filter.
cost_matrix : ndarray
The NxM dimensional cost matrix, where N is the number of track indices
and M is the number of detection indices, such that entry (i, j) is the
association cost between `tracks[track_indices[i]]` and
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : List[int]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above).
detection_indices : List[int]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above).
gated_cost : Optional[float]
Entries in the cost matrix corresponding to infeasible associations are
set this value. Defaults to a very large value.
only_position : Optional[bool]
If True, only the x, y position of the state distribution is considered
during gating. Defaults to False.
Returns the modified cost matrix.
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray(
[detections[i].to_xyah() for i in detection_indices])
for row, track_idx in enumerate(track_indices):
track = tracks[track_idx]
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position)
cost_matrix[row, gating_distance > gating_threshold] = gated_cost
return cost_matrix
# Associate remaining tracks together with unconfirmed tracks using IOU.
# 对级联匹配中未匹配的tracks和unconfirmed tracks中time_since_update为1(大于1说明一直没有匹配以前做过iou匹配了, 这里就不需要了)的tracks进行IOU匹配
iou_track_candidates = unconfirmed_tracks + [ # a 是级联匹配失败剩下的
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1] # 刚刚没有匹配上
# 未匹配
unmatched_tracks_a = [k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1] # 已经很久没有匹配上,说明做过iou匹配了
matches_b, unmatched_tracks_b, unmatched_detections = \
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
# iou_matching.py
def iou(bbox, candidates):
"""Computer intersection over union.
bbox : ndarray
A bounding box in format `(top left x, top left y, width, height)`.
candidates : ndarray
A matrix of candidate bounding boxes (one per row) in the same format
as `bbox`.
The intersection over union in [0, 1] between the `bbox` and each
candidate. A higher score means a larger fraction of the `bbox` is
occluded by the candidate.
bbox_tl, bbox_br = bbox[:2], bbox[:2] + bbox[2:]
candidates_tl = candidates[:, :2]
candidates_br = candidates[:, :2] + candidates[:, 2:]
tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
wh = np.maximum(0., br - tl)
area_intersection = wh.prod(axis=1)
area_bbox = bbox[2:].prod()
area_candidates = candidates[:, 2:].prod(axis=1)
return area_intersection / (area_bbox + area_candidates - area_intersection)
def iou_cost(tracks, detections, track_indices=None,
"""An intersection over union distance metric.
tracks : List[deep_sort.track.Track]
A list of tracks.
detections : List[deep_sort.detection.Detection]
A list of detections.
track_indices : Optional[List[int]]
A list of indices to tracks that should be matched. Defaults to
all `tracks`.
detection_indices : Optional[List[int]]
A list of indices to detections that should be matched. Defaults
to all `detections`.
Returns a cost matrix of shape
len(track_indices), len(detection_indices) where entry (i, j) is
`1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`.
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
for row, track_idx in enumerate(track_indices):
if tracks[track_idx].time_since_update > 1:
cost_matrix[row, :] = linear_assignment.INFTY_COST
bbox = tracks[track_idx].to_tlwh()
candidates = np.asarray(
[detections[i].tlwh for i in detection_indices])
cost_matrix[row, :] = 1. - iou(bbox, candidates)
return cost_matrix
def mark_missed(self):
"""Mark this track as missed (no association at the current time step).
if self.state == TrackState.Tentative:
self.state = TrackState.Deleted
elif self.time_since_update > self._max_age:
self.state = TrackState.Deleted
def _initiate_track(self, detection, class_id):
mean, covariance = self.kf.initiate(detection.to_xyah())
mean, covariance, self._next_id, class_id, self.n_init, self.max_age,
self._next_id += 1
# 进行测量的更新和轨迹管理
# Run matching cascade.
def update(self,detections):
# Update track set.
# 1.针对匹配上的结果
for track_idx,detection_idx in matches:
# track更新对应的detection
self.tracks[track_idx].update(self.kf, detections[detection_idx])
# 2.针对未匹配的tracker,调用mark_missed标记
# track失配,若待定(unconfirmed)则删除,若update时间很久也删除
# max age是一个存活期限,默认为70帧
for track_idx in unmatched_tracks:
# 3.针对未匹配的detection,detection失配,进行初始化
for detection_idx in unmatched_detections:
# 得到最新的tracks列表,保存的是标记为confirmed和Tentative的track
self.tracks=[t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets=[t.track_id for t in self.tracks if t.is_confirmed()]
# active_targets框可以在画面展示
# 获取所有confirmed状态的track id
for track in self.tracks:
if not track.is_confirmed():
features+=track.features # 将tracks列表拼接到features列表
# 获取每个feature对应的track id
targets+=[track.track_id for _ in track.features]
# 距离度量中的特征集更新
class NearestNeighborDistanceMetric(object):
# 对于每个目标,返回一个最近的距离
def __init__(self, metric, matching_threshold, budget=None):
# 默认matching_threshold = 0.2 budge = 100
if metric == "euclidean":
# 使用最近邻欧氏距离
self._metric = _nn_euclidean_distance
elif metric == "cosine":
# 使用最近邻余弦距离
self._metric = _nn_cosine_distance
raise ValueError("Invalid metric; must be either 'euclidean' or 'cosine'")
self.matching_threshold = matching_threshold
# 在级联匹配的函数中调用
self.budget = budget
# budge 预算,控制feature的多少
self.samples = {}
# samples是一个字典{id->feature list}
def partial_fit(self, features, targets, active_targets):
# 作用:部分拟合,用新的数据更新测量距离
# 调用:在特征集更新模块部分调用,tracker.update()中
for feature, target in zip(features, targets):
self.samples.setdefault(target, []).append(feature)
# 对应目标下添加新的feature,更新feature集合
# 目标id : feature list
if self.budget is not None:
self.samples[target] = self.samples[target][-self.budget:]
# 设置预算,每个类最多多少个目标,超过直接忽略
# 筛选激活的目标
self.samples = {k: self.samples[k] for k in active_targets}
def distance(self, features, targets):
# 作用:比较feature和targets之间的距离,返回一个代价矩阵
# 调用:在匹配阶段,将distance封装为gated_metric,
# 进行外观信息(reid得到的深度特征)+
# 运动信息(马氏距离用于度量两个分布相似程度)
cost_matrix = np.zeros((len(targets), len(features)))
for i, target in enumerate(targets):
cost_matrix[i, :] = self._metric(self.samples[target], features)
return cost_matrix
★上述中应该是每一个目标tracker每一帧保留一个feature, 每次都与其自己的feature作相似度匹配,选最小的,同时随着时间推移不断更新feature budget(这里设置为100), 即每个activate的追踪器保留最近的self.budget条特征,就是自己检测器检测的特征与以往检测出来的feature作比较,选取最接近的feature作为判断,代价函数越小就越属于那一个id。
整体流程可以参考改图( 来源:Deep SORT多目标跟踪算法代码解析)
[1] 【MOT】详解SORT与卡尔曼滤波算法
[2] 图说卡尔曼滤波,一份通俗易懂的教程
[3] 目标跟踪初探(DeepSORT)
[4] 马氏距离通俗理解
[5] 关于 Deep Sort 的一些理解
[6] 多目标跟踪(MOT)入门
[7] Deep SORT多目标跟踪算法代码解析