



if __name__ == "__main__":
    # Script entry point: parse CLI options and obtain the config object.
    args = parse_args()
    cfg = get_config()
    # NOTE(review): the detector / DeepSORT YAML files presumably have to be
    # merged into `cfg` here; that step is not visible in this excerpt.

    # VideoTracker is used as a context manager; `run()` is the per-frame
    # detection + tracking loop.
    with VideoTracker(cfg, args, video_path=args.VIDEO_PATH) as vdo_trk:
        vdo_trk.run()


YOLOV3:
  CFG: "./detector/YOLOv3/cfg/yolo_v3.cfg"
  WEIGHT: "./detector/YOLOv3/weight/yolov3.weights"
  CLASS_NAMES: "./detector/YOLOv3/cfg/coco.names"



DEEPSORT:
  REID_CKPT: "./deep_sort/deep/checkpoint/ckpt.t7"
  MAX_DIST: 0.2
  MAX_AGE: 70
  N_INIT: 3
  NN_BUDGET: 100


class VideoTracker(object):
    """Bundle a video source, an object detector and a DeepSORT tracker.

    Parameters
    ----------
    cfg
        Configuration object handed to `build_detector` / `build_tracker`.
    args
        Parsed command-line options. This constructor reads `use_cuda`,
        `display`, `display_width`, `display_height`, `cam` and `backbone`.
    video_path
        Path of the video to process; stored on the instance.
    """

    def __init__(self, cfg, args, video_path):
        self.cfg = cfg
        self.args = args
        self.video_path = video_path
        self.logger = get_logger("root")

        # CUDA is used only when it is both requested and available.
        use_cuda = args.use_cuda and torch.cuda.is_available()
        if not use_cuda:
            warnings.warn("Running in cpu mode which maybe very slow!", UserWarning)

        if args.display:
            cv2.namedWindow("test", cv2.WINDOW_NORMAL)
            cv2.resizeWindow("test", args.display_width, args.display_height)

        # A camera index other than -1 selects a webcam; otherwise create an
        # unopened capture (presumably opened later with `video_path`).
        if args.cam != -1:
            print("Using webcam " + str(args.cam))
            self.vdo = cv2.VideoCapture(args.cam)
        else:
            self.vdo = cv2.VideoCapture()
        if args.backbone == 'retinaface':
            return_layers = {'layer2':1,'layer3':2,'layer4':3}
            from retinaface import anchors,losses,torchvision_model,utils
            RetinaFace = torchvision_model.create_retinaface(return_layers)
            retina_dict = RetinaFace.state_dict()
            # NOTE(review): the checkpoint path is empty in this excerpt and
            # must be filled in before this branch can work.
            pre_state_dict = torch.load('')
            # Strip the first 7 characters of every key (presumably a
            # "module." prefix) and keep only keys the model knows.
            pretrained_dict = {k[7:]: v for k, v in pre_state_dict.items() if k[7:] in retina_dict}
            # The filtered weights were built but never applied; load them
            # and switch to inference mode.
            RetinaFace.load_state_dict(pretrained_dict)
            RetinaFace = RetinaFace.cuda()
            RetinaFace.eval()
            self.detector = RetinaFace
        elif args.backbone == 'yolov3':
            self.detector = build_detector(cfg, use_cuda=use_cuda)
            self.class_names = self.detector.class_names
        self.deepsort = build_tracker(cfg, use_cuda=use_cuda)

前面就是一些判断CUDA,调用摄像头,加载模型的操作,再来看 build_tracker():

def build_tracker(cfg, use_cuda):
    """Create a `DeepSort` instance from the `DEEPSORT` section of `cfg`.

    Parameters
    ----------
    cfg
        Configuration object exposing a `DEEPSORT` node with the keys
        `REID_CKPT`, `MAX_DIST`, `MIN_CONFIDENCE`, `NMS_MAX_OVERLAP`,
        `MAX_IOU_DISTANCE`, `MAX_AGE`, `N_INIT` and `NN_BUDGET`.
    use_cuda : bool
        Forwarded unchanged to `DeepSort`.

    Returns
    -------
    DeepSort
        The configured tracker.
    """
    reid_ckpt = cfg.DEEPSORT.REID_CKPT
    options = dict(
        max_dist=cfg.DEEPSORT.MAX_DIST,
        min_confidence=cfg.DEEPSORT.MIN_CONFIDENCE,
        nms_max_overlap=cfg.DEEPSORT.NMS_MAX_OVERLAP,
        max_iou_distance=cfg.DEEPSORT.MAX_IOU_DISTANCE,
        max_age=cfg.DEEPSORT.MAX_AGE,
        n_init=cfg.DEEPSORT.N_INIT,
        nn_budget=cfg.DEEPSORT.NN_BUDGET,
        use_cuda=use_cuda,
    )
    return DeepSort(reid_ckpt, **options)


class DeepSort(object):
    """Wire an appearance-feature extractor to the multi-target tracker.

    Parameters
    ----------
    model_path : str
        Checkpoint path passed to `Extractor`.
    max_dist : float
        Matching threshold of the cosine nearest-neighbour metric.
    min_confidence : float
        Stored as `self.min_confidence`.
    nms_max_overlap : float
        Stored as `self.nms_max_overlap`.
    max_iou_distance : float
        Forwarded to `Tracker`.
    max_age : int
        Forwarded to `Tracker`.
    n_init : int
        Forwarded to `Tracker`.
    nn_budget : int
        Budget passed to `NearestNeighborDistanceMetric`.
    use_cuda : bool
        Forwarded to `Extractor`.
    """

    def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
        self.min_confidence = min_confidence
        self.nms_max_overlap = nms_max_overlap

        self.extractor = Extractor(model_path, use_cuda=use_cuda)

        max_cosine_distance = max_dist
        # The caller's `nn_budget` is used as given; it used to be overwritten
        # with a hard-coded 100, so the configured value was ignored.
        metric = NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
        self.tracker = Tracker(metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)



