This post is the ninth in our series on performance profiling and optimization in PyTorch, aimed at emphasizing the critical role of performance analysis and optimization in machine learning development. Throughout the series we have reviewed a wide variety of practical tools and techniques for analyzing and boosting the runtime performance of PyTorch-based AI/ML models. Our goal has been twofold:
- To emphasize the importance of routine evaluation and optimization of AI/ML workloads.
- To demonstrate the accessibility of a wide variety of tools and techniques for analyzing and optimizing AI/ML runtime performance. You don’t need to be a CUDA expert to meaningfully improve your model’s performance and reduce compute costs.
In this post, we will explore the use of CUDA streams, a powerful feature of NVIDIA’s CUDA programming model that offers a sophisticated way to overlap GPU operations and run them concurrently. Although we typically think of an AI/ML training workload as a single monolithic (a.k.a., “unbreakable”) computation graph G running on the GPU, in some scenarios the graph can be decomposed into two distinct subgraphs G1 and G2, where G = G2*G1 (i.e., G2 is applied to the output of G1). In such cases, CUDA streams enable “pipelining” the computation graph, i.e., programming our training step to run G1 (on input batch n+1) in parallel with G2 (on the nth output of G1). This technique is especially useful when:
- Neither subgraph fully utilizes the GPU when run alone, and
- The two subgraphs are of similar computational cost (i.e., neither dominates runtime).
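To make the pattern concrete, here is a minimal, schematic sketch of the two-stream pipelining idea. The g1 and g2 modules below are placeholder layers standing in for the two subgraphs, and the loop details are simplified; the real experiments appear later in this post.

import torch

# A schematic sketch of pipelining G = G2*G1 using two CUDA streams.
# g1 and g2 are placeholder modules standing in for the two subgraphs.
device = torch.device("cuda")
g1 = torch.nn.Linear(128, 128).to(device)  # stands in for subgraph G1
g2 = torch.nn.Linear(128, 10).to(device)   # stands in for subgraph G2

g1_stream = torch.cuda.Stream()
g2_stream = torch.cuda.Stream()

intermediate = None  # output of G1, consumed by G2 one step later
for step in range(100):
    batch = torch.randn(32, 128, device=device)  # placeholder input batch
    if intermediate is not None:
        with torch.cuda.stream(g2_stream):
            # make sure G1's output from the previous step is ready
            g2_stream.wait_stream(g1_stream)
            out = g2(intermediate)      # G2 processes batch n
    with torch.cuda.stream(g1_stream):
        intermediate = g1(batch)        # G1 processes batch n+1
torch.cuda.synchronize()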
We will explore two common scenarios where “pipelining” is feasible:
- Partial-model training or finetuning:
It is common to freeze a pre-trained model backbone (e.g., a feature extractor or encoder) and train only a model head (e.g., a decoder). Since the frozen backbone does not depend on gradients from the head, the two can be executed concurrently.
- Offloading data preprocessing to the GPU:
Moving data preprocessing to the GPU is a common method for addressing bottlenecks in the data input pipeline (also known as GPU starvation). While simply prepending the preprocessing operations to the model graph improves performance, additional gains can be achieved by running preprocessing on a separate CUDA stream in parallel with model execution, assuming the preprocessing is not trivial compared to the model compute.
To facilitate our discussion, we will define two toy training scripts and measure the training performance under different scenarios. The experiments were run on an Amazon EC2 g5.2xlarge instance (containing an NVIDIA A10G GPU and 8 vCPUs) running a PyTorch (2.6) Deep Learning AMI (DLAMI).
Please note: the code snippets we share are for demonstration purposes only; do not rely on their correctness or optimality. The impact of using CUDA streams will vary depending on the model architecture and system configuration. We encourage you to conduct your own profiling and experimentation before integrating CUDA streams (or any other tool or technique we refer to) into your workflow.
Part 1: Pipelining an Encoder-Decoder Model
The first use-case we explore involves a CNN-based image segmentation model consisting of a fixed (pre-trained) encoder and a trainable decoder. In this scenario, since the encoder weights are frozen and unaffected by backpropagation, the encoder can be executed independently of the decoder’s training. In this section, we assess the impact of pipelining the training process using CUDA streams.
A Toy Image Segmentation Training Experiment
We begin by defining a simple CNN-based image encoder along with its corresponding decoder.
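The sketch below shows one possible minimal encoder-decoder pair, assuming 256x256 inputs and 10 segmentation classes. The layer counts and channel sizes here are our own illustrative choices; the exact architecture used to produce the throughput numbers reported below may differ.

import torch
import torch.nn as nn

img_size = 256
num_classes = 10

# A simple CNN encoder: progressively downsamples the input image
encoder = nn.Sequential(
    nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1),    # 128x128
    nn.ReLU(inplace=True),
    nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),   # 64x64
    nn.ReLU(inplace=True),
    nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),  # 32x32
    nn.ReLU(inplace=True),
    nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1), # 16x16
    nn.ReLU(inplace=True),
)

# A matching decoder: upsamples the features back to a per-pixel
# class map of shape (num_classes, img_size, img_size)
decoder = nn.Sequential(
    nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2),   # 32x32
    nn.ReLU(inplace=True),
    nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2),    # 64x64
    nn.ReLU(inplace=True),
    nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2),     # 128x128
    nn.ReLU(inplace=True),
    nn.ConvTranspose2d(32, num_classes, kernel_size=2, stride=2)  # 256x256
)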
Next, we construct a synthetic dataset of random images and segmentation maps.
from torch.utils.data import DataLoader
from torchvision.datasets.vision import VisionDataset

# A dataset with random images and per-pixel labels
class FakeDataset(VisionDataset):
    def __init__(self):
        super().__init__(root=None)
        self.size = 1000000

    def __getitem__(self, index):
        # create a random image
        img = torch.randint(0, 256, (3, img_size, img_size),
                            dtype=torch.uint8)
        # create a random label map
        target = torch.randint(0, num_classes, (img_size, img_size))
        return img, target

    def __len__(self):
        return self.size

train_set = FakeDataset()

train_loader = DataLoader(
    dataset=train_set,
    batch_size=8,
    num_workers=8
)
Finally, we define the loss function, optimizer, and training loop. Note that we freeze the encoder’s weights and train only the decoder.
import time

device = torch.device("cuda")
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(decoder.parameters())

# Freeze the encoder weights
encoder.requires_grad_(False)
encoder.eval().to(device)
decoder.train().to(device)

warmup = 10
active_batches = 100
total_iters = warmup + active_batches

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True).float()
    labels = data[1].to(device=device, non_blocking=True)
    optimizer.zero_grad()
    with torch.no_grad():
        features = encoder(inputs)
    output = decoder(features)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()

    if idx == warmup:
        # sync the GPU and start the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

# wait for the GPU to finish and then stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
Our baseline training script achieves an average throughput of 83 steps per second, with an average GPU utilization of 85%.
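The throughput is reported by the script itself; GPU utilization can be observed with a tool such as nvidia-smi, or sampled programmatically. The snippet below is one possible way to sample average utilization from within the script using torch.cuda.utilization (which relies on the pynvml package); it is a sketch, not necessarily how the figures above were collected.

import threading
import time

import torch

# Sample GPU utilization in a background thread while training runs.
samples = []
stop_sampling = threading.Event()

def sample_utilization(interval_sec=0.5):
    while not stop_sampling.is_set():
        samples.append(torch.cuda.utilization())
        time.sleep(interval_sec)

sampler = threading.Thread(target=sample_utilization, daemon=True)
sampler.start()

# ... run the training loop here ...

stop_sampling.set()
sampler.join()
print(f'average GPU utilization: '
      f'{sum(samples) / max(len(samples), 1):.1f}%')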
Pipelining the Model Execution With CUDA Streams
In the revised version of the training loop shown below, we introduce two CUDA streams: one for executing the encoder and one for training the decoder. In each iteration, we perform two operations concurrently:
- Train the decoder using the image features and labels from batch N.
- Execute the encoder on input batch N+1 to generate its image features.
encoder_stream = torch.cuda.Stream()
decoder_stream = torch.cuda.Stream()

# initialize the features to None
features = None

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device, non_blocking=True).float()
    labels_next = data[1].to(device, non_blocking=True)

    if features is not None:
        with torch.cuda.stream(decoder_stream):
            # wait for the encoder output of the previous batch
            decoder_stream.wait_stream(encoder_stream)
            optimizer.zero_grad()
            output = decoder(features)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

    with torch.cuda.stream(encoder_stream):
        with torch.no_grad():
            features = encoder(inputs)
        # record that features was produced on encoder_stream
        features.record_stream(encoder_stream)

    labels = labels_next

    if idx == warmup:
        # sync the GPU and start the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

# wait for the GPU to finish and then stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
This modification yields an average throughput of 91 steps per second, representing a 9.6% speedup. This is a significant improvement — especially considering that our baseline already had high GPU utilization (85%).
Sensitivity of Pipelining to Workload Properties
The effectiveness of pipelining with CUDA streams is highly dependent on the specifics of the training workload and runtime environment. If the encoder is significantly larger than the decoder (or vice versa), pipelining may offer little benefit or even hinder performance. Conversely, when the GPU is underutilized, pipelining tends to yield more substantial gains.
To illustrate this dependency, we reran the experiment with varying batch sizes. The results are summarized below:
As the batch size increases, the benefit of pipelining diminishes. This is likely because larger batch sizes naturally lead to higher (and more efficient) GPU utilization, leaving less room for improvement through concurrent execution.
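A sweep like this can be scripted along the following lines. Here run_experiment is a hypothetical helper, assumed to wrap the baseline and pipelined training loops shown above and return the measured throughput; it is not a function defined in this post.

from torch.utils.data import DataLoader

# Hypothetical batch-size sweep; run_experiment is assumed to wrap the
# baseline and pipelined training loops and return steps per second.
for batch_size in [2, 4, 8, 16, 32]:
    train_loader = DataLoader(
        dataset=train_set,
        batch_size=batch_size,
        num_workers=8
    )
    baseline = run_experiment(train_loader, pipelined=False)
    pipelined = run_experiment(train_loader, pipelined=True)
    print(f'batch size {batch_size}: '
          f'baseline {baseline:.1f} steps/sec, '
          f'pipelined {pipelined:.1f} steps/sec, '
          f'speedup {100 * (pipelined / baseline - 1):.1f}%')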
Part 2: Offloading Augmentations onto the GPU
In this section, we apply CUDA streams to accelerating data augmentation. In previous blog posts (e.g., here and here), we have studied the problem of bottlenecks in the data input pipeline from different perspectives and reviewed several techniques for diagnosing and addressing them. A common cause of these bottlenecks is CPU resource exhaustion, where the CPU cannot keep up with the computational demands of the preprocessing pipeline. The result is GPU starvation, a scenario in which the expensive GPU sits idle, waiting for data to arrive.
One effective solution is to offload heavy data preprocessing to the GPU. We will demonstrate this technique and take it a step further by executing the augmentations on a dedicated CUDA stream, enabling concurrent execution with the model training.
A Toy Image Classification Training Experiment
We begin by defining a simple CNN-based image classification model:
import torch
import torch.nn as nn

img_size = 256
num_classes = 10

model = nn.Sequential(
    # Start with a 256x256 image
    nn.Conv2d(3, 16, kernel_size=1),
    nn.ReLU(inplace=True),
    nn.Conv2d(16, 32, kernel_size=2, stride=2),  # 2x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(32, 64, kernel_size=2, stride=2),  # 4x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(64, 128, kernel_size=2, stride=2),  # 8x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(128, 256, kernel_size=2, stride=2),  # 16x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(256, 512, kernel_size=2, stride=2),  # 32x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(512, 1024, kernel_size=2, stride=2),  # 64x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(1024, 2048, kernel_size=2, stride=2),  # 128x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(2048, 4096, kernel_size=2, stride=2),  # 256x downsample
    nn.Flatten(),
    nn.Linear(4096, num_classes)
)
Next, we create a synthetic dataset with an augmentation pipeline intentionally designed to cause a severe performance bottleneck:
import random
from torch.utils.data import DataLoader
import torchvision.transforms.v2 as T
from torchvision.datasets.vision import VisionDataset
import torchvision.transforms.v2.functional as F
import torchvision.ops as ops

# A dataset with random images and labels
class FakeDataset(VisionDataset):
    def __init__(self, transform=None):
        super().__init__(root=None, transform=transform)
        self.size = 1000000

    def __getitem__(self, index):
        # create a random image
        img = torch.randint(0, 256, (3, img_size, img_size),
                            dtype=torch.uint8)
        # create a random label
        target = torch.randint(0, num_classes, (1, ))
        if self.transform:
            # apply transformations
            img = self.transform(img)
        return img, target

    def __len__(self):
        return self.size

augmentations = T.Compose([
    T.ToDtype(torch.float32),
    T.RandomCrop(img_size//2),
    T.Resize(img_size),
    T.RandomRotation(degrees=45.0),
    T.GaussianBlur(kernel_size=7),
    T.Normalize(mean=[0, 0, 0], std=[1, 1, 1])
])

train_set = FakeDataset(transform=augmentations)

train_loader = DataLoader(
    dataset=train_set,
    batch_size=32,
    num_workers=8
)
Finally, we define the loss function, optimizer, and training loop:
import time

device = torch.device("cuda")
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters())
model.train().to(device)

warmup = 10
active_batches = 100
total_iters = warmup + active_batches

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True)
    labels = data[1].to(device=device, non_blocking=True).squeeze()
    optimizer.zero_grad()
    output = model(inputs)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()

    if idx == warmup:
        # sync the GPU and start the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

# wait for the GPU to finish and then stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
Running this baseline script results in an average throughput of 20.41 steps per second and a GPU utilization of only 42%. The heavy data augmentations saturate the CPU, leading to GPU starvation. See our previous post for more information on detecting bottlenecks in the data input pipeline.
Offloading Data Augmentations to the GPU
To address the performance bottleneck on the data input pipeline, we move the augmentations onto the GPU.
The first step is to define custom data transforms that apply random rotations and crops per sample in a batch. This is important because the built-in torchvision transforms apply the same augmentation across the entire batch — losing the per-sample randomness seen on the CPU.
We implement the BatchRandomCrop transform using the roi_align operator.
class BatchRandomCrop(T.Transform):
    def __init__(self, output_size):
        super().__init__()
        self.output_size = output_size

    def transform(self, img: torch.Tensor, params: dict):
        batch_size, _, original_height, original_width = img.shape
        device = img.device
        max_top = original_height - self.output_size
        max_left = original_width - self.output_size
        # Generate random top and left coords for each image in the batch
        random_top = torch.randint(0, max_top + 1, (batch_size,),
                                   device=device, dtype=torch.float32)
        random_left = torch.randint(0, max_left + 1, (batch_size,),
                                    device=device, dtype=torch.float32)
        image_indices = torch.arange(batch_size, device=device,
                                     dtype=torch.float32)
        # boxes are in (batch_index, x1, y1, x2, y2) format
        boxes = torch.stack([
            image_indices,
            random_left,
            random_top,
            random_left + self.output_size,
            random_top + self.output_size
        ], dim=1)
        cropped_batch = ops.roi_align(
            img,
            boxes,
            output_size=self.output_size
        )
        return cropped_batch
We implement the BatchRandomRotation transform by iterating over all of the images in the batch and applying a random rotation to each one. Note that this version is not vectorized; a fully vectorized implementation would require greater effort.
class BatchRandomRotation(T.Transform):
    def __init__(self, degrees):
        super().__init__()
        self.degrees = degrees

    def transform(self, inpt: torch.Tensor, params: dict):
        # split the batch into a list of individual images
        images = list(torch.unbind(inpt, dim=0))
        augmented_images = []
        for img_tensor in images:
            # generate a random angle
            angle = random.uniform(-self.degrees, self.degrees)
            # apply the rotation to the single image
            transformed_img = F.rotate(
                img_tensor,
                angle=angle
            )
            augmented_images.append(transformed_img)
        # stack the transformed images back into a batch
        return torch.stack(augmented_images, dim=0)
We now define a batch_transform that mimics the CPU-based augmentation pipeline defined above:
batch_transform = T.Compose([
    T.ToDtype(torch.float32),
    BatchRandomCrop(img_size//2),
    T.Resize(img_size),
    BatchRandomRotation(degrees=45.0),
    T.GaussianBlur(kernel_size=7),
    T.Normalize(mean=[0, 0, 0], std=[1, 1, 1])
])
Finally, we reset the dataset and update the training loop to apply the new batch_transform:
train_set = FakeDataset(transform=None)

train_loader = DataLoader(
    dataset=train_set,
    batch_size=32,
    num_workers=8
)

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True)
    labels = data[1].to(device=device, non_blocking=True).squeeze()
    # apply augmentations
    inputs = batch_transform(inputs)
    optimizer.zero_grad()
    output = model(inputs)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()

    if idx == warmup:
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
This updated training script improves throughput to 35.22 steps per second — a 72.57% speedup over the baseline result.
Pipelining Augmentations With CUDA Streams
Next, we pipeline the augmentation and training steps using two separate CUDA streams: one for running the data transforms and one for training the model. In each iteration of the loop, we perform two concurrent operations:
- Train the model on augmented batch N.
- Apply the GPU-based data augmentations to batch N+1.
transform_stream = torch.cuda.Stream()
model_stream = torch.cuda.Stream()

# initialize the transformed value to None
transformed = None

for idx, data in enumerate(train_loader):
    inputs = data[0]
    labels_next = data[1]

    if transformed is not None:
        with torch.cuda.stream(model_stream):
            labels = labels.to(device, non_blocking=True).squeeze()
            # wait for the transformed batch to be ready
            model_stream.wait_stream(transform_stream)
            optimizer.zero_grad()
            output = model(transformed)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

    with torch.cuda.stream(transform_stream):
        inputs = inputs.to(device, non_blocking=True)
        transformed = batch_transform(inputs)
        # record that the tensor was produced on transform_stream
        transformed.record_stream(transform_stream)

    labels = labels_next

    if idx == warmup:
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
This further improves the throughput to 38.82 steps per second, a 10.2% increase over the serialized solution and 90.20% faster than the original baseline.
Sensitivity of Pipelining to Workload Properties
As we saw in Part 1, the benefit of pipelining using CUDA streams varies based on the details of the workload. In the table below, we capture the results for several different batch sizes:
As the batch size increases, GPU offloading becomes more effective, significantly boosting performance. At the same time, the gains from pipelining decrease. This is likely due to the fact that larger batch sizes increase GPU efficiency, reducing the opportunities for overlap.
Summary
When it comes to running AI/ML workloads, every millisecond counts. In this post we explored the impact of pipelining an AI/ML training step using CUDA streams in two common scenarios: partial model training and offloading data augmentations to the GPU. In both cases, the pipelined solution outperformed the serialized implementation, though the extent of the improvement varied significantly with the batch size.
As we’ve emphasized throughout the post, the expected impact of CUDA streams can vary greatly based on the AI/ML workload. For example, in cases where the GPU is already being efficiently utilized, the overhead of using CUDA streams may actually degrade runtime performance. We strongly recommend testing this technique on your own workloads before adopting it.
We hope you will find the techniques described in this post useful. For more tips, tricks, and techniques for profiling and optimizing AI/ML workflows, check out the other posts in this series.