deepsort代码解析

2023-05-16

DeepSort代码解析

项目地址:deepsort

if __name__ == "__main__":
    args = parse_args()
    cfg = get_config()
    cfg.merge_from_file(args.config_detection)
    cfg.merge_from_file(args.config_deepsort)

    with VideoTracker(cfg, args, video_path=args.VIDEO_PATH) as vdo_trk:
        vdo_trk.run()

接下来看一下配置文件:
yolov3.yml:

YOLOV3:
  CFG: "./detector/YOLOv3/cfg/yolo_v3.cfg"
  WEIGHT: "./detector/YOLOv3/weight/yolov3.weights"
  CLASS_NAMES: "./detector/YOLOv3/cfg/coco.names"

  SCORE_THRESH: 0.5
  NMS_THRESH: 0.4

deepsort.yaml

DEEPSORT:
  REID_CKPT: "./deep_sort/deep/checkpoint/ckpt.t7"
  MAX_DIST: 0.2
  MIN_CONFIDENCE: 0.3
  NMS_MAX_OVERLAP: 0.5
  MAX_IOU_DISTANCE: 0.7
  MAX_AGE: 70
  N_INIT: 3
  NN_BUDGET: 100

再进入VideoTracker的init方法:

class VideoTracker(object):
    def __init__(self, cfg, args, video_path):
        self.cfg = cfg
        
        self.args = args
        self.video_path = video_path
        self.logger = get_logger("root")

        use_cuda = args.use_cuda and torch.cuda.is_available()
        if not use_cuda:
            warnings.warn("Running in cpu mode which maybe very slow!", UserWarning)

        if args.display:
            cv2.namedWindow("test", cv2.WINDOW_NORMAL)
            cv2.resizeWindow("test", args.display_width, args.display_height)

        if args.cam != -1:
            print("Using webcam " + str(args.cam))
            self.vdo = cv2.VideoCapture(args.cam)
        else:
            self.vdo = cv2.VideoCapture()
        if args.backbone == 'retinaface':
            return_layers = {'layer2':1,'layer3':2,'layer4':3}
            from retinaface import anchors,losses,torchvision_model,utils
            RetinaFace = torchvision_model.create_retinaface(return_layers)
            retina_dict = RetinaFace.state_dict()
            pre_state_dict = torch.load('model.pt')
            pretrained_dict = {k[7:]: v for k, v in pre_state_dict.items() if k[7:] in retina_dict}
            RetinaFace.load_state_dict(pretrained_dict)
            RetinaFace = RetinaFace.cuda()
            RetinaFace.eval()
            self.detector = RetinaFace
            
        elif args.backbone == 'yolov3':
            self.detector = build_detector(cfg, use_cuda=use_cuda)
            self.class_names = self.detector.class_names
        self.deepsort = build_tracker(cfg, use_cuda=use_cuda)

前面就是一些判断CUDA,调用摄像头,加载模型的操作,再来看 build_tracker():

def build_tracker(cfg, use_cuda):
    return DeepSort(cfg.DEEPSORT.REID_CKPT, 
                max_dist=cfg.DEEPSORT.MAX_DIST, min_confidence=cfg.DEEPSORT.MIN_CONFIDENCE, 
                nms_max_overlap=cfg.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg.DEEPSORT.MAX_IOU_DISTANCE, 
                max_age=cfg.DEEPSORT.MAX_AGE, n_init=cfg.DEEPSORT.N_INIT, nn_budget=cfg.DEEPSORT.NN_BUDGET, use_cuda=use_cuda)

DeepSort类init方法:

class DeepSort(object):
    def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
        self.min_confidence = min_confidence
        self.nms_max_overlap = nms_max_overlap

        self.extractor = Extractor(model_path, use_cuda=use_cuda)

        max_cosine_distance = max_dist
        nn_budget = 100
        metric = NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
        self.tracker = Tracker(metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)

定义了关于检测的一些阈值,和加载了特征提取器网络,NearestNeighborDistanceMetric是距离计算的类,返回距离最近的样本,提供了欧氏距离和余弦距离两种度量方式。

Tracker类:

class Tracker:
    """
    This is the multi-target tracker.

    Parameters
    ----------
    metric : nn_matching.NearestNeighborDistanceMetric
        A distance metric for measurement-to-track association.
    max_age : int
        Maximum number of missed misses before a track is deleted.
    n_init : int
        Number of consecutive detections before the track is confirmed. The
        track state is set to `Deleted` if a miss occurs within the first
        `n_init` frames.

    Attributes
    ----------
    metric : nn_matching.NearestNeighborDistanceMetric
        The distance metric used for measurement to track association.
    max_age : int
        Maximum number of missed misses before a track is deleted.
    n_init : int
        Number of frames that a track remains in initialization phase.
    kf : kalman_filter.KalmanFilter
        A Kalman filter to filter target trajectories in image space.
    tracks : List[Track]
        The list of active tracks at the current time step.

    """

    def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
        self.metric = metric
        self.max_iou_distance = max_iou_distance
        self.max_age = max_age
        self.n_init = n_init

        self.kf = kalman_filter.KalmanFilter()
        self.tracks = []
        self._next_id = 1

定义了一些关于追踪的超参数,初始化卡尔曼滤波。
KalmanFilter():