class Tracker:
    """
    This is the multi-target tracker.

    Parameters
    ----------
    metric : nn_matching.NearestNeighborDistanceMetric
        A distance metric for measurement-to-track association.
    max_iou_distance : float
        Stored on the instance as `max_iou_distance`.
    max_age : int
        Maximum number of consecutive misses before a track is deleted.
    n_init : int
        Number of consecutive detections before the track is confirmed. The
        track state is set to `Deleted` if a miss occurs within the first
        `n_init` frames.

    Attributes
    ----------
    metric : nn_matching.NearestNeighborDistanceMetric
        The distance metric used for measurement to track association.
    max_age : int
        Maximum number of consecutive misses before a track is deleted.
    n_init : int
        Number of frames that a track remains in initialization phase.
    kf : kalman_filter.KalmanFilter
        A Kalman filter to filter target trajectories in image space.
    tracks : List[Track]
        The list of active tracks at the current time step.

    """

    def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
        self.metric = metric
        self.max_iou_distance = max_iou_distance
        self.max_age = max_age
        self.n_init = n_init

        self.kf = kalman_filter.KalmanFilter()
        self.tracks = []
        # Counter starting at 1; presumably the id given to the next track.
        self._next_id = 1


class KalmanFilter(object):
    """
    A simple Kalman filter for tracking bounding boxes in image space.

    The 8-dimensional state space

        x, y, a, h, vx, vy, va, vh

    contains the bounding box center position (x, y), aspect ratio a, height h,
    and their respective velocities.

    Object motion follows a constant velocity model. The bounding box location
    (x, y, a, h) is taken as direct observation of the state space (linear
    observation model).

    """

    def __init__(self):
        ndim, dt = 4, 1.

        # Create Kalman filter model matrices.
        # Start from the 8x8 identity ...
        self._motion_mat = np.eye(2 * ndim, 2 * ndim)
        # ... then couple every position component with its velocity, which
        # yields the constant-velocity transition matrix:
        # [[1. 0. 0. 0. 1. 0. 0. 0.]
        #  [0. 1. 0. 0. 0. 1. 0. 0.]
        #  [0. 0. 1. 0. 0. 0. 1. 0.]
        #  [0. 0. 0. 1. 0. 0. 0. 1.]
        #  [0. 0. 0. 0. 1. 0. 0. 0.]
        #  [0. 0. 0. 0. 0. 1. 0. 0.]
        #  [0. 0. 0. 0. 0. 0. 1. 0.]
        #  [0. 0. 0. 0. 0. 0. 0. 1.]]
        for i in range(ndim):
            self._motion_mat[i, ndim + i] = dt
        # 4x8 observation matrix: picks (x, y, a, h) out of the state.
        # [[1. 0. 0. 0. 0. 0. 0. 0.]
        #  [0. 1. 0. 0. 0. 0. 0. 0.]
        #  [0. 0. 1. 0. 0. 0. 0. 0.]
        #  [0. 0. 0. 1. 0. 0. 0. 0.]]
        self._update_mat = np.eye(ndim, 2 * ndim)
        # Motion and observation uncertainty are chosen relative to the current
        # state estimate. These weights control the amount of uncertainty in
        # the model. This is a bit hacky.
        self._std_weight_position = 1. / 20
        self._std_weight_velocity = 1. / 160



def run(self):
        """Run detection and tracking over the video stream, frame by frame.

        For every `frame_interval`-th frame: run the configured detector,
        feed its boxes to DeepSORT, draw the tracked boxes, optionally
        display / save the frame, write the accumulated results and log
        timing information.

        NOTE(review): `self.writer`, `self.save_results_path`,
        `self.deepsort._xyxy_to_tlwh`, `_xyxy_to_xywh`, `draw_boxes` and
        `write_results` are not defined in this excerpt; the lines using
        them were reconstructed and should be checked against the project.
        """
        results = []
        idx_frame = 0
        while self.vdo.grab():
            idx_frame += 1
            # Skip frames that are not a multiple of the frame interval.
            if idx_frame % self.args.frame_interval:
                continue

            start = time.time()
            _, ori_im = self.vdo.retrieve()
            outputs = []
            # Default so the log line below stays valid even when neither
            # backbone branch assigns detections.
            bbox_xywh = np.empty((0, 4))
            # Use the instance's options instead of a module-level `args`,
            # which only exists when the file is executed as a script.
            if self.args.backbone == 'yolov3':
                im = cv2.cvtColor(ori_im, cv2.COLOR_BGR2RGB)
                bbox_xywh, cls_conf, cls_ids = self.detector(im)

                # select person class
                mask = cls_ids == 0

                bbox_xywh = bbox_xywh[mask]
                # bbox dilation just in case bbox too small, delete this line if using a better pedestrian detector
                bbox_xywh[:, 3:] *= 1.2
                cls_conf = cls_conf[mask]
                # do tracking
                if len(bbox_xywh) > 0 :
                    outputs = self.deepsort.update(bbox_xywh, cls_conf, im)
            elif self.args.backbone == 'retinaface':
                from retinaface import eval_widerface
                # HWC image -> 1xCxHxW float tensor on the GPU.
                img = torch.from_numpy(ori_im).permute(2, 0, 1).unsqueeze(0).float().cuda()
                picked_boxes, picked_landmarks, picked_scores = eval_widerface.get_detections(img, self.detector, score_threshold=0.5, iou_threshold=0.3)
                bbox_xywh = []
                if np.array(picked_boxes).ndim == 3:
                    picked_boxes = np.squeeze(np.array(picked_boxes))
                for box in picked_boxes:
                    if box is None:
                        continue
                    x,y,w,h = _xyxy_to_xywh(box)
                    bbox_xywh.append([x, y, w, h])
                bbox_xywh = np.array(bbox_xywh)
                cls_conf = np.array(picked_scores)
                if np.array(cls_conf).ndim == 2:
                    cls_conf = np.squeeze(cls_conf)
                # do tracking
                if len(bbox_xywh) > 0 :
                    outputs = self.deepsort.update(bbox_xywh, cls_conf, ori_im)
            # draw boxes for visualization
            if len(outputs) > 0:
                bbox_tlwh = []
                bbox_xyxy = outputs[:, :4]
                identities = outputs[:, -1]
                ori_im = draw_boxes(ori_im, bbox_xyxy, identities)

                for bb_xyxy in bbox_xyxy:
                    bbox_tlwh.append(self.deepsort._xyxy_to_tlwh(bb_xyxy))

                results.append((idx_frame - 1, bbox_tlwh, identities))

            end = time.time()

            if self.args.display:
                cv2.imshow("test", ori_im)
                # imshow only repaints once the GUI event loop is pumped.
                cv2.waitKey(1)

            if self.args.save_path:
                self.writer.write(ori_im)

            # save results
            write_results(self.save_results_path, results, 'mot')

            # logging
            self.logger.info("time: {:.03f}s, fps: {:.03f}, detection numbers: {}, tracking numbers: {}" \
                             .format(end - start, 1 / (end - start), bbox_xywh.shape[0], len(outputs)))


