Parallel Video Processing with Multiple GPUs in PyTorch

If you’re looking for a straightforward way to process a large number of video files in parallel on multiple GPUs, this Python script is a solid starting point. It uses Python’s threading module to coordinate the GPUs, reads video frames one by one with OpenCV, and displays real-time progress with the Rich library.

How It Works

1. Discovering Available GPUs:

The script runs an nvidia-smi command to gather GPU IDs on the system. Users can optionally exclude specific GPU IDs through a command-line argument, ensuring that only the desired GPUs are used.
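
For reference, this is the exact query the script issues; it prints one GPU index per line (the output below is just an example, yours depends on the machine):

nvidia-smi --query-gpu=index --format=csv,noheader,nounits
0
1
2
3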

2. Enqueuing Videos:

The script scans a specified video directory for .mp4 files and adds their paths to a shared queue. Videos that have already been processed (i.e., a corresponding keypoints .npz file already exists in the output directory) are skipped automatically.
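
If your footage is not all .mp4, the filter inside main() is easy to widen; a small drop-in sketch (the extra extensions are just examples):

# Inside the enqueue loop in main(), instead of the .mp4-only check:
VIDEO_EXTS = (".mp4", ".mov", ".avi", ".mkv")
if file.lower().endswith(VIDEO_EXTS):
    video_path = os.path.join(video_dir, file)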

3. Thread Spawning:

For each GPU, the script spawns a new thread. Each thread then repeatedly grabs the next video file path from the queue and processes it until there are no more videos left.
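
Plain threads are enough here because the heavy steps (video decoding, GPU inference) typically run in native code that releases the GIL. Stripped of the video logic, each worker follows the standard queue-draining pattern (process() is a stand-in):

def worker(gpu_id):
    while True:
        try:
            item = video_queue.get_nowait()  # non-blocking grab
        except queue.Empty:
            break  # queue drained: thread exits
        process(item, gpu_id)  # stand-in for the real per-video work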

4. Frame-by-Frame Processing:

The script includes a placeholder process_frame() function for any custom inference logic (e.g., object detection, pose estimation, etc.). You’ll see how easy it is to replace the dummy code with your own model inference.
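
To make this concrete, here is a minimal sketch of what process_frame() might look like with a PyTorch model. Everything model-specific is an assumption: the model object, the preprocessing, and the dict-style output are hypothetical and need to be adapted to your network:

import torch

def process_frame(frame, backend, device, gpu_id):
    # Hypothetical: 'model' is your torch.nn.Module, already in eval() mode
    # and moved to the right device (e.g. model.to(f"cuda:{gpu_id}")).
    with torch.no_grad():
        # BGR (OpenCV) -> RGB, HWC -> CHW, add batch dim, scale to [0, 1]
        tensor = torch.from_numpy(frame[:, :, ::-1].copy())
        tensor = tensor.permute(2, 0, 1).unsqueeze(0).float() / 255.0
        tensor = tensor.to(f"cuda:{gpu_id}")
        output = model(tensor)
    # Hypothetical unpacking: the output format depends entirely on your model
    keypoints = output["keypoints"].cpu().numpy()
    scores = output["scores"].cpu().numpy()
    return keypoints, scores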

5. Progress Tracking:

Using Rich, two progress bars are displayed: one shows the overall progress of the entire video set, and the other shows detailed progress for the current video on each GPU. This gives you real-time updates on the workload and helps you monitor performance across GPUs.
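
Reduced to its essence, the layout is two independent Progress instances stacked in one Live display via Group, which is exactly the pattern the full script uses (imports as in the script below):

overall = Progress(TextColumn("videos"), BarColumn())
per_video = Progress(TextColumn("frames"), BarColumn())
with Live(Group(overall, per_video)):
    ...  # workers call add_task/advance/remove_task on either bar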

This template is flexible and easy to adapt to your own hardware and software stack. Whether you’re experimenting with deep learning models or just trying to speed up a large video preprocessing task, it gives you a scalable, multi-GPU starting point.
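
Assuming you save the script as process_videos.py (the name is hypothetical) and point video_dir and output_dir_selected in main() at your data, a typical invocation that skips GPUs 0 and 1 looks like:

python process_videos.py --exclude_gpus 0,1

The full script: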

import os
import threading
import argparse
import subprocess
import queue
import cv2
import numpy as np

from rich.console import Console, Group
from rich.progress import (
    Progress,
    TextColumn,
    BarColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
    TaskProgressColumn,
)
from rich.live import Live

################################################################################
# Utility functions
################################################################################

def parse_args():
    parser = argparse.ArgumentParser(description="Multi-GPU script template")
    parser.add_argument(
        "--exclude_gpus",
        type=str,
        default="",
        help="Comma separated list of GPU ids to exclude, e.g. '0,1'",
    )
    args = parser.parse_args()
    return args

def get_all_gpu_ids():
    """Returns a list of all available GPU IDs using nvidia-smi."""
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=index", "--format=csv,noheader,nounits"],
            stdout=subprocess.PIPE,
            universal_newlines=True,
            check=True,  # raise if nvidia-smi exits with an error
        )
        gpu_ids = result.stdout.strip().split("\n")
        return [int(gpu_id) for gpu_id in gpu_ids]
    except Exception as e:
        print(f"Error getting GPU IDs: {e}")
        return []

def get_bus_id(gpu_id):
    """Returns the PCI bus ID for a given GPU index using nvidia-smi."""
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=pci.bus_id", "--format=csv,noheader,nounits"],
            stdout=subprocess.PIPE,
            universal_newlines=True,
            check=True,  # raise if nvidia-smi exits with an error
        )
        bus_ids = result.stdout.strip().split("\n")
        return bus_ids[int(gpu_id)]
    except Exception as e:
        print(f"Error getting BUS ID for GPU {gpu_id}: {e}")
        return "Unknown"

################################################################################
# Placeholder for your custom frame processing / model inference
################################################################################

def process_frame(frame, backend, device, gpu_id):
    """
    Insert your model inference code here.

    For example:
      - object detection
      - pose estimation
      - any other frame-by-frame processing

    Return any results you want to save or visualize.
    Here, we demonstrate returning synthetic keypoints and scores.
    """
    # TODO: Replace this block with your actual inference logic.
    # ----------------------------------
    # e.g., keypoints, scores = your_model_inference(frame)
    keypoints = np.random.rand(1, 17, 2)  # (Batch=1, NumKeypoints=17, x/y)
    scores = np.random.rand(1, 17)       # (Batch=1, NumKeypoints=17)
    # ----------------------------------
    return keypoints, scores

################################################################################
# Worker thread
################################################################################

def thread_execution(gpu_id, progress_videos, progress_overall, overall_task_id):
    """
    Processes videos in the shared video queue on the given GPU.
    """
    global video_queue, backend, device, output_dir_selected

    bus_id = get_bus_id(gpu_id)
    print(f"Thread {threading.current_thread().name} using GPU {gpu_id} with BUS ID {bus_id}")

    while True:
        try:
            video_path = video_queue.get_nowait()
        except queue.Empty:
            break  # No more videos to process

        try:
            video_name = os.path.splitext(os.path.basename(video_path))[0]

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                print(f"Error: Unable to open video file {video_path}")
                continue
            else:
                video_length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                task_id = progress_videos.add_task(
                    f"Processing {video_name}",
                    filename=video_name,
                    total=video_length,
                    gpu_id=gpu_id,
                )

                # Lists to store all keypoints and scores
                keypoints_list = []
                scores_list = []

                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Perform your custom inference
                    keypoints, scores = process_frame(frame, backend, device, gpu_id)
                    keypoints_list.append(keypoints)
                    scores_list.append(scores)

                    # Example: Debug visualization placeholder (disabled by default)
                    debug = False
                    if debug:
                        img_show = frame.copy()
                        # You could draw your keypoints or bounding boxes here
                        # cv2.circle(img_show, (x, y), 3, (0,255,0), -1)
                        cv2.imshow("Debug", img_show)
                        cv2.waitKey(10)

                    # Update progress bar
                    progress_videos.advance(task_id)

                # Done with this video: release the capture and its progress bar
                cap.release()
                progress_videos.remove_task(task_id)

                # Save the results
                result_keypoints = np.concatenate(keypoints_list, axis=0)
                result_scores = np.concatenate(scores_list, axis=0)

                output_npz = os.path.join(output_dir_selected, f"keypoints_{video_name}.npz")
                np.savez_compressed(output_npz, keypoints=result_keypoints)

                output_scores = os.path.join(output_dir_selected, f"scores_{video_name}.npz")
                np.savez_compressed(output_scores, scores=result_scores)

                # Advance the overall progress
                progress_overall.advance(overall_task_id, advance=1)

        except Exception as e:
            print(f"Error processing video {video_path}: {e}")
            continue

    print(f"Finished processing on GPU {gpu_id} (BUS ID {bus_id})")

################################################################################
# Globals & main
################################################################################

video_queue = queue.Queue()  # This queue will hold all video paths
output_dir_selected = ""

# You may change these if your backend or device differs
backend = "onnxruntime"  # e.g., 'opencv', 'onnxruntime', 'openvino'
device = "cuda"          # e.g., 'cuda', 'cpu'

def main():
    global video_queue, output_dir_selected

    # Parse arguments
    args = parse_args()
    exclude_gpu_ids = (
        [int(x) for x in args.exclude_gpus.split(",") if x != ""]
        if args.exclude_gpus
        else []
    )
    
    # Discover available GPUs (excluding any user-specified ones)
    all_gpu_ids = get_all_gpu_ids()
    available_gpu_ids = [gpu_id for gpu_id in all_gpu_ids if gpu_id not in exclude_gpu_ids]

    # Example video directory and output directory
    video_dir = "/path/to/your/videos"
    output_dir_selected = "/path/to/save/results"

    # Enqueue all .mp4 files that have not been processed yet
    for file in os.listdir(video_dir):
        if file.endswith(".mp4"):
            video_path = os.path.join(video_dir, file)
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            npz_file = os.path.join(output_dir_selected, f"keypoints_{video_name}.npz")
            if not os.path.exists(npz_file):
                video_queue.put(video_path)

    total_videos = video_queue.qsize()
    print("Number of videos to process:", total_videos)

    console = Console()

    # Overall progress bar (videos processed)
    progress_overall = Progress(
        TextColumn("Processed: {task.completed}/{task.total} videos"),
        BarColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    )

    # Per-video progress bar
    progress_videos = Progress(
        TextColumn("[bold blue]{task.fields[filename]} (GPU {task.fields[gpu_id]})", justify="right"),
        BarColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    )

    overall_task_id = progress_overall.add_task("Total Progress", total=total_videos)

    # Create and start threads, each one handling a different GPU
    with Live(Group(progress_overall, progress_videos), refresh_per_second=10):
        threads = []
        for gpu_id in available_gpu_ids:
            thread = threading.Thread(
                target=thread_execution,
                args=(gpu_id, progress_videos, progress_overall, overall_task_id),
            )
            thread.start()
            threads.append(thread)

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

if __name__ == "__main__":
    main()