class KalmanFilter(object):
    """
    A simple Kalman filter for tracking bounding boxes in image space.

    The 8-dimensional state space

        x, y, a, h, vx, vy, va, vh

    contains the bounding box center position (x, y), aspect ratio a, height h,
    and their respective velocities.

    Object motion follows a constant velocity model. The bounding box location
    (x, y, a, h) is taken as direct observation of the state space (linear
    observation model).

    """

    def __init__(self):
        ndim, dt = 4, 1.

        # Create Kalman filter model matrices.
        self._motion_mat = np.eye(2 * ndim, 2 * ndim)
        #     array([[1., 0., 0., 0., 0., 0., 0., 0.],
    	#    [0., 1., 0., 0., 0., 0., 0., 0.],
    	#    [0., 0., 1., 0., 0., 0., 0., 0.],
    	#    [0., 0., 0., 1., 0., 0., 0., 0.],
    	#    [0., 0., 0., 0., 1., 0., 0., 0.],
    	#    [0., 0., 0., 0., 0., 1., 0., 0.],
    	#    [0., 0., 0., 0., 0., 0., 1., 0.],
    	#    [0., 0., 0., 0., 0., 0., 0., 1.]])
        for i in range(ndim):
            self._motion_mat[i, ndim + i] = dt
        # [[1. 0. 0. 0. 1. 0. 0. 0.]
        # [0. 1. 0. 0. 0. 1. 0. 0.]
        # [0. 0. 1. 0. 0. 0. 1. 0.]
        # [0. 0. 0. 1. 0. 0. 0. 1.]
        # [0. 0. 0. 0. 1. 0. 0. 0.]
        # [0. 0. 0. 0. 0. 1. 0. 0.]
        # [0. 0. 0. 0. 0. 0. 1. 0.]
        # [0. 0. 0. 0. 0. 0. 0. 1.]]
        self._update_mat = np.eye(ndim, 2 * ndim)
		# array([[1., 0., 0., 0., 0., 0., 0., 0.],
        #         [0., 1., 0., 0., 0., 0., 0., 0.],
        #         [0., 0., 1., 0., 0., 0., 0., 0.],
        #         [0., 0., 0., 1., 0., 0., 0., 0.]])
        # Motion and observation uncertainty are chosen relative to the current
        # state estimate. These weights control the amount of uncertainty in
        # the model. This is a bit hacky.
        self._std_weight_position = 1. / 20
        self._std_weight_velocity = 1. / 160

定义了一些二维矩阵,这些矩阵的作用后面会介绍到,还定义了关于位置和速度的标准差。

VideoTracker的一系列初始化就是这样,下面看VideoTracker的方法:

def run(self):
        results = []
        idx_frame = 0
        while self.vdo.grab():
            idx_frame += 1
            if idx_frame % self.args.frame_interval:
                continue

            start = time.time()
            _, ori_im = self.vdo.retrieve()
            outputs = []
            if args.backbone == 'yolov3':
                im = cv2.cvtColor(ori_im, cv2.COLOR_BGR2RGB)
                bbox_xywh, cls_conf, cls_ids = self.detector(im)

                # select person class
                mask = cls_ids == 0

                bbox_xywh = bbox_xywh[mask]
                # bbox dilation just in case bbox too small, delete this line if using a better pedestrian detector
                bbox_xywh[:, 3:] *= 1.2
                cls_conf = cls_conf[mask]
                # do tracking
                if len(bbox_xywh) > 0 :
                    outputs = self.deepsort.update(bbox_xywh, cls_conf, im)
            elif args.backbone == 'retinaface':
                from retinaface import eval_widerface
                img = torch.from_numpy(ori_im).permute(2, 0, 1).unsqueeze(0).float().cuda()
                
                picked_boxes, picked_landmarks, picked_scores = eval_widerface.get_detections(img, self.detector, score_threshold=0.5, iou_threshold=0.3)
                
                bbox_xywh = []
                if np.array(picked_boxes).ndim == 3:
                    picked_boxes = np.squeeze(np.array(picked_boxes))
                for box in picked_boxes:
                    if box is None:
                        break
                    x,y,w,h = _xyxy_to_xywh(box)
                    box=[x,y,w,h]
                    bbox_xywh.append(box)
                bbox_xywh = np.array(bbox_xywh) 
                cls_conf = np.array(picked_scores)
                if np.array(cls_conf).ndim == 2:
                    cls_conf = np.squeeze(cls_conf) 
                
                # do tracking
                if len(bbox_xywh) > 0 :
                    outputs = self.deepsort.update(bbox_xywh, cls_conf, ori_im)
                
            
            # draw boxes for visualization
            if len(outputs) > 0:
                bbox_tlwh = []
                bbox_xyxy = outputs[:, :4]
                identities = outputs[:, -1]
                ori_im = draw_boxes(ori_im, bbox_xyxy, identities)

                for bb_xyxy in bbox_xyxy:
                    bbox_tlwh.append(self.deepsort._xyxy_to_tlwh(bb_xyxy))

                results.append((idx_frame - 1, bbox_tlwh, identities))

            end = time.time()

            if self.args.display:
                cv2.imshow("test", ori_im)
                cv2.waitKey(1)

            if self.args.save_path:
                self.writer.write(ori_im)

            # save results
            write_results(self.save_results_path, results, 'mot')

            # logging
            self.logger.info("time: {:.03f}s, fps: {:.03f}, detection numbers: {}, tracking numbers: {}" \
                             .format(end - start, 1 / (end - start), bbox_xywh.shape[0], len(outputs)))

对于每一帧图片,12到48行定义了基于两种检测器的跟踪算法,其中yolov3是行人检测的检测器,相对应于行人跟踪,retinaface为人脸检测的检测器,相对应于人脸跟踪。这里要注意的是第这里第21行,yolov3把检测框放大到了1.2倍,融入更多的背景信息一起判断。用的是yolov3的行人跟踪,
直接看第24行,当检测器检测到视野内有目标时,便采取跟踪的一系列操作。
self.deepsort.update():

def update(self, bbox_xywh, confidences, ori_img):
        self.height, self.width = ori_img.shape[:2]
        # generate detections
        features = self._get_features(bbox_xywh, ori_img)
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
        detections = [Detection(bbox_tlwh[i], conf, features[i]) for i,conf in enumerate(confidences) if conf>self.min_confidence]
        # run on non-maximum supression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
        detections = [detections[i] for i in indices]

        # update tracker
        self.tracker.predict()
        self.tracker.update(detections)

        # output bbox identities
        outputs = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            box = track.to_tlwh()
            x1,y1,x2,y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            outputs.append(np.array([x1,y1,x2,y2,track_id], dtype=np.int))
        if len(outputs) > 0:
            outputs = np.stack(outputs,axis=0)
        return outputs