def update(self, bbox_xywh, confidences, ori_img):
        """Feed one frame of detections to the tracker and return live tracks.

        Parameters
        ----------
        bbox_xywh
            Detected boxes, converted to top-left form by `_xywh_to_tlwh`.
        confidences
            One confidence per box; boxes at or below `self.min_confidence`
            are dropped.
        ori_img : ndarray
            The frame; its first two dimensions are stored as
            `self.height`, `self.width`.

        Returns
        -------
        ndarray or list
            Stacked rows `[x1, y1, x2, y2, track_id]` (integer dtype) for
            every confirmed track updated within the last frame, or an empty
            list if there is none.
        """
        self.height, self.width = ori_img.shape[:2]
        # generate detections
        features = self._get_features(bbox_xywh, ori_img)
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
        detections = [Detection(bbox_tlwh[i], conf, features[i]) for i,conf in enumerate(confidences) if conf>self.min_confidence]
        # run on non-maximum supression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
        detections = [detections[i] for i in indices]

        # update tracker: predict first, then associate the detections
        self.tracker.predict()
        self.tracker.update(detections)

        # output bbox identities
        outputs = []
        for track in self.tracker.tracks:
            # Skip tentative tracks and tracks that missed recent frames.
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            box = track.to_tlwh()
            x1,y1,x2,y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            # Builtin `int` rather than the removed `np.int` alias.
            outputs.append(np.array([x1, y1, x2, y2, track_id], dtype=int))
        if len(outputs) > 0:
            outputs = np.stack(outputs,axis=0)
        return outputs

第四行根据检测框,通过特征提取网络得到特征信息。第五行self._xywh_to_tlwh是把坐标从图像中点的x,y坐标和长宽转换成左上角的x,y坐标和长宽。第6行至第11行是常规的目标检测后处理操作,通过置信度和nms过滤多余的bounding box。

def predict(self):
        """Propagate track state distributions one time step forward.

        This function should be called once every time step, before `update`.
        """
        # Each track advances its own state with the shared Kalman filter.
        for track in self.tracks:
            track.predict(self.kf)



def update(self, detections):
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # NOTE(review): `mark_missed` and `metric.partial_fit` are not shown
        # in this excerpt; confirm the names against the project.
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            # The cached features now belong to the metric; reset the cache.
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)


to_xyah()的代码注释里是这样写的:Convert bounding box to format (center x, center y, aspect ratio, height), where the aspect ratio is width / height.


def initiate(self, measurement):
        """Create track from unassociated measurement.

        Parameters
        ----------
        measurement : ndarray
            Bounding box coordinates (x, y, a, h) with center position (x, y),
            aspect ratio a, and height h.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector (8 dimensional) and covariance matrix (8x8
            dimensional) of the new track. Unobserved velocities are initialized
            to 0 mean.

        """
        mean_pos = measurement
        mean_vel = np.zeros_like(mean_pos)  # initial velocity is 0
        # Concatenate position and velocity into the 8-dim state vector.
        mean = np.r_[mean_pos, mean_vel]

        # One standard deviation per state component (x, y, a, h, vx, vy, va,
        # vh). Positions and velocities scale with the box height; the aspect
        # ratio and its velocity use the fixed values 1e-2 and 1e-5.
        # Example for h = 742.519043:
        # [74.2519, 74.2519, 0.01, 74.2519, 46.4074, 46.4074, 1e-05, 46.4074]
        std = [
            2 * self._std_weight_position * measurement[3],
            2 * self._std_weight_position * measurement[3],
            1e-2,
            2 * self._std_weight_position * measurement[3],
            10 * self._std_weight_velocity * measurement[3],
            10 * self._std_weight_velocity * measurement[3],
            1e-5,
            10 * self._std_weight_velocity * measurement[3]]
        # Diagonal 8x8 covariance holding the squared standard deviations.
        covariance = np.diag(np.square(std))
        return mean, covariance


