1. 流式处理
  2. 从视频中进行目标检测

视频中的流式对象检测

在本指南中,我们将使用RT-DETR模型来检测用户上传视频中的对象。我们将使用Gradio 5.0中引入的新视频流功能从服务器流式传输结果。

video_object_detection_stream_latest

设置模型

首先,我们将在系统中安装以下要求:

opencv-python
torch
transformers>=4.43.0
spaces

然后,我们将从Hugging Face Hub下载模型:

from transformers import RTDetrForObjectDetection, RTDetrImageProcessor

image_processor = RTDetrImageProcessor.from_pretrained("PekingU/rtdetr_r50vd")
model = RTDetrForObjectDetection.from_pretrained("PekingU/rtdetr_r50vd").to("cuda")

我们正在将模型移动到GPU上。我们将把我们的模型部署到Hugging Face Spaces,并在免费的ZeroGPU集群中运行推理。

推理函数

我们的推理函数将接受一个视频和一个期望的置信度阈值。 物体检测模型会识别许多物体并为每个物体分配一个置信度分数。置信度越低,误报的可能性就越高。因此,我们将让用户设置置信度阈值。

我们的函数将遍历视频中的帧,并在每一帧上运行RT-DETR模型。 然后,我们将在帧中为每个检测到的对象绘制边界框,并将帧保存到新的输出视频中。 该函数将以两秒的块为单位生成每个输出视频。

为了在ZeroGPU上尽可能保持推理时间最低(存在基于时间的配额),我们将在运行模型之前将输出视频的原始帧率减半,并将输入帧的大小调整为原始大小的一半。

推理函数的代码如下 - 我们将逐段讲解。

import spaces
import cv2
from PIL import Image
import torch
import time
import numpy as np
import uuid

from draw_boxes import draw_bounding_boxes

SUBSAMPLE = 2

@spaces.GPU
def stream_object_detection(video, conf_threshold):
    cap = cv2.VideoCapture(video)

    # This means we will output mp4 videos
    video_codec = cv2.VideoWriter_fourcc(*"mp4v") # type: ignore
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    desired_fps = fps // SUBSAMPLE
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) // 2
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) // 2

    iterating, frame = cap.read()

    n_frames = 0

    # Use UUID to create a unique video file
    output_video_name = f"output_{uuid.uuid4()}.mp4"

    # Output Video
    output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height)) # type: ignore
    batch = []

    while iterating:
        frame = cv2.resize( frame, (0,0), fx=0.5, fy=0.5)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        if n_frames % SUBSAMPLE == 0:
            batch.append(frame)
        if len(batch) == 2 * desired_fps:
            inputs = image_processor(images=batch, return_tensors="pt").to("cuda")

            with torch.no_grad():
                outputs = model(**inputs)

            boxes = image_processor.post_process_object_detection(
                outputs,
                target_sizes=torch.tensor([(height, width)] * len(batch)),
                threshold=conf_threshold)
            
            for i, (array, box) in enumerate(zip(batch, boxes)):
                pil_image = draw_bounding_boxes(Image.fromarray(array), box, model, conf_threshold)
                frame = np.array(pil_image)
                # Convert RGB to BGR
                frame = frame[:, :, ::-1].copy()
                output_video.write(frame)

            batch = []
            output_video.release()
            yield output_video_name
            output_video_name = f"output_{uuid.uuid4()}.mp4"
            output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height)) # type: ignore

        iterating, frame = cap.read()
        n_frames += 1
  1. 从视频中读取

在Python中创建视频的行业标准之一是OpenCV,因此我们将在此应用程序中使用它。

cap 变量是我们从输入视频中读取的方式。每当我们调用 cap.read() 时,我们都在读取视频中的下一帧。

为了在Gradio中流式传输视频,我们需要为输出视频的每个“块”生成一个不同的视频文件。 我们使用output_video = cv2.VideoWriter(output_video_name, video_codec, desired_fps, (width, height))这一行来创建下一个要写入的视频文件。video_codec是我们指定视频文件类型的方式。目前仅支持“mp4”和“ts”文件进行视频流传输。

  1. 推理循环

对于视频中的每一帧,我们将其大小调整为原来的一半。OpenCV以BGR格式读取文件,因此会转换为transfomers期望的RGB格式。这就是while循环前两行所做的事情。

我们每隔一帧将其添加到batch列表中,以便输出视频的帧率为原始帧率的一半。当批次覆盖两秒的视频时,我们将运行模型。选择两秒的阈值是为了保持每个批次的处理时间足够小,以便视频在服务器上流畅显示,同时不需要太多的单独前向传递。为了使视频流在Gradio中正常工作,批次大小应至少为1秒。

我们运行模型的前向传播,然后使用模型的post_process_object_detection方法将检测到的边界框缩放到输入帧的大小。

我们使用一个自定义函数来绘制边界框(来源这里)。然后,在写回输出视频之前,我们必须将RGB转换为BGR

一旦我们完成了批处理,我们就会为下一个批处理创建一个新的输出视频文件。

Gradio演示

UI代码与其他类型的Gradio应用程序非常相似。 我们将使用标准的两列布局,以便用户可以并排查看输入和输出视频。

为了使流媒体工作,我们必须在输出视频中设置streaming=True。将视频设置为自动播放不是必须的,但它能为用户提供更好的体验。

import gradio as gr

with gr.Blocks() as app:
    gr.HTML(
        """
    <h1 style='text-align: center'>
    Video Object Detection with <a href='https://huggingface.co/PekingU/rtdetr_r101vd_coco_o365' target='_blank'>RT-DETR</a>
    </h1>
    """)
    with gr.Row():
        with gr.Column():
            video = gr.Video(label="Video Source")
            conf_threshold = gr.Slider(
                label="Confidence Threshold",
                minimum=0.0,
                maximum=1.0,
                step=0.05,
                value=0.30,
            )
        with gr.Column():
            output_video = gr.Video(label="Processed Video", streaming=True, autoplay=True)

    video.upload(
        fn=stream_object_detection,
        inputs=[video, conf_threshold],
        outputs=[output_video],
    )

结论

您可以查看我们在Hugging Face Spaces上托管的演示这里

它也嵌入在本页下方