第四行根据检测框,通过特征提取网络得到特征信息。第五行self._xywh_to_tlwh是把坐标从图像中点的x,y坐标和长宽转换成左上角的x,y坐标和长宽。第6行至第11行是常规的目标检测后处理操作,通过置信度和nms过滤多余的bounding box。
self.tracker.predict():

def predict(self):
        """Propagate track state distributions one time step forward.

        This function should be called once every time step, before `update`.
        """
        for track in self.tracks:
            track.predict(self.kf)

初始帧图片啥也没干,

self.tracker.update(detections):

def update(self, detections):
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)

先来看20,21行,因为初始帧是没有目标可跟踪的,要先初始化。

to_xyah()的代码注释里是这样写的:Convert bounding box to format (center x, center y, aspect ratio, height), where the aspect ratio is width / height.

然后是卡尔曼滤波的initiate()函数:

def initiate(self, measurement):
        """Create track from unassociated measurement.

        Parameters
        ----------
        measurement : ndarray
            Bounding box coordinates (x, y, a, h) with center position (x, y),
            aspect ratio a, and height h.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector (8 dimensional) and covariance matrix (8x8
            dimensional) of the new track. Unobserved velocities are initialized
            to 0 mean.

        """
        mean_pos = measurement
        mean_vel = np.zeros_like(mean_pos) #初始速度为0
        # array([0., 0., 0., 0.])
        mean = np.r_[mean_pos, mean_vel] # np.r_是按列连接两个矩阵
        # array([6.04250183e+02, 4.01637024e+02, 7.10380774e-01, 7.42519043e+02,
        # 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00])

        std = [
            2 * self._std_weight_position * measurement[3],
            2 * self._std_weight_position * measurement[3],
            1e-2,
            2 * self._std_weight_position * measurement[3],
            10 * self._std_weight_velocity * measurement[3],
            10 * self._std_weight_velocity * measurement[3],
            1e-5,
            10 * self._std_weight_velocity * measurement[3]]
            # [74.251904296875, 74.251904296875, 0.01, 74.251904296875, 46.407440185546875, 46.407440185546875, 1e-05, 46.407440185546875]
        covariance = np.diag(np.square(std))
        # array([[5.51334529e+03, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 5.51334529e+03, 0.00000000e+00, 0.00000000e+00,0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 1.00000000e-04, 0.00000000e+00,0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 5.51334529e+03,0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,2.15365050e+03, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,0.00000000e+00, 2.15365050e+03, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,0.00000000e+00, 0.00000000e+00, 1.00000000e-10, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.15365050e+03]])
        return mean, covariance

self._std_weight_position和self._std_weight_velocity是什么提到的标准差。由代码注释可以看出初始化了协方差矩阵并返回均值和协方差矩阵。

继续看_initiate_track(self, detection):

def _initiate_track(self, detection):
        mean, covariance = self.kf.initiate(detection.to_xyah())
        self.tracks.append(Track(
            mean, covariance, self._next_id, self.n_init, self.max_age,
            detection.feature))
        self._next_id += 1

第四行初始化时,self._next_id为1,self.n_init为3,self.max_age为70。每添加一个Track,self._next_id加1。
Track类:

class Track:
    """
    A single target track with state space `(x, y, a, h)` and associated
    velocities, where `(x, y)` is the center of the bounding box, `a` is the
    aspect ratio and `h` is the height.

    Parameters
    ----------
    mean : ndarray
        Mean vector of the initial state distribution.
    covariance : ndarray
        Covariance matrix of the initial state distribution.
    track_id : int
        A unique track identifier.
    n_init : int
        Number of consecutive detections before the track is confirmed. The
        track state is set to `Deleted` if a miss occurs within the first
        `n_init` frames.
    max_age : int
        The maximum number of consecutive misses before the track state is
        set to `Deleted`.
    feature : Optional[ndarray]
        Feature vector of the detection this track originates from. If not None,
        this feature is added to the `features` cache.

    Attributes
    ----------
    mean : ndarray
        Mean vector of the initial state distribution.
    covariance : ndarray
        Covariance matrix of the initial state distribution.
    track_id : int
        A unique track identifier.
    hits : int
        Total number of measurement updates.
    age : int
        Total number of frames since first occurance.
    time_since_update : int
        Total number of frames since last measurement update.
    state : TrackState
        The current track state.
    features : List[ndarray]
        A cache of features. On each measurement update, the associated feature
        vector is added to this list.

    """

    def __init__(self, mean, covariance, track_id, n_init, max_age,
                 feature=None):
        self.mean = mean
        self.covariance = covariance
        self.track_id = track_id
        self.hits = 1
        self.age = 1
        self.time_since_update = 0

        self.state = TrackState.Tentative
        self.features = []
        if feature is not None:
            self.features.append(feature)

        self._n_init = n_init
        self._max_age = max_age

第57行为追踪器的状态。追踪器有如下几个状态:

class TrackState:
    """
    Enumeration type for the single target track state. Newly created tracks are
    classified as `tentative` until enough evidence has been collected. Then,
    the track state is changed to `confirmed`. Tracks that are no longer alive
    are classified as `deleted` to mark them for removal from the set of active
    tracks.

    """

    Tentative = 1
    Confirmed = 2
    Deleted = 3

第59,60行把features也保存了起来。
继续回朔到Tracker的update()方法:
self.tracker.update(detections):

def update(self, detections):
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)

初始帧因为没有目标可以追踪,所以第一帧的作用只是初始化Track,继续回朔到self.deepsort.update():