继续看_initiate_track(self, detection):

def _initiate_track(self, detection):
        """Start a new track from `detection` and append it to `self.tracks`.

        The Kalman state is initialised from the detection's
        (center x, center y, aspect ratio, height) box, and the track receives
        the next free id.
        """
        mean, covariance = self.kf.initiate(detection.to_xyah())
        self.tracks.append(Track(
            mean, covariance, self._next_id, self.n_init, self.max_age,
            detection.feature))
        self._next_id += 1


class Track:
    """
    A single target track with state space `(x, y, a, h)` and associated
    velocities, where `(x, y)` is the center of the bounding box, `a` is the
    aspect ratio and `h` is the height.

    Parameters
    ----------
    mean : ndarray
        Mean vector of the initial state distribution.
    covariance : ndarray
        Covariance matrix of the initial state distribution.
    track_id : int
        A unique track identifier.
    n_init : int
        Number of consecutive detections before the track is confirmed. The
        track state is set to `Deleted` if a miss occurs within the first
        `n_init` frames.
    max_age : int
        The maximum number of consecutive misses before the track state is
        set to `Deleted`.
    feature : Optional[ndarray]
        Feature vector of the detection this track originates from. If not None,
        this feature is added to the `features` cache.

    Attributes
    ----------
    mean : ndarray
        Mean vector of the initial state distribution.
    covariance : ndarray
        Covariance matrix of the initial state distribution.
    track_id : int
        A unique track identifier.
    hits : int
        Total number of measurement updates.
    age : int
        Total number of frames since first occurrence.
    time_since_update : int
        Total number of frames since last measurement update.
    state : TrackState
        The current track state.
    features : List[ndarray]
        A cache of features. On each measurement update, the associated feature
        vector is added to this list.

    """

    def __init__(self, mean, covariance, track_id, n_init, max_age,
                 feature=None):
        self.mean = mean
        self.covariance = covariance
        self.track_id = track_id
        self.hits = 1
        self.age = 1
        self.time_since_update = 0

        # Every new track starts out tentative.
        self.state = TrackState.Tentative
        self.features = []
        if feature is not None:
            self.features.append(feature)

        self._n_init = n_init
        self._max_age = max_age


class TrackState:
    """
    Enumeration type for the single target track state. Newly created tracks are
    classified as `tentative` until enough evidence has been collected. Then,
    the track state is changed to `confirmed`. Tracks that are no longer alive
    are classified as `deleted` to mark them for removal from the set of active
    tracks.

    """

    Tentative = 1
    Confirmed = 2
    Deleted = 3


def update(self, detections):
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # NOTE(review): `mark_missed` and `metric.partial_fit` are not shown
        # in this excerpt; confirm the names against the project.
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            # The cached features now belong to the metric; reset the cache.
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)


def update(self, bbox_xywh, confidences, ori_img):
        """Feed one frame of detections to the tracker and return live tracks.

        Parameters
        ----------
        bbox_xywh
            Detected boxes, converted to top-left form by `_xywh_to_tlwh`.
        confidences
            One confidence per box; boxes at or below `self.min_confidence`
            are dropped.
        ori_img : ndarray
            The frame; its first two dimensions are stored as
            `self.height`, `self.width`.

        Returns
        -------
        ndarray or list
            Stacked rows `[x1, y1, x2, y2, track_id]` (integer dtype) for
            every confirmed track updated within the last frame, or an empty
            list if there is none.
        """
        self.height, self.width = ori_img.shape[:2]
        # generate detections
        features = self._get_features(bbox_xywh, ori_img)
        bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
        detections = [Detection(bbox_tlwh[i], conf, features[i]) for i,conf in enumerate(confidences) if conf>self.min_confidence]
        # run on non-maximum supression
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
        detections = [detections[i] for i in indices]

        # update tracker: predict first, then associate the detections
        self.tracker.predict()
        self.tracker.update(detections)

        # output bbox identities
        outputs = []
        for track in self.tracker.tracks:
            # Skip tentative tracks and tracks that missed recent frames.
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            box = track.to_tlwh()
            x1,y1,x2,y2 = self._tlwh_to_xyxy(box)
            track_id = track.track_id
            # Builtin `int` rather than the removed `np.int` alias.
            outputs.append(np.array([x1, y1, x2, y2, track_id], dtype=int))
        if len(outputs) > 0:
            outputs = np.stack(outputs,axis=0)
        return outputs


def predict(self):
        """Propagate track state distributions one time step forward.

        This function should be called once every time step, before `update`.
        """
        # Each track advances its own state with the shared Kalman filter.
        for track in self.tracks:
            track.predict(self.kf)


def predict(self, kf):
        """Propagate the state distribution to the current time step using a
        Kalman filter prediction step.

        Parameters
        ----------
        kf : kalman_filter.KalmanFilter
            The Kalman filter.

        """
        self.mean, self.covariance = kf.predict(self.mean, self.covariance)
        # One more frame has passed without a measurement update (so far).
        self.age += 1
        self.time_since_update += 1


