
zamba.object_detection.yolox.megadetector_lite_yolox

FillModeEnum

Bases: str, Enum

Enum for frame filtering fill modes

Attributes:

repeat: Randomly resample qualifying frames to get to n_frames

score_sorted: Take up to n_frames in descending score order (even if some have zero probability)

weighted_euclidean: Sample the remaining frames weighted by their Euclidean distance in time to the frames over the threshold

weighted_prob: Sample the remaining frames weighted by their predicted probability

Source code in zamba/object_detection/yolox/megadetector_lite_yolox.py
class FillModeEnum(str, Enum):
    """Enum for frame filtering fill modes

    Attributes:
        repeat: Randomly resample qualifying frames to get to n_frames
        score_sorted: Take up to n_frames in descending score order (even if some have zero probability)
        weighted_euclidean: Sample the remaining frames weighted by their euclidean distance in
            time to the frames over the threshold
        weighted_prob: Sample the remaining frames weighted by their predicted probability
    """

    repeat = "repeat"
    score_sorted = "score_sorted"
    weighted_euclidean = "weighted_euclidean"
    weighted_prob = "weighted_prob"
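The fill mode is selected through MegadetectorLiteYoloXConfig (documented below). A minimal sketch, assuming zamba is installed and importable, of requesting a fixed number of frames with a deterministic fill strategy:

from zamba.object_detection.yolox.megadetector_lite_yolox import (
    FillModeEnum,
    MegadetectorLiteYoloXConfig,
)

# Request 16 frames; if fewer score above the confidence threshold,
# top up deterministically in descending score order rather than by
# random resampling.
config = MegadetectorLiteYoloXConfig(n_frames=16, fill_mode=FillModeEnum.score_sorted)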

MegadetectorLiteYoloX

MegadetectorLite object detection model based on YOLOX.

Source code in zamba/object_detection/yolox/megadetector_lite_yolox.py
class MegadetectorLiteYoloX:
    def __init__(
        self,
        path: os.PathLike = LOCAL_MD_LITE_MODEL,
        kwargs: os.PathLike = LOCAL_MD_LITE_MODEL_KWARGS,
        config: Optional[Union[MegadetectorLiteYoloXConfig, dict]] = None,
    ):
        """MegadetectorLite based on YOLOX.

        Args:
            path (pathlike): Path to trained YoloX model checkpoint (.pth extension)
            kwargs (pathlike): Path to the YoloX model kwargs file
            config (MegadetectorLiteYoloXConfig, optional): YoloX configuration
        """
        if config is None:
            config = MegadetectorLiteYoloXConfig()
        elif isinstance(config, dict):
            config = MegadetectorLiteYoloXConfig.parse_obj(config)

        yolox = YoloXModel.load(
            checkpoint=path,
            model_kwargs_path=kwargs,
        )

        ckpt = torch.load(yolox.args.ckpt, map_location=config.device)
        model = yolox.exp.get_model()
        model.load_state_dict(ckpt["model"])
        model = model.eval().to(config.device)

        self.model = model
        self.yolox = yolox
        self.config = config
        self.num_classes = yolox.exp.num_classes

    @staticmethod
    def scale_and_pad_array(
        image_array: np.ndarray, output_width: int, output_height: int
    ) -> np.ndarray:
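        # Resize to fit within (output_width, output_height) while preserving
        # aspect ratio; ImageOps.pad fills the leftover area (right/bottom,
        # given centering=(0, 0)) so the output is exactly the requested size.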
        return np.array(
            ImageOps.pad(
                Image.fromarray(image_array),
                (output_width, output_height),
                method=Image.BICUBIC,
                color=None,
                centering=(0, 0),
            )
        )

    def _preprocess(self, frame: np.ndarray) -> np.ndarray:
        """Process an image for the model, including scaling/padding the image, transposing from
        (height, width, channel) to (channel, height, width) and casting to float.
        """
        arr = np.ascontiguousarray(
            self.scale_and_pad_array(frame, self.config.image_width, self.config.image_height),
            dtype=np.float32,
        )
        return np.moveaxis(arr, 2, 0)

    def _preprocess_video(self, video: np.ndarray) -> np.ndarray:
        """Process a video for the model, including resizing the frames in the video, transposing
        from (batch, height, width, channel) to (batch, channel, height, width) and casting to float.
        """
        resized_video = np.zeros(
            (video.shape[0], video.shape[3], self.config.image_height, self.config.image_width),
            dtype=np.float32,
        )
        for frame_idx in range(video.shape[0]):
            resized_video[frame_idx] = self._preprocess(video[frame_idx])
        return resized_video

    def detect_video(self, video_arr: np.ndarray, pbar: bool = False):
        """Runs object detection on an video.

        Args:
            video_arr (np.ndarray): A video array with dimensions (frames, height, width, channels).
            pbar (bool): Whether to show a progress bar. Defaults to False.

        Returns:
            list: A list of (detections, scores) tuples, one per frame. detections is an array of
                bounding boxes with dimensions (object, 4), where object is the number of objects
                detected and the four columns are (x1, y1, x2, y2). scores is an array of detection
                confidence scores of length (object).
        """

        pbar = tqdm if pbar else lambda x: x

        # run the model over the video in batches of frames
        batch_size = self.config.frame_batch_size

        video_outputs = []
        with torch.no_grad():
            for i in range(0, len(video_arr), batch_size):
                a = video_arr[i : i + batch_size]

                outputs = self.model(
                    torch.from_numpy(self._preprocess_video(a)).to(self.config.device)
                )
                outputs = postprocess(
                    outputs, self.num_classes, self.config.confidence, self.config.nms_threshold
                )
                video_outputs.extend(outputs)

        detections = []
        for o in pbar(video_outputs):
            detections.append(
                self._process_frame_output(
                    o, original_height=video_arr.shape[1], original_width=video_arr.shape[2]
                )
            )

        return detections

    def detect_image(self, img_arr: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Runs object detection on an image.

        Args:
            img_arr (np.ndarray): An image array with dimensions (height, width, channels).

        Returns:
            np.ndarray: An array of bounding box detections with dimensions (object, 4), where
                object is the number of objects detected and the four columns are
                (x1, y1, x2, y2).

            np.ndarray: An array of object detection confidence scores of length (object), where
                object is the number of objects detected.
        """
        with torch.no_grad():
            outputs = self.model(
                torch.from_numpy(self._preprocess(img_arr)).unsqueeze(0).to(self.config.device)
            )
            output = postprocess(
                outputs, self.num_classes, self.config.confidence, self.config.nms_threshold
            )

        return self._process_frame_output(output[0], img_arr.shape[0], img_arr.shape[1])

    def _process_frame_output(self, output, original_height, original_width):
        if output is None:
            return np.array([]), np.array([])
        else:
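            # YOLOX postprocess rows are (x1, y1, x2, y2, objectness,
            # class confidence, class index); the combined detection score
            # below is objectness * class confidence.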
            detections = pd.DataFrame(
                output.cpu().numpy(),
                columns=["x1", "y1", "x2", "y2", "score1", "score2", "class_num"],
            ).assign(score=lambda row: row.score1 * row.score2)

            # Transform bounding box to be in terms of the original image dimensions
            ratio = min(
                self.config.image_width / original_width,
                self.config.image_height / original_height,
            )
            detections[["x1", "y1", "x2", "y2"]] /= ratio

            # Express bounding boxes in terms of proportions of original image dimensions
            detections[["x1", "x2"]] /= original_width
            detections[["y1", "y2"]] /= original_height

            return detections[["x1", "y1", "x2", "y2"]].values, detections.score.values

    def filter_frames(
        self, frames: np.ndarray, detections: List[Tuple[np.ndarray, np.ndarray]]
    ) -> np.ndarray:
        """Filter video frames using megadetector lite.

        Which frames are returned depends on fill_mode and how many frames score above the
        confidence threshold. If n_frames is None, all frames above the threshold are returned.
        If at least n_frames score above the threshold, the top n_frames are returned. Otherwise,
        the frames above the threshold are supplemented with additional frames chosen according
        to fill_mode.

        Args:
            frames (np.ndarray): Array of video frames to filter with dimensions (frames, height,
                width, channels)
            detections (list of tuples): List of detection results for each frame. Each element is
                a tuple of an array of bounding boxes with rows (x1, y1, x2, y2) and an array of
                detection probabilities.

        Returns:
            np.ndarray: An array of video frames of length n_frames or shorter
        """

        frame_scores = pd.Series(
            [(np.max(score) if (len(score) > 0) else 0) for _, score in detections]
        ).sort_values(
            ascending=False
        )  # reduce to one score per frame

        selected_indices = frame_scores.loc[frame_scores > self.config.confidence].index

        if self.config.n_frames is None:
            # no minimum n_frames provided, just select all the frames with scores > threshold
            pass

        elif len(selected_indices) >= self.config.n_frames:
            # num. frames with scores > threshold is greater than the requested number of frames
            selected_indices = (
                frame_scores[selected_indices]
                .sort_values(ascending=False)
                .iloc[: self.config.n_frames]
                .index
            )

        elif len(selected_indices) < self.config.n_frames:
            # num. frames with scores > threshold is less than the requested number of frames;
            # fill up to n_frames according to fill_mode
            rng = np.random.RandomState(self.config.seed)

            if self.config.fill_mode == "repeat":
                repeated_indices = rng.choice(
                    selected_indices,
                    self.config.n_frames - len(selected_indices),
                    replace=True,
                )
                selected_indices = np.concatenate((selected_indices, repeated_indices))

            # take frames in sorted order up to n_frames, even if score is zero
            elif self.config.fill_mode == "score_sorted":
                selected_indices = (
                    frame_scores.sort_values(ascending=False).iloc[: self.config.n_frames].index
                )

            # sample up to n_frames, prefer points closer to frames with detection
            elif self.config.fill_mode == "weighted_euclidean":
                sample_from = frame_scores.loc[~frame_scores.index.isin(selected_indices)].index
                # weight each candidate by one over the Euclidean norm of its
                # index distances to all frames with a detection
                weights = [1 / np.linalg.norm(selected_indices - sample) for sample in sample_from]
                # normalize weights
                weights /= np.sum(weights)
                sampled = rng.choice(
                    sample_from,
                    self.config.n_frames - len(selected_indices),
                    replace=False,
                    p=weights,
                )

                selected_indices = np.concatenate((selected_indices, sampled))

            # sample up to n_frames, weight by predicted probability - only if some frames have nonzero prob
            elif (self.config.fill_mode == "weighted_prob") and (len(selected_indices) > 0):
                sample_from = frame_scores.loc[~frame_scores.index.isin(selected_indices)].index
                weights = frame_scores[sample_from] / np.sum(frame_scores[sample_from])
                sampled = rng.choice(
                    sample_from,
                    self.config.n_frames - len(selected_indices),
                    replace=False,
                    p=weights,
                )

                selected_indices = np.concatenate((selected_indices, sampled))

        # sort the selected images back into their original order
        if self.config.sort_by_time:
            selected_indices = sorted(selected_indices)

        return frames[selected_indices]
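
A minimal usage sketch, assuming the bundled checkpoint and kwargs files are present at their default locations (LOCAL_MD_LITE_MODEL and LOCAL_MD_LITE_MODEL_KWARGS):

import numpy as np

from zamba.object_detection.yolox.megadetector_lite_yolox import MegadetectorLiteYoloX

detector = MegadetectorLiteYoloX()  # loads the default model on CPU or CUDA

# Detect objects in a single (height, width, channels) uint8 frame
frame = np.zeros((480, 640, 3), dtype=np.uint8)
boxes, scores = detector.detect_image(frame)
print(boxes.shape, scores.shape)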

__init__(path=LOCAL_MD_LITE_MODEL, kwargs=LOCAL_MD_LITE_MODEL_KWARGS, config=None)

MegadetectorLite based on YOLOX.

Parameters:

path (pathlike): Path to trained YoloX model checkpoint (.pth extension). Defaults to LOCAL_MD_LITE_MODEL.

kwargs (pathlike): Path to the YoloX model kwargs file. Defaults to LOCAL_MD_LITE_MODEL_KWARGS.

config (MegadetectorLiteYoloXConfig, optional): YoloX configuration. Defaults to None.

detect_image(img_arr)

Runs object detection on an image.

Parameters:

img_arr (np.ndarray): An image array with dimensions (height, width, channels). Required.

Returns:

np.ndarray: An array of bounding box detections with dimensions (object, 4), where object is the number of objects detected and the four columns are (x1, y1, x2, y2).

np.ndarray: An array of object detection confidence scores of length (object), where object is the number of objects detected.

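Since the returned boxes are proportions of the original image (see _process_frame_output), pixel coordinates can be recovered by rescaling. A small sketch with hypothetical values:

import numpy as np

# Suppose detect_image returned this proportional box for a 480x640 frame
boxes = np.array([[0.10, 0.20, 0.45, 0.80]])
height, width = 480, 640
pixel_boxes = boxes * np.array([width, height, width, height])
# -> [[ 64.  96. 288. 384.]]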

detect_video(video_arr, pbar=False)

Runs object detection on a video.

Parameters:

video_arr (np.ndarray): A video array with dimensions (frames, height, width, channels). Required.

pbar (bool): Whether to show a progress bar. Defaults to False.

Returns:

list: A list of (detections, scores) tuples, one per frame. detections is an array of bounding boxes with dimensions (object, 4), where object is the number of objects detected and the four columns are (x1, y1, x2, y2). scores is an array of detection confidence scores of length (object).

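A hedged sketch of running detection over a short clip, assuming the default model files are available:

import numpy as np

from zamba.object_detection.yolox.megadetector_lite_yolox import MegadetectorLiteYoloX

detector = MegadetectorLiteYoloX()
video = np.zeros((8, 480, 640, 3), dtype=np.uint8)  # (frames, height, width, channels)
detections = detector.detect_video(video, pbar=True)
for boxes, scores in detections:
    print(len(scores), "objects detected")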

filter_frames(frames, detections)

Filter video frames using megadetector lite.

Which frames are returned depends on fill_mode and how many frames score above the confidence threshold. If n_frames is None, all frames above the threshold are returned. If at least n_frames score above the threshold, the top n_frames are returned. Otherwise, the frames above the threshold are supplemented with additional frames chosen according to fill_mode.

Parameters:

frames (np.ndarray): Array of video frames to filter with dimensions (frames, height, width, channels). Required.

detections (list of tuples): List of detection results for each frame. Each element is a tuple of an array of bounding boxes with rows (x1, y1, x2, y2) and an array of detection probabilities. Required.

Returns:

np.ndarray: An array of video frames of length n_frames or shorter.

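In the typical pipeline, detect_video and filter_frames are paired: run detection once, then keep the most promising frames. A sketch assuming the default model files are available:

import numpy as np

from zamba.object_detection.yolox.megadetector_lite_yolox import (
    MegadetectorLiteYoloX,
    MegadetectorLiteYoloXConfig,
)

config = MegadetectorLiteYoloXConfig(n_frames=4, fill_mode="score_sorted")
detector = MegadetectorLiteYoloX(config=config)

video = np.zeros((16, 480, 640, 3), dtype=np.uint8)
detections = detector.detect_video(video)
selected = detector.filter_frames(video, detections)
print(selected.shape)  # at most (4, 480, 640, 3)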

MegadetectorLiteYoloXConfig

Bases: BaseModel

Configuration for a MegadetectorLiteYoloX frame selection model

Attributes:

confidence (float): Only consider object detections with this confidence or greater.

nms_threshold (float): Non-maximum suppression filters the many overlapping bounding boxes around the same object down to a single box; this constant determines how aggressively similar boxes are suppressed.

image_width (int): Scale image to this width before sending to the object detection model.

image_height (int): Scale image to this height before sending to the object detection model.

device (str): Where to run the object detection model, "cpu" or "cuda".

frame_batch_size (int): Number of frames to predict on at once.

n_frames (int, optional): Max number of frames to return. If None, returns all frames above the threshold. Defaults to None.

fill_mode (str, optional): Mode for upsampling if the number of frames above the threshold is less than n_frames. Defaults to "score_sorted".

sort_by_time (bool, optional): Whether to sort the selected frames by time (original order) before returning. If False, returns frames sorted by score (descending). Defaults to True.

seed (int, optional): Random state for the random number generator. Defaults to 55.

Source code in zamba/object_detection/yolox/megadetector_lite_yolox.py
class MegadetectorLiteYoloXConfig(BaseModel):
    """Configuration for a MegadetectorLiteYoloX frame selection model

    Attributes:
        confidence (float): Only consider object detections with this confidence or greater
        nms_threshold (float): Non-maximum suppression is a method for filtering many bounding
            boxes around the same object to a single bounding box. This is a constant that
            determines how much to suppress similar bounding boxes.
        image_width (int): Scale image to this width before sending to object detection model.
        image_height (int): Scale image to this height before sending to object detection model.
        device (str): Where to run the object detection model, "cpu" or "cuda".
        frame_batch_size (int): Number of frames to predict on at once.
        n_frames (int, optional): Max number of frames to return. If None returns all frames above
            the threshold. Defaults to None.
        fill_mode (str, optional): Mode for upsampling if the number of frames above the threshold
            is less than n_frames. Defaults to "score_sorted".
        sort_by_time (bool, optional): Whether to sort the selected frames by time (original order)
            before returning. If False, returns frames sorted by score (descending). Defaults to
            True.
        seed (int, optional): Random state for random number generator. Defaults to 55.
    """

    confidence: float = 0.25
    nms_threshold: float = 0.45
    image_width: int = 640
    image_height: int = 640
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    frame_batch_size: int = 24
    n_frames: Optional[int] = None
    fill_mode: Optional[FillModeEnum] = FillModeEnum.score_sorted
    sort_by_time: bool = True
    seed: Optional[int] = 55

    class Config:
        extra = "forbid"