def update(self, bbox_xywh, confidences, ori_img):
        self.height, self.width = ori_img.shape[:2]
        # generate detections
        features = self._get_features(bbox_xywh, ori_img)
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
        detections = [Detection(bbox_tlwh[i], conf, features[i]) for i,conf in enumerate(confidences) if conf>self.min_confidence]
        # run on non-maximum supression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
        detections = [detections[i] for i in indices]

        # update tracker
        self.tracker.predict()
        self.tracker.update(detections)

        # output bbox identities
        outputs = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            box = track.to_tlwh()
            x1,y1,x2,y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            outputs.append(np.array([x1,y1,x2,y2,track_id], dtype=np.int))
        if len(outputs) > 0:
            outputs = np.stack(outputs,axis=0)
        return outputs

14行self.tracker.predict():

def predict(self):
        """Propagate track state distributions one time step forward.

        This function should be called once every time step, before `update`.
        """
        for track in self.tracks:
            track.predict(self.kf)

初始帧因为self.tracks为空,当self.tracks不为空时,Track的predict方法:

def predict(self, kf):
        """Propagate the state distribution to the current time step using a
        Kalman filter prediction step.

        Parameters
        ----------
        kf : kalman_filter.KalmanFilter
            The Kalman filter.

        """
        self.mean, self.covariance = kf.predict(self.mean, self.covariance)
        self.age += 1
        self.time_since_update += 1

执行了卡尔曼滤波的predict方法,age和time_since_update加1,卡尔曼滤波的predict方法:

def predict(self, mean, covariance):
        """Run Kalman filter prediction step.

        Parameters
        ----------
        mean : ndarray
            The 8 dimensional mean vector of the object state at the previous
            time step.
        covariance : ndarray
            The 8x8 dimensional covariance matrix of the object state at the
            previous time step.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state. Unobserved velocities are initialized to 0 mean.

        """
        std_pos = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            1e-2,
            self._std_weight_position * mean[3]]
        std_vel = [
            self._std_weight_velocity * mean[3],
            self._std_weight_velocity * mean[3],
            1e-5,
            self._std_weight_velocity * mean[3]]
        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))

        mean = np.dot(self._motion_mat, mean)
        covariance = np.linalg.multi_dot((
            self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

        return mean, covariance

20到30行和上面一样,也是根据位置和速度信息求协方差。
第32行self._motion_mat由上面可知为8x8矩阵:

# [[1. 0. 0. 0. 1. 0. 0. 0.]
# [0. 1. 0. 0. 0. 1. 0. 0.]
# [0. 0. 1. 0. 0. 0. 1. 0.]
# [0. 0. 0. 1. 0. 0. 0. 1.]
# [0. 0. 0. 0. 1. 0. 0. 0.]
# [0. 0. 0. 0. 0. 1. 0. 0.]
# [0. 0. 0. 0. 0. 0. 1. 0.]
# [0. 0. 0. 0. 0. 0. 0. 1.]

x t = x t − 1 + Δ t ⋅ v t x_t=x_{t-1}+\Delta t\cdot v_t xt=xt1+Δtvt 
匀速运动模型(速度分量为1,并且不变化)。
第33-34行为协方差更新公式:
P = ( F ∗ P ∗ F T ) + Q P=(F*P*F^T)+Q P=(FPFT)+Q
具体原理参考:轻松理解卡尔曼滤波
self.deepsort.update()14行到这里便执行完了,接下来继续进入15行self.tracker.update(detections):

def update(self, detections):
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)

12行self._match:

def _match(self, detections):

        def gated_metric(tracks, dets, track_indices, detection_indices):
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

        # Associate confirmed tracks using appearance features.
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update == 1]
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)

        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

13-17行把track分为confirmed和not confirmed,然后是21行linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)