def predict(self, mean, covariance):
        """Run Kalman filter prediction step.

        Parameters
        ----------
        mean : ndarray
            The 8 dimensional mean vector of the object state at the previous
            time step.
        covariance : ndarray
            The 8x8 dimensional covariance matrix of the object state at the
            previous time step.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state.

        """
        # Process noise: position / velocity terms scale with the box height
        # (mean[3]); the aspect ratio and its velocity use fixed 1e-2 / 1e-5.
        std_pos = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            1e-2,
            self._std_weight_position * mean[3]]
        std_vel = [
            self._std_weight_velocity * mean[3],
            self._std_weight_velocity * mean[3],
            1e-5,
            self._std_weight_velocity * mean[3]]
        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))

        # x' = F x
        mean = np.dot(self._motion_mat, mean)
        # P' = F P F^T + Q
        covariance = np.linalg.multi_dot((
            self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

        return mean, covariance


# [[1. 0. 0. 0. 1. 0. 0. 0.]
# [0. 1. 0. 0. 0. 1. 0. 0.]
# [0. 0. 1. 0. 0. 0. 1. 0.]
# [0. 0. 0. 1. 0. 0. 0. 1.]
# [0. 0. 0. 0. 1. 0. 0. 0.]
# [0. 0. 0. 0. 0. 1. 0. 0.]
# [0. 0. 0. 0. 0. 0. 1. 0.]
# [0. 0. 0. 0. 0. 0. 0. 1.]

$x_t = x_{t-1} + \Delta t \cdot v_t$

$P = F P F^T + Q$

def update(self, detections):
        """Perform measurement update and track management.

        Parameters
        ----------
        detections : List[deep_sort.detection.Detection]
            A list of detections at the current time step.

        """
        # NOTE(review): `mark_missed` and `metric.partial_fit` are not shown
        # in this excerpt; confirm the names against the project.
        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            # The cached features now belong to the metric; reset the cache.
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)


def _match(self, detections):
        """Associate the current tracks with `detections`.

        Confirmed tracks are matched first through the appearance metric
        (matching cascade); unconfirmed tracks, together with confirmed
        tracks that were left unmatched and missed exactly one frame, are
        then matched by IOU.

        Returns
        -------
        (List[(int, int)], List[int], List[int])
            Matched (track index, detection index) pairs, unmatched track
            indices and unmatched detection indices.
        """

        def gated_metric(tracks, dets, track_indices, detection_indices):
            # Appearance cost between the selected tracks and detections,
            # passed through `gate_cost_matrix` with the Kalman filter.
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

        # Associate confirmed tracks using appearance features.
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update == 1]
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)

        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

13-17行把track分为confirmed和not confirmed,然后是21行linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)

def matching_cascade(
        distance_metric, max_distance, cascade_depth, tracks, detections,
        track_indices=None, detection_indices=None):
    """Run matching cascade.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    cascade_depth: int
        The cascade depth, should be set to the maximum track age.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : Optional[List[int]]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above). Defaults to all tracks.
    detection_indices : Optional[List[int]]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above). Defaults to all
        detections.

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = list(range(len(tracks)))
    if detection_indices is None:
        detection_indices = list(range(len(detections)))

    unmatched_detections = detection_indices
    matches = []
    # Level 0 handles tracks updated in the previous frame, level 1 those
    # that missed one frame, and so on: recently seen tracks match first.
    for level in range(cascade_depth):
        if len(unmatched_detections) == 0:  # No detections left
            break

        track_indices_l = [
            k for k in track_indices
            if tracks[k].time_since_update == 1 + level
        ]
        if len(track_indices_l) == 0:  # Nothing to match at this level
            continue

        matches_l, _, unmatched_detections = \
            min_cost_matching(
                distance_metric, max_distance, tracks, detections,
                track_indices_l, unmatched_detections)
        matches += matches_l
    unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
    return matches, unmatched_tracks, unmatched_detections



def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    """Solve linear assignment problem.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection_indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    if len(detection_indices) == 0 or len(track_indices) == 0:
        return [], track_indices, detection_indices  # Nothing to match.

    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    # Clip gated entries just above the threshold so they stay recognisable
    # after the assignment without dominating the total cost.
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5

    # `linear_assignment` must return (row_indices, col_indices), as
    # scipy.optimize.linear_sum_assignment does.
    row_indices, col_indices = linear_assignment(cost_matrix)

    matches, unmatched_tracks, unmatched_detections = [], [], []
    # Columns / rows the solver left unassigned.
    for col, detection_idx in enumerate(detection_indices):
        if col not in col_indices:
            unmatched_detections.append(detection_idx)
    for row, track_idx in enumerate(track_indices):
        if row not in row_indices:
            unmatched_tracks.append(track_idx)
    # Assigned pairs count as matches only when below the gating threshold.
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance:
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    return matches, unmatched_tracks, unmatched_detections


def gated_metric(tracks, dets, track_indices, detection_indices):
            """Appearance cost matrix passed through `gate_cost_matrix`.

            This is the helper nested inside `_match`; `self` is taken from
            that enclosing method. The cost between the selected detections'
            features and the selected tracks' ids comes from
            `self.metric.distance` and is then gated with the Kalman filter.
            """
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix


def distance(self, features, targets):
        """Compute distance between features and targets.

        Parameters
        ----------
        features : ndarray
            An NxM matrix of N features of dimensionality M.
        targets : List[int]
            A list of targets to match the given `features` against.

        Returns
        -------
        ndarray
            Returns a cost matrix of shape len(targets), len(features), where
            element (i, j) contains the closest distance, as computed by
            `self._metric`, between `targets[i]` and `features[j]`.

        """
        cost_matrix = np.zeros((len(targets), len(features)))
        for i, target in enumerate(targets):
            # One row per target: distances from its stored samples to every
            # query feature.
            cost_matrix[i, :] = self._metric(self.samples[target], features)
        return cost_matrix