def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run matching cascade.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    cascade_depth: int
        The cascade depth, should be se to the maximum track age.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all
        detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices
    matches = []
    
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break

        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue

        matches_l, _, unmatched_detections = \
            min_cost_matching(
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections

代码的注释已经很详细,翻译过来就是:距离度量给出了一个轨迹和检测列表,以及N个轨迹索引和M个检测索引列表。该度量应返回NxM维代价矩阵,其中元素(i,j)是给定轨道指标中第i条轨道与给定检测索引中第j条检测之间的关联成本。

第52-55行为查看有没有跟丢了的目标(跟丢70帧内,超过70帧放弃这个跟踪目标),如果发现有,则计算距离度量(59-62行):

def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    """Solve linear assignment problem.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection_indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    if len(detection_indices) == 0 or len(track_indices) == 0:
        return [], track_indices, detection_indices  # Nothing to match.

    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5

    row_indices, col_indices = linear_assignment(cost_matrix)

    matches, unmatched_tracks, unmatched_detections = [], [], []
    for col, detection_idx in enumerate(detection_indices):
        if col not in col_indices:
            unmatched_detections.append(detection_idx)
    for row, track_idx in enumerate(track_indices):
        if row not in row_indices:
            unmatched_tracks.append(track_idx)
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance:
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    return matches, unmatched_tracks, unmatched_detections

45行距离度量:

def gated_metric(tracks, dets, track_indices, detection_indices):
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

self.metric.distance():

def distance(self, features, targets):
        """Compute distance between features and targets.

        Parameters
        ----------
        features : ndarray
            An NxM matrix of N features of dimensionality M.
        targets : List[int]
            A list of targets to match the given `features` against.

        Returns
        -------
        ndarray
            Returns a cost matrix of shape len(targets), len(features), where
            element (i, j) contains the closest squared distance between
            `targets[i]` and `features[j]`.

        """
        cost_matrix = np.zeros((len(targets), len(features)))
        for i, target in enumerate(targets):
            cost_matrix[i, :] = self._metric(self.samples[target], features)
        return cost_matrix

代码很直白,没什么好解释的,直接看他具体是如何计算距离的:
self._metric():

def _nn_cosine_distance(x, y):
    """ Helper function for nearest neighbor distance metric (cosine).
    Parameters
    ----------
    x : ndarray
        A matrix of N row-vectors (sample points).
    y : ndarray
        A matrix of M row-vectors (query points).

    Returns
    -------
    ndarray
        A vector of length M that contains for each entry in `y` the
        smallest cosine distance to a sample in `x`.

    """
    distances = _cosine_distance(x, y)
    return distances.min(axis=0)

计算当前帧每个新检测结果的深度特征与这一层中每个trk已保存的特征集之间的余弦距离矩阵cost_matrix(取最小值作为该trk与检测结果之间的计算值)

def _cosine_distance(a, b, data_is_normalized=False):
    """Compute pair-wise cosine distance between points in `a` and `b`.

    Parameters
    ----------
    a : array_like
        An NxM matrix of N samples of dimensionality M.
    b : array_like
        An LxM matrix of L samples of dimensionality M.
    data_is_normalized : Optional[bool]
        If True, assumes rows in a and b are unit length vectors.
        Otherwise, a and b are explicitly normalized to lenght 1.

    Returns
    -------
    ndarray
        Returns a matrix of size len(a), len(b) such that eleement (i, j)
        contains the squared distance between `a[i]` and `b[j]`.

    """
    if not data_is_normalized:
        a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
        b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
    return 1. - np.dot(a, b.T)

再回溯到gated_metric:

def gated_metric(tracks, dets, track_indices, detection_indices):
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

但是,仅凭外观特征计算的相似度来判定两个物体为同一物体,可能误差还比较大,比如两个穿同款衣服的高度基本相同的人,所以还需要结合这两个人的位置来进一步判断(依据就是:相邻两帧之间时间间隔很短,人不能会在两帧之间的物理位置差异特别大),所以我们凭接着对某个track在当前帧的预测位置mean,让其与当前帧的目标detection的检测位置进行比较,来判断其为同一个人的可能性,具体操作体现在gate_cost_matrix():
linear_assignment.gate_cost_matrix:

# 基于卡尔曼滤波获得的状态分布,使成本矩阵中的不可行条目无效
def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

    Returns
    -------
    ndarray
        Returns the modified cost matrix.

    """
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    # chi2inv95 = {
#     1: 3.8415,
#     2: 5.9915,
#     3: 7.8147,
#     4: 9.4877,
#     5: 11.070,
#     6: 12.592,
#     7: 14.067,
#     8: 15.507,
#     9: 16.919}
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost
    return cost_matrix

52行to_xyah():Convert bounding box to format (center x, center y, aspect ratio,height), where the aspect ratio is width / height.
kf.gating_distance():

#这里基于卡尔曼滤波器,预测了track的均值和方差,然后计算出了当前track与所有的检测目标detections之间的门距离
def gating_distance(self, mean, covariance, measurements,
                        only_position=False):
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`.

        """
        mean, covariance = self.project(mean, covariance)
        if only_position:
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        cholesky_factor = np.linalg.cholesky(covariance)
        d = measurements - mean
        z = scipy.linalg.solve_triangular(
            cholesky_factor, d.T, lower=True, check_finite=False,
            overwrite_b=True)
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha

self.project:

def project(self, mean, covariance):
        """Project state distribution to measurement space.

        Parameters
        ----------
        mean : ndarray
            The state's mean vector (8 dimensional array).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).

        Returns
        -------
        (ndarray, ndarray)
            Returns the projected mean and covariance matrix of the given state
            estimate.

        """
        std = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            1e-1,
            self._std_weight_position * mean[3]]
            # [37.09232278406579, 37.09232278406579, 0.1, 37.09232278406579]
        innovation_cov = np.diag(np.square(std))
        # array([[1.37584041e+03, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 1.37584041e+03, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 1.00000000e-02, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.37584041e+03]])

        mean = np.dot(self._update_mat, mean)# array([ 1.01493103e+03,  3.10279335e+02,  6.50138151e-01,  7.41846456e+02,5.15075825e-01, -2.37273481e+00,  1.51336479e-11, -2.75417309e+00])
        #array([1.01493103e+03, 3.10279335e+02, 6.50138151e-01, 7.41846456e+02])
        #self._update_mat
        # array([[1., 0., 0., 0., 0., 0., 0., 0.],
        #         [0., 1., 0., 0., 0., 0., 0., 0.],
        #         [0., 0., 1., 0., 0., 0., 0., 0.],
        #         [0., 0., 0., 1., 0., 0., 0., 0.]])
        covariance = np.linalg.multi_dot((
            self._update_mat, covariance, self._update_mat.T))
        return mean, covariance + innovation_cov

计算 H x H_x Hx S = ( H ∗ P ∗ H T ) + R S=(H*P*H^T)+R S=(HPT)+R,self._update_mat即对应卡尔曼滤波里的H。
回溯到gating_distance:

def gating_distance(self, mean, covariance, measurements,
                        only_position=False):
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`.

        """
        mean, covariance = self.project(mean, covariance)
        # covariance
        # array([[5.49024634e+03, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 5.49024634e+03, 0.00000000e+00, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 1.05517373e-02, 0.00000000e+00],
        #         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 5.49024634e+03]])
        
        if only_position:
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        cholesky_factor = np.linalg.cholesky(covariance)
        # array([[74.09619655,  0.        ,  0.        ,  0.        ],
        #         [ 0.        , 74.09619655,  0.        ,  0.        ],
        #         [ 0.        ,  0.        ,  0.10272165,  0.        ],
        #         [ 0.        ,  0.        ,  0.        , 74.09619655]])
        d = measurements - mean
        z = scipy.linalg.solve_triangular(
            cholesky_factor, d.T, lower=True, check_finite=False,
            overwrite_b=True)
        # array([[-0.16899016, -8.12494505],
        #         [-0.260157  ,  0.2972169 ],
        #         [-0.21768034,  3.05297728],
        #         [-0.67689182, -0.34832872]])
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha

第42行为Cholesky分解。47行为马氏距离计算公式中对应的 d j − y i d_j-y_i djyi。scipy.linalg.solve_triangular 假设a是三角阵,求解x的等式a x = b。最后return马氏距离。
Cholesky 分解在计算马氏距离时的作用

回退到gate_cost_matrix:

# 基于卡尔曼滤波获得的状态分布,使成本矩阵中的不可行条目无效
def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

    Returns
    -------
    ndarray
        Returns the modified cost matrix.

    """
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost
    return cost_matrix

在cost_matrix中,进行运动信息约束,对每个trk,计算其预测结果和检测结果之间的马氏距离,并将cost_matrix中,相应的trk的马氏距离大于阈值的置置为无穷大。
回溯到min_cost_matching:

def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    """Solve linear assignment problem.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection_indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    if len(detection_indices) == 0 or len(track_indices) == 0:
        return [], track_indices, detection_indices  # Nothing to match.

    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5

    row_indices, col_indices = linear_assignment(cost_matrix)

    matches, unmatched_tracks, unmatched_detections = [], [], []
    for col, detection_idx in enumerate(detection_indices):
        if col not in col_indices:
            unmatched_detections.append(detection_idx)
    for row, track_idx in enumerate(track_indices):
        if row not in row_indices:
            unmatched_tracks.append(track_idx)
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance:
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    return matches, unmatched_tracks, unmatched_detections

47-49行,将经由max_distance处理之后的cost_matrix作为匈牙利算法的输入,得到线性匹配的结果,并去除差距较大的匹配对。
回溯到_match:

def _match(self, detections):

        def gated_metric(tracks, dets, track_indices, detection_indices):
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

        # Associate confirmed tracks using appearance features.
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update == 1]
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)

        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

unconfirmed tracks和没有匹配的tracker(unmatcher_tracks_a)一起组成iou_track_candidates,与还没有匹配上的检测结果(unmatched_detections)进行IOU匹配。缓解因为表现突变或者部分遮挡导致的较大特征变化。这样做也有可能导致一些新产生的轨迹被连接到了一些旧的轨迹上。
iou距离:

def iou_cost(tracks, detections, track_indices=None,
             detection_indices=None):
    """An intersection over union distance metric.

    Parameters
    ----------
    tracks : List[deep_sort.track.Track]
        A list of tracks.
    detections : List[deep_sort.detection.Detection]
        A list of detections.
    track_indices : Optional[List[int]]
        A list of indices to tracks that should be matched. Defaults to
        all `tracks`.
    detection_indices : Optional[List[int]]
        A list of indices to detections that should be matched. Defaults
        to all `detections`.

    Returns
    -------
    ndarray
        Returns a cost matrix of shape
        len(track_indices), len(detection_indices) where entry (i, j) is
        `1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
    for row, track_idx in enumerate(track_indices):
        if tracks[track_idx].time_since_update > 1:
            cost_matrix[row, :] = linear_assignment.INFTY_COST
            continue

        bbox = tracks[track_idx].to_tlwh()
        candidates = np.asarray([detections[i].tlwh for i in detection_indices])
        cost_matrix[row, :] = 1. - iou(bbox, candidates)
    return cost_matrix
def iou(bbox, candidates):
    """Computer intersection over union.

    Parameters
    ----------
    bbox : ndarray
        A bounding box in format `(top left x, top left y, width, height)`.
    candidates : ndarray
        A matrix of candidate bounding boxes (one per row) in the same format
        as `bbox`.

    Returns
    -------
    ndarray
        The intersection over union in [0, 1] between the `bbox` and each
        candidate. A higher score means a larger fraction of the `bbox` is
        occluded by the candidate.

    """
    bbox_tl, bbox_br = bbox[:2], bbox[:2] + bbox[2:]
    candidates_tl = candidates[:, :2]
    candidates_br = candidates[:, :2] + candidates[:, 2:]

    tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
               np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
    br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
               np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
    wh = np.maximum(0., br - tl)

    area_intersection = wh.prod(axis=1)
    area_bbox = bbox[2:].prod()
    area_candidates = candidates[:, 2:].prod(axis=1)
    return area_intersection / (area_bbox + area_candidates - area_intersection)

然后合并用余弦距离和IOU距离进行匹配的结果。
然后是一些跟新操作:
卡尔曼更新:

def update(self, mean, covariance, measurement):
        """Run Kalman filter correction step.

        Parameters
        ----------
        mean : ndarray
            The predicted state's mean vector (8 dimensional).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).
        measurement : ndarray
            The 4 dimensional measurement vector (x, y, a, h), where (x, y)
            is the center position, a the aspect ratio, and h the height of the
            bounding box.

        Returns
        -------
        (ndarray, ndarray)
            Returns the measurement-corrected state distribution.

        """
        projected_mean, projected_cov = self.project(mean, covariance)

        chol_factor, lower = scipy.linalg.cho_factor(
            projected_cov, lower=True, check_finite=False)
        kalman_gain = scipy.linalg.cho_solve(
            (chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
            check_finite=False).T
        innovation = measurement - projected_mean

        new_mean = mean + np.dot(innovation, kalman_gain.T)
        new_covariance = covariance - np.linalg.multi_dot((
            kalman_gain, projected_cov, kalman_gain.T))
        return new_mean, new_covariance

Track更新:

def update(self, kf, detection):
        """Perform Kalman filter measurement update step and update the feature
        cache.

        Parameters
        ----------
        kf : kalman_filter.KalmanFilter
            The Kalman filter.
        detection : Detection
            The associated detection.

        """
        self.mean, self.covariance = kf.update(
            self.mean, self.covariance, detection.to_xyah())
        self.features.append(detection.feature)

        self.hits += 1
        self.time_since_update = 0
        if self.state == TrackState.Tentative and self.hits >= self._n_init:
            self.state = TrackState.Confirmed

把det的深度特征保存在这个trk的特征集中。hits+1,time_since_update重置为0。hits>3时trk的状态由tentative改为confirmed。tentative阶段的trk没有匹配结果则状态直接转换为deleted。
然后把结果添加到output。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

deepsort代码解析 的相关文章

  • deepsort代码解析

    DeepSort代码解析 项目地址 xff1a deepsort span class token keyword if span name span class token operator 61 61 span span class t
  • 解决yolov5_deepsort报错:no module named “torchreid”

    遇到了同样的问题 xff0c 直接pip install torchreid没用 xff0c 从GitHub上找到了解决方法 https github com phil bergmann tracking wo bnw issues 159
  • TensorRT量化工具pytorch_quantization代码解析(一)

    量化工具箱pytorch quantization 通过提供一个方便的 PyTorch 库来补充 TensorRT xff0c 该库有助于生成可优化的 QAT 模型 该工具包提供了一个 API 来自动或手动为 QAT 或 PTQ 准备模型
  • TensorRT量化工具pytorch_quantization代码解析(二)

    后续继续补充 xff01 继续看张量量化函数 xff0c 代码位于 xff1a tools pytorch quantization pytorch quantization tensor quant py ScaledQuantDescr
  • TensorRT量化工具pytorch_quantization代码解析(四)

    继续看pytorch quantiation calib 中Calibrator类 xff0c 代码位于 xff1a tools pytorch quantization pytorch quantization calib 其作用 xff
  • PX4代码解析:振动分析

    本篇文章首发于公众号 xff1a 无人机系统技术 更多无人机技术相关文章请关注此公众号 一 前言 前面的文章主要都是一些理论知识为主 xff0c 很多读者朋友看了之后可能会有点枯燥 xff0c 里面很多公式看起来也比较晦涩 xff0c 今天
  • 位置控制器PX4代码解析(文中有福利!!!)

    号外号外 xff01 xff01 xff01 本公众号将联合电子工业出版社于9月11号送出15本价值98元的全权老师著作 多旋翼飞行器设计与控制 xff0c 关注本公众号的朋友均可参加 xff0c 敬请期待 还没关注的朋友赶紧关注吧 xff
  • EGO Planner代码解析bspline_optimizer部分(3)

    1 int BsplineOptimizer earlyExit void func data const double x const double g const double fx const double xnorm const d
  • vins-mobile代码解析3:drawAR

    AR功能主要流程是 xff1a 先检测平面 xff0c 然后计算AR物体 xff08 box xff09 的各个顶点3d位置 xff0c 然后写程序把每个顶点投影到相平面 xff0c 用cv的多边形填充功能来绘制到照片上面 先求一个相机朝向
  • [jetson浅试] yolov5+deepsort+Tensorrt C++部署(Xavier AGX)

    1 简介 xff1a 这学期刚开学的时候搞的 xff0c 空下来整理一些 xff08 以后还是应该养成边搞边写博客的好习惯 xff09 本文主要是对yolov5 deepsort tensorrt A c 43 43 implementat
  • PX4代码解析(1)

    前言 做pixhawk飞控有一段时间了 xff0c 但在学习过程中遇到许多困难 xff0c 目前网上找不到比较完整的PX4学习笔记 xff0c 我打算结合自己理解 xff0c 写写自己对PX4源码的理解 xff0c 不一定对 xff0c 只
  • PX4代码解析(2)

    前言 在大致了解PX4代码架构后 xff0c 我们需要了解PX4的通信机制 在PX4代码架构中 xff0c 每通信总线主要分为两个部分 xff0c 一是内部通信总线uORB 即PX4内部进程通信采用的协议 xff0c 例如PX4内部姿态控制
  • px4代码解析(3)

    前言 在使用PX4飞控时 xff0c 我们难免要对其进行二次开发 xff0c 例如修改控制算法 xff0c 添加新的传感器 xff0c 这时需要在代码中定义属于自己的消息 本节主要分享一下如何在PX4代码中添加自己的消息 一 消息添加与配置
  • PX4代码解析(5)

    一 前言 我所讨论的PX4代码是基于v1 11版本 xff0c 该版本与之前的版本有不少不同 xff0c 其中一个比较大的区别在于新版本大部分用到了C 43 43 中模板 xff0c 使得代码没有以前那么容易理解 xff0c 因此我在后面介
  • deepsort C++代码关于卡尔曼滤波的一些思考

    卡尔曼滤波公式 xff1a 以上公式 xff0c 在C 43 43 代码中 xff0c 状态估计x用mean表示 xff0c 协方差P用covariance表示 xff0c 状态转换模型F用 motion mat表示 xff0c 观测模型H
  • RTKlib PPP代码解析

    文章目录 ppposudstate pppudbias pppcorr measppp res 欢迎关注个人公众号 xff1a 导航员学习札记 我所基于的代码版本是RTKlib 2 4 3的一个拓展版本RTKexplore Demo5 xf
  • stm32驱动NRF24L01_原理+代码解析

    目录 概念 废话篇 xff08 24L01简介 xff09 引脚分配 工作模式 通信地址理解 xff08 个人疑难点 xff09 原理分析 寄存器赏析 寄存器操作指令 配置寄存器 xff08 CONFIG xff0c 位置 xff1a 0X
  • Jetson Xavier NX使用yolov5+deepsort实现CSI摄像头的目标跟踪

    文章目录 安装过程运行效果用python代码来打开CSI摄像头实现CSI摄像头目标跟踪报错 AttributeError 39 NoneType 39 object has no attribute 39 shape 39 运行效果 参考文
  • 【深度学习】yolov5+deepsort 完成计数和行人行人重识别的追踪

    文章目录 前言 1 知识体系 1 1 前置说明 1 2 Sort的工作流程 1 3 deepsort 2 实践应用 3 干货补充 总结 前言 行人重识别是计算机视觉的基本任务之一 首先要有一个detector 检测器来检测到目标 然后将检测
  • StrongSORT(deepsort强化版)学习体会

    少废话 先做备忘录 StrongSORT deepsort强化版 浅实战 代码解析 参考 https blog csdn net weixin 50862344 article details 127070391 https github

随机推荐

  • k8s client-go workqueue

    1 基础队列 1 1 基础队列接口 type Interface interface Add item interface 向队列中添加一个元素 xff0c interface 类型 xff0c 说明可以添加任何类型的元素 Len int
  • 相机与imu的标定(Kalibr)

    在进行vio算法开发前最重要的事是对设备内参外参的标定 xff0c 其准确性直接决定了算法的有效性 xff0e 这里我将对最著名的kalibr标定工具的使用步骤进行说明 xff0c 包括安装 相机标定 imu标定 相机与imu联合标定等步骤
  • 解决cv_bridge依赖opencv版本问题

    1 问题来源 在安装ros的过程中 xff0c 系统会默认安装cv bridge库 xff0c 但该库指定了依赖的opencv库路径 xff0c 拿ros melodic版本来说 xff0c 默认依赖opencv库 usr lib x86
  • 使用ORB_SLAM3运行Realsense T265

    关于硬件 官网说明 使用说明 Realsense T265是一款跟踪相机 xff0c 配有两个FOV为111 7 x 108 6的广角相机 xff0c 并且带有IMU BMI055 惯性测量单元 设备内部配有vpu处理器并嵌入了建图和定位算
  • ceres-solver和g2o性能比较

    前言 ceres solver 和 g2o 是slam领域常见的优化器 xff0c 其中ceres solver被vins mono使用 xff0c 而g2o被orb slam3使用 xff0c 因此到底哪个优化器更适合于在slam算法开发
  • FreeRTOS的vTaskDelete使用说明

    FreeRTOS的vTaskDelete使用说明 函数说明 参数 xff1a xTaskToDelete 要删除的任务的任务句柄 返回值 无 说明 删除一个用函数xTaskCreate 或者xTaskCreateStatic 创建的任务 x
  • 机器学习——随机森林(Random Forest)

    1 随机森林 xff08 random forest xff09 简介 随机森林是一种集成算法 xff08 Ensemble Learning xff09 xff0c 它属于Bagging类型 xff0c 通过组合多个弱分类器 xff0c
  • 《基础知识——C和C++的主要区别》

    C和C 43 43 的主要区别 设计思想上 xff1a C 43 43 是面向对象的语言 xff0c 而C是面向过程的结构化编程语言 语法上 xff1a C 43 43 具有封装 继承和多态三种特性 C 43 43 相比C xff0c 增加
  • 数据库原理及应用(十三)E-R图、关系模式

    数据库设计的过程 数据分析 gt 数据建模 gt 关系数据库模式 gt 关系数据库管理 用户需求 gt 概念模型 E R Model gt 逻辑模型 xff08 三层结构 xff09 现实世界 gt 信息世界 gt 机器世界 概念设计工具E
  • Ubuntu数据备份与恢复工具(一)

    在我们日常工作中 xff0c 个人文件 业务数据及应用信息的备份与恢复策略是一个重要的环节 意外删除 硬件故障 操作失误 网络攻击 xff0c 甚至是自然灾害 xff0c 都可以直接或间接导不可估价的数据损失 为了避免损失 xff0c 缩少
  • 百度移动端面试回忆

    百度一面 xff1a 1 自我介绍 2 悲观锁和乐观锁 乐观锁 xff1a 总是认为不会产生并发问题 xff0c 每次去取数据的时候总认为不会有其他线程对数据进行修改 xff0c 因此不会上锁 xff0c 但是在更新时会判断其他线程在这之前
  • Quagga编译安装

    Quagga源码编译安装 1 Quagga下载 1 官网下载quagga 1 2 4 tar gz并拖入虚拟机桌面 2 解压到 opt目录下 sudo tar zxvf Desktop quagga 1 2 4 tar gz C opt 2
  • VINS-FUSION 源码 双目 单线程 按执行顺序阅读

    VINS FUSION 源码 双目 单线程 按执行顺序阅读 Keywords xff1a VINS FUSION vins 源码解读 源码梳理 vins数据结构 vinsfusion vins双目 双目vins 双目vinsfusion 双
  • 【C语言】__attribute__使用

    一 介绍 GNU C 的一大特色就是 attribute 机制attribute 可以设置函数属性 xff08 Function Attribute xff09 变量属性 xff08 Variable Attribute xff09 和类型
  • Ubuntu20.04下CUDA、cuDNN的详细安装与配置过程(图文)

    Ubuntu20 04下CUDA cuDNN的详细安装与配置过程 xff0c 亲测试可用 xff08 图文 xff09 一 NVIDIA xff08 英伟达 xff09 显卡驱动安装1 1 关闭系统自带驱动nouveau2 2 NVIDIA
  • 使用动量(Momentum)的SGD、使用Nesterov动量的SGD

    使用动量 Momentum 的SGD 使用Nesterov动量的SGD 参考 xff1a 使用动量 Momentum 的SGD 使用Nesterov动量的SGD 一 使用动量 Momentum 的随机梯度下降 虽然随机梯度下降是非常受欢迎的
  • Data Uncertainty Learning in Face Recognition

    Data Uncertainty Learning in Face Recognition 建模数据的不确定性对含噪音图像非常重要 xff0c 但对于人脸识别的研究却很少 先驱者的工作 35 通过将每个人脸图像嵌入建模为高斯分布来考虑不确定
  • ENAS代码解读

    ENAS代码解读 参考代码 xff1a https github com TDeVries enas pytorch 数据集 xff1a cifar10 main函数 xff1a span class token keyword def s
  • PC-DARTS Partial Channel Connections for Memory-Efficient Differentiable Architecture Search

    PC DARTS Partial Channel Connections for Memory Efficient Differentiable Architecture Search Abstract 可微体系结构搜索 xff08 DAR
  • deepsort代码解析

    DeepSort代码解析 项目地址 xff1a deepsort span class token keyword if span name span class token operator 61 61 span span class t