def _nn_cosine_distance(x, y):
    """ Helper function for nearest neighbor distance metric (cosine).

    Parameters
    ----------
    x : ndarray
        A matrix of N row-vectors (sample points).
    y : ndarray
        A matrix of M row-vectors (query points).

    Returns
    -------
    ndarray
        A vector of length M that contains for each entry in `y` the
        smallest cosine distance to a sample in `x`.

    """
    distances = _cosine_distance(x, y)
    # Minimum over the sample axis: nearest neighbour per query point.
    return distances.min(axis=0)


def _cosine_distance(a, b, data_is_normalized=False):
    """Compute pair-wise cosine distance between points in `a` and `b`.

    a : array_like
        An NxM matrix of N samples of dimensionality M.
    b : array_like
        An LxM matrix of L samples of dimensionality M.
    data_is_normalized : Optional[bool]
        If True, assumes rows in a and b are unit length vectors.
        Otherwise, a and b are explicitly normalized to lenght 1.

        Returns a matrix of size len(a), len(b) such that eleement (i, j)
        contains the squared distance between `a[i]` and `b[j]`.

    if not data_is_normalized:
        a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
        b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
    return 1. -, b.T)


def gated_metric(tracks, dets, track_indices, detection_indices):
            """Appearance cost matrix passed through `gate_cost_matrix`.

            This is the helper nested inside `_match`; `self` is taken from
            that enclosing method. The cost between the selected detections'
            features and the selected tracks' ids comes from
            `self.metric.distance` and is then gated with the Kalman filter.
            """
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix


# Invalidate infeasible entries of the cost matrix based on the state
# distributions obtained by Kalman filtering.
def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

    Returns
    -------
    ndarray
        Returns the modified cost matrix. The input `cost_matrix` is modified
        in place and the same array is returned.

    """
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    # For reference, the lookup table used above:
    # chi2inv95 = {
    #     1: 3.8415,
    #     2: 5.9915,
    #     3: 7.8147,
    #     4: 9.4877,
    #     5: 11.070,
    #     6: 12.592,
    #     7: 14.067,
    #     8: 15.507,
    #     9: 16.919}
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        # Columns whose gating distance exceeds the threshold are gated out.
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost
    return cost_matrix

52行to_xyah():Convert bounding box to format (center x, center y, aspect ratio,height), where the aspect ratio is width / height.

def gating_distance(self, mean, covariance, measurements,
                    only_position=False):
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`.

        """
        mean, covariance = self.project(mean, covariance)
        if only_position:
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        # With covariance = L L^T, solving L z = d gives z^T z = d^T S^-1 d,
        # i.e. the squared Mahalanobis distance without inverting S.
        cholesky_factor = np.linalg.cholesky(covariance)
        d = measurements - mean
        # `d` is a local temporary, so letting the solver overwrite it is safe.
        z = scipy.linalg.solve_triangular(
            cholesky_factor, d.T, lower=True, check_finite=False,
            overwrite_b=True)
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha


def project(self, mean, covariance):
        """Project state distribution to measurement space.

        Parameters
        ----------
        mean : ndarray
            The state's mean vector (8 dimensional array).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).

        Returns
        -------
        (ndarray, ndarray)
            Returns the projected mean and covariance matrix of the given state
            estimate.

        """
        # Measurement noise: position and height entries scale with the
        # current height estimate mean[3]; the aspect-ratio entry is fixed.
        std = [
            self._std_weight_position * mean[3],
            self._std_weight_position * mean[3],
            1e-1,
            self._std_weight_position * mean[3]]
        # Example value:
        # [37.09232278406579, 37.09232278406579, 0.1, 37.09232278406579]
        innovation_cov = np.diag(np.square(std))
        # Example value:
        # array([[1.37584041e+03, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #        [0.00000000e+00, 1.37584041e+03, 0.00000000e+00, 0.00000000e+00],
        #        [0.00000000e+00, 0.00000000e+00, 1.00000000e-02, 0.00000000e+00],
        #        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.37584041e+03]])

        # Example state mean before projection:
        # array([ 1.01493103e+03,  3.10279335e+02,  6.50138151e-01,  7.41846456e+02,
        #         5.15075825e-01, -2.37273481e+00,  1.51336479e-11, -2.75417309e+00])
        mean = np.dot(self._update_mat, mean)
        # Example projected mean:
        # array([1.01493103e+03, 3.10279335e+02, 6.50138151e-01, 7.41846456e+02])
        # Example self._update_mat (the measurement matrix H):
        # array([[1., 0., 0., 0., 0., 0., 0., 0.],
        #        [0., 1., 0., 0., 0., 0., 0., 0.],
        #        [0., 0., 1., 0., 0., 0., 0., 0.],
        #        [0., 0., 0., 1., 0., 0., 0., 0.]])
        covariance = np.linalg.multi_dot((
            self._update_mat, covariance, self._update_mat.T))
        # S = H P H^T + R
        return mean, covariance + innovation_cov

计算 $Hx$ 以及 $S = HPH^T + R$,self._update_mat 即对应卡尔曼滤波里的 $H$。

def gating_distance(self, mean, covariance, measurements,
                    only_position=False):
        """Compute gating distance between state distribution and measurements.

        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.

        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.

        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`.

        """
        mean, covariance = self.project(mean, covariance)
        # Example projected covariance:
        # array([[5.49024634e+03, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        #        [0.00000000e+00, 5.49024634e+03, 0.00000000e+00, 0.00000000e+00],
        #        [0.00000000e+00, 0.00000000e+00, 1.05517373e-02, 0.00000000e+00],
        #        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 5.49024634e+03]])
        if only_position:
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        # Cholesky factorisation covariance = L L^T (L lower triangular).
        cholesky_factor = np.linalg.cholesky(covariance)
        # Example Cholesky factor:
        # array([[74.09619655,  0.        ,  0.        ,  0.        ],
        #        [ 0.        , 74.09619655,  0.        ,  0.        ],
        #        [ 0.        ,  0.        ,  0.10272165,  0.        ],
        #        [ 0.        ,  0.        ,  0.        , 74.09619655]])
        d = measurements - mean
        # Solve L z = d^T; then z^T z equals d^T S^-1 d. `d` is a local
        # temporary, so letting the solver overwrite it is safe.
        z = scipy.linalg.solve_triangular(
            cholesky_factor, d.T, lower=True, check_finite=False,
            overwrite_b=True)
        # Example z (one column per measurement):
        # array([[-0.16899016, -8.12494505],
        #        [-0.260157  ,  0.2972169 ],
        #        [-0.21768034,  3.05297728],
        #        [-0.67689182, -0.34832872]])
        squared_maha = np.sum(z * z, axis=0)
        return squared_maha

第42行为 Cholesky 分解。第47行为马氏距离计算公式中对应的 $d_j - y_i$。scipy.linalg.solve_triangular 假设 a 是三角阵,求解关于 x 的方程 $a x = b$。最后 return 马氏距离。
Cholesky 分解在计算马氏距离时的作用


# Invalidate infeasible entries of the cost matrix based on the state
# distributions obtained by Kalman filtering.
def gate_cost_matrix(
        kf, cost_matrix, tracks, detections, track_indices, detection_indices,
        gated_cost=INFTY_COST, only_position=False):
    """Invalidate infeasible entries in cost matrix based on the state
    distributions obtained by Kalman filtering.

    Parameters
    ----------
    kf : The Kalman filter.
    cost_matrix : ndarray
        The NxM dimensional cost matrix, where N is the number of track indices
        and M is the number of detection indices, such that entry (i, j) is the
        association cost between `tracks[track_indices[i]]` and
        `detections[detection_indices[j]]`.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).
    gated_cost : Optional[float]
        Entries in the cost matrix corresponding to infeasible associations are
        set this value. Defaults to a very large value.
    only_position : Optional[bool]
        If True, only the x, y position of the state distribution is considered
        during gating. Defaults to False.

    Returns
    -------
    ndarray
        Returns the modified cost matrix. The input `cost_matrix` is modified
        in place and the same array is returned.

    """
    # Degrees of freedom of the chi-square gate: (x, y) or (x, y, a, h).
    gating_dim = 2 if only_position else 4
    gating_threshold = kalman_filter.chi2inv95[gating_dim]
    measurements = np.asarray(
        [detections[i].to_xyah() for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        track = tracks[track_idx]
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position)
        # Columns whose gating distance exceeds the threshold are gated out.
        cost_matrix[row, gating_distance > gating_threshold] = gated_cost
    return cost_matrix


def min_cost_matching(
        distance_metric, max_distance, tracks, detections, track_indices=None,
        detection_indices=None):
    """Solve linear assignment problem.

    Parameters
    ----------
    distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
        The distance metric is given a list of tracks and detections as well as
        a list of N track indices and M detection indices. The metric should
        return the NxM dimensional cost matrix, where element (i, j) is the
        association cost between the i-th track in the given track indices and
        the j-th detection in the given detection_indices.
    max_distance : float
        Gating threshold. Associations with cost larger than this value are
        disregarded.
    tracks : List[track.Track]
        A list of predicted tracks at the current time step.
    detections : List[detection.Detection]
        A list of detections at the current time step.
    track_indices : List[int]
        List of track indices that maps rows in `cost_matrix` to tracks in
        `tracks` (see description above).
    detection_indices : List[int]
        List of detection indices that maps columns in `cost_matrix` to
        detections in `detections` (see description above).

    Returns
    -------
    (List[(int, int)], List[int], List[int])
        Returns a tuple with the following three entries:
        * A list of matched track and detection indices.
        * A list of unmatched track indices.
        * A list of unmatched detection indices.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    if len(detection_indices) == 0 or len(track_indices) == 0:
        return [], track_indices, detection_indices  # Nothing to match.

    cost_matrix = distance_metric(
        tracks, detections, track_indices, detection_indices)
    # Clamp gated entries just above the threshold so they stay finite for
    # the solver while remaining recognisable as "too far" afterwards.
    cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5

    row_indices, col_indices = linear_assignment(cost_matrix)
    # Sets give O(1) membership tests in the loops below.
    assigned_rows = set(row_indices)
    assigned_cols = set(col_indices)

    matches, unmatched_tracks, unmatched_detections = [], [], []
    for col, detection_idx in enumerate(detection_indices):
        if col not in assigned_cols:
            unmatched_detections.append(detection_idx)
    for row, track_idx in enumerate(track_indices):
        if row not in assigned_rows:
            unmatched_tracks.append(track_idx)
    for row, col in zip(row_indices, col_indices):
        track_idx = track_indices[row]
        detection_idx = detection_indices[col]
        if cost_matrix[row, col] > max_distance:
            # The solver paired them, but the cost exceeds the gate:
            # treat both sides as unmatched.
            unmatched_tracks.append(track_idx)
            unmatched_detections.append(detection_idx)
        else:
            matches.append((track_idx, detection_idx))
    return matches, unmatched_tracks, unmatched_detections


def _match(self, detections):
        """Associate the current tracks with the given detections.

        Confirmed tracks are matched first with the gated appearance metric;
        the remaining candidates are then matched by IOU.

        Parameters
        ----------
        detections : List[detection.Detection]
            A list of detections at the current time step.

        Returns
        -------
        (List[(int, int)], List[int], List[int])
            Matched (track index, detection index) pairs, unmatched track
            indices and unmatched detection indices.

        """

        def gated_metric(tracks, dets, track_indices, detection_indices):
            # Appearance cost between detection features and track samples,
            # with entries rejected by the Kalman-filter gate invalidated.
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = linear_assignment.gate_cost_matrix(
                self.kf, cost_matrix, tracks, dets, track_indices,
                detection_indices)

            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

        # Associate confirmed tracks using appearance features.
        matches_a, unmatched_tracks_a, unmatched_detections = \
            linear_assignment.matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        # Only tracks that missed exactly one update are eligible for IOU.
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update == 1]
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a if
            self.tracks[k].time_since_update != 1]
        matches_b, unmatched_tracks_b, unmatched_detections = \
            linear_assignment.min_cost_matching(
                iou_matching.iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)

        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

unconfirmed tracks 和没有匹配上且 time_since_update == 1 的 tracks(来自 unmatched_tracks_a)一起组成 iou_track_candidates,与还没有匹配上的检测结果(unmatched_detections)进行 IOU 匹配。这样可以缓解因为表现突变或者部分遮挡导致的较大特征变化;但这样做也有可能导致一些新产生的轨迹被连接到一些旧的轨迹上。

def iou_cost(tracks, detections, track_indices=None,
             detection_indices=None):
    """An intersection over union distance metric.

    Parameters
    ----------
    tracks : List[deep_sort.track.Track]
        A list of tracks.
    detections : List[deep_sort.detection.Detection]
        A list of detections.
    track_indices : Optional[List[int]]
        A list of indices to tracks that should be matched. Defaults to
        all `tracks`.
    detection_indices : Optional[List[int]]
        A list of indices to detections that should be matched. Defaults
        to all `detections`.

    Returns
    -------
    ndarray
        Returns a cost matrix of shape
        len(track_indices), len(detection_indices) where entry (i, j) is
        `1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`.

    """
    if track_indices is None:
        track_indices = np.arange(len(tracks))
    if detection_indices is None:
        detection_indices = np.arange(len(detections))

    cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
    # The candidate boxes do not depend on the track, so build them once.
    candidates = np.asarray([detections[i].tlwh for i in detection_indices])
    for row, track_idx in enumerate(track_indices):
        if tracks[track_idx].time_since_update > 1:
            # Stale track: keep the whole row infeasible instead of letting
            # the IOU cost below overwrite it.
            cost_matrix[row, :] = linear_assignment.INFTY_COST
            continue

        bbox = tracks[track_idx].to_tlwh()
        cost_matrix[row, :] = 1. - iou(bbox, candidates)
    return cost_matrix


def iou(bbox, candidates):
    """Compute intersection over union.

    Parameters
    ----------
    bbox : ndarray
        A bounding box in format `(top left x, top left y, width, height)`.
    candidates : ndarray
        A matrix of candidate bounding boxes (one per row) in the same format
        as `bbox`.

    Returns
    -------
    ndarray
        The intersection over union in [0, 1] between the `bbox` and each
        candidate. A higher score means a larger fraction of the `bbox` is
        occluded by the candidate.

    """
    bbox_tl, bbox_br = bbox[:2], bbox[:2] + bbox[2:]
    candidates_tl = candidates[:, :2]
    candidates_br = candidates[:, :2] + candidates[:, 2:]

    # Corners of the intersection rectangle, one row per candidate.
    tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
               np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
    br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
               np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
    # Negative extents mean the boxes do not overlap; clamp them to zero.
    wh = np.maximum(0., br - tl)

    area_intersection = wh.prod(axis=1)
    area_bbox = bbox[2:].prod()
    area_candidates = candidates[:, 2:].prod(axis=1)
    return area_intersection / (area_bbox + area_candidates - area_intersection)


def update(self, mean, covariance, measurement):
        """Run Kalman filter correction step.

        Parameters
        ----------
        mean : ndarray
            The predicted state's mean vector (8 dimensional).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).
        measurement : ndarray
            The 4 dimensional measurement vector (x, y, a, h), where (x, y)
            is the center position, a the aspect ratio, and h the height of the
            bounding box.

        Returns
        -------
        (ndarray, ndarray)
            Returns the measurement-corrected state distribution.

        """
        projected_mean, projected_cov = self.project(mean, covariance)

        # Kalman gain K = P H^T S^-1, obtained by solving S K^T = H P with
        # the Cholesky factor of S instead of inverting S explicitly.
        chol_factor, lower = scipy.linalg.cho_factor(
            projected_cov, lower=True, check_finite=False)
        kalman_gain = scipy.linalg.cho_solve(
            (chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
            check_finite=False).T
        innovation = measurement - projected_mean

        new_mean = mean + np.dot(innovation, kalman_gain.T)
        new_covariance = covariance - np.linalg.multi_dot((
            kalman_gain, projected_cov, kalman_gain.T))
        return new_mean, new_covariance


def update(self, kf, detection):
        """Perform Kalman filter measurement update step and update the feature
        cache.

        Parameters
        ----------
        kf : kalman_filter.KalmanFilter
            The Kalman filter.
        detection : Detection
            The associated detection.

        """
        self.mean, self.covariance = kf.update(
            self.mean, self.covariance, detection.to_xyah())
        # Feature-cache update described by the docstring.
        # NOTE(review): assumes `self.features` is a list created in the
        # constructor, which is outside this excerpt - confirm.
        self.features.append(detection.feature)

        self.hits += 1
        self.time_since_update = 0
        # Promote a tentative track once it has collected enough hits.
        if self.state == TrackState.Tentative and self.hits >= self._n_init:
            self.state = TrackState.Confirmed



