注意
跳转到页面末尾以下载完整的示例代码
单机多GPU小批量节点分类
在本教程中,您将学习如何在训练图神经网络(GNN)进行节点分类时使用多个GPU。
本教程假设您已经阅读过《DGL中节点分类的随机GNN训练》教程。
它还假设您了解使用 DistributedDataParallel 对一般模型进行多GPU训练的基础知识。
注意
请参阅 PyTorch 的这篇教程,了解使用 DistributedDataParallel 进行通用多GPU训练的相关信息。此外,请参阅多GPU图分类教程的第一部分,了解在 DGL 中使用 DistributedDataParallel 的概述。
导入包
我们使用 torch.distributed 来初始化分布式训练上下文,并使用 torch.multiprocessing 为每个 GPU 启动一个进程。
import os
os.environ["DGLBACKEND"] = "pytorch"
import time
import dgl.graphbolt as gb
import dgl.nn as dglnn
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics.functional as MF
from torch.distributed.algorithms.join import Join
from torch.nn.parallel import DistributedDataParallel as DDP
from tqdm.auto import tqdm
定义模型
该模型与《DGL中节点分类的随机GNN训练》教程中的模型相同。
class SAGE(nn.Module):
    """Three-layer GraphSAGE node classifier using mean aggregation.

    Layer widths are ``in_size -> hidden_size -> hidden_size -> out_size``.
    ReLU followed by dropout (p=0.5) is applied after every layer except
    the last one.
    """

    def __init__(self, in_size, hidden_size, out_size):
        super().__init__()
        # Consecutive pairs of widths give each SAGEConv's (input, output)
        # sizes; layers are created in order, first to last.
        widths = [in_size, hidden_size, hidden_size, out_size]
        self.layers = nn.ModuleList(
            dglnn.SAGEConv(fan_in, fan_out, "mean")
            for fan_in, fan_out in zip(widths[:-1], widths[1:])
        )
        self.dropout = nn.Dropout(0.5)
        self.hidden_size = hidden_size
        self.out_size = out_size
        # Set the dtype for the layers manually: cast floating-point
        # parameters and buffers to float32.
        self.float()

    def forward(self, blocks, x):
        """Run every layer over its block and return the last layer's output.

        Args:
            blocks: Per-layer message-flow graphs, consumed in lockstep with
                ``self.layers`` (presumably the DGL blocks produced by the
                neighbor sampler, one per layer -- confirm with caller).
            x: Input features of the first block's source nodes.

        Returns:
            Raw output of the final layer (no activation), used as logits
            by the training loss.
        """
        final = len(self.layers) - 1
        h = x
        for depth, (conv, block) in enumerate(zip(self.layers, blocks)):
            h = conv(block, h)
            if depth != final:
                h = self.dropout(F.relu(h))
        return h
小批量数据加载
与之前的教程相比,主要区别在于我们使用 DistributedItemSampler 而不是 ItemSampler 来采样节点的小批量。DistributedItemSampler 是 ItemSampler 的分布式版本,可与 DistributedDataParallel 配合使用。它作为 ItemSampler 的包装器实现,会将数据集划分给各个副本(进程),使每个副本采样到互不重叠的小批量。它还支持丢弃最后一个不完整的小批量,以避免需要填充;设置 drop_uneven_inputs 则可以让各副本获得相同数量的小批量。
def create_dataloader(
    graph,
    features,
    itemset,
    device,
    is_train,
):
    """Build a GraphBolt data loader over this rank's share of ``itemset``.

    Items are split across processes by ``DistributedItemSampler`` in
    minibatches of 1024. Each minibatch is moved to ``device`` before
    3-hop neighbor sampling (fanout 10 per hop) and fetching the "feat"
    node feature.

    Args:
        graph: Graph to sample neighbors from.
        features: Feature store providing the "feat" node feature.
        itemset: Items (seed nodes, with labels) to iterate over.
        device: Device the pipeline copies minibatches to.
        is_train: When True, shuffle, drop the last incomplete minibatch and
            drop uneven inputs across ranks; when False, keep every item.

    Returns:
        A ``gb.DataLoader`` wrapping the datapipe.
    """
    sampler = gb.DistributedItemSampler(
        item_set=itemset,
        batch_size=1024,
        drop_last=is_train,
        shuffle=is_train,
        drop_uneven_inputs=is_train,
    )
    # Copying to the device first means the neighbor-sampling and
    # feature-fetching stages that follow run on the GPU.
    pipeline = (
        sampler.copy_to(device)
        .sample_neighbor(graph, [10, 10, 10])
        .fetch_feature(features, node_feature_keys=["feat"])
    )
    return gb.DataLoader(pipeline)
def weighted_reduce(tensor, weight, dst=0):
    """Combine per-rank weighted sums into an exact global weighted mean.

    (HIGHLIGHT) Ranks may process different numbers of items, so a plain
    average of per-rank means would be biased. Each rank instead passes its
    weighted *sum* (``value * weight``) and its ``weight``; both are summed
    onto rank ``dst`` with ``torch.distributed.reduce`` (ReduceOp.SUM by
    default) and divided there.

    Args:
        tensor: This rank's weighted sum. ``dist.reduce`` operates in place,
            so this tensor may be modified by the call.
        weight: This rank's total weight (a Python number).
        dst: Rank that receives the reduced result.

    Returns:
        ``sum(tensor) / sum(weight)``. Only meaningful on rank ``dst``; other
        ranks get a value computed from non-final data. If the summed
        weight is zero, the division yields NaN/inf.
    """
    total_weight = torch.tensor(weight, device=tensor.device)
    # Reduce the numerator first, then the weights -- every rank must issue
    # these collectives in the same order.
    for summand in (tensor, total_weight):
        dist.reduce(tensor=summand, dst=dst)
    return tensor / total_weight
评估循环
评估循环与之前的教程几乎相同。
@torch.no_grad()
def evaluate(rank, model, graph, features, itemset, num_classes, device):
    """Compute this rank's classification accuracy on its share of ``itemset``.

    The distributed sampler built by ``create_dataloader`` gives each rank
    only part of ``itemset``, so the result is per-rank; callers combine
    ranks with ``weighted_reduce(acc * num_items, num_items)``.

    Args:
        rank: Rank of this process; only rank 0 shows a progress bar.
        model: DDP-wrapped model. Inference calls ``model.module`` directly,
            bypassing the DDP wrapper since no gradients are computed.
        graph: Sampling graph passed through to ``create_dataloader``.
        features: Feature store passed through to ``create_dataloader``.
        itemset: Labeled items to evaluate.
        num_classes: Number of classes for multiclass accuracy.
        device: Device on which the accuracy tensor is returned.

    Returns:
        Tuple ``(acc, num_items)``: a 0-d accuracy tensor on ``device`` and
        the number of items this rank evaluated. A rank that received no
        items returns ``(tensor(0.), 0)``, so its weighted contribution is
        zero instead of the rank crashing on ``torch.cat([])``.
    """
    model.eval()
    y = []
    y_hats = []
    dataloader = create_dataloader(
        graph,
        features,
        itemset,
        device,
        is_train=False,
    )
    for data in tqdm(dataloader) if rank == 0 else dataloader:
        blocks = data.blocks
        x = data.node_features["feat"]
        y.append(data.labels)
        y_hats.append(model.module(blocks, x))

    if not y:
        # Presumably reachable when the item set has fewer items than there
        # are ranks (evaluation does not drop uneven inputs). Returning
        # normally keeps this rank alive for the cross-rank reductions the
        # callers perform next.
        return torch.zeros((), device=device), 0

    res = MF.accuracy(
        torch.cat(y_hats),
        torch.cat(y),
        task="multiclass",
        num_classes=num_classes,
    )
    return res.to(device), sum(y_i.size(0) for y_i in y)
训练循环
训练循环与之前的教程几乎相同,区别在于我们使用 Join 上下文管理器来解决输入不均匀的问题。PyTorch 的分布式数据并行(DDP)训练机制要求所有进程(rank)的输入数量相同,否则程序可能会出错或挂起。为了解决这个问题,PyTorch 提供了 Join 上下文管理器。详细信息请参考此教程。
def train(
    rank,
    graph,
    features,
    train_set,
    valid_set,
    num_classes,
    model,
    device,
):
    """Train the DDP-wrapped ``model`` for 5 epochs, validating after each.

    Every rank trains on its own shard of ``train_set`` and then evaluates
    its shard of ``valid_set``. Loss and accuracy are combined across ranks
    with ``weighted_reduce``, and rank 0 prints one summary line per epoch.

    Args:
        rank: Rank of this process; rank 0 shows progress and prints.
        graph: Sampling graph used by the data loaders.
        features: Feature store used by the data loaders.
        train_set: Labeled training items.
        valid_set: Labeled validation items.
        num_classes: Number of target classes.
        model: DDP-wrapped model to optimize.
        device: CUDA device of this rank.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    train_loader = create_dataloader(
        graph,
        features,
        train_set,
        device,
        is_train=True,
    )
    is_main = rank == 0

    for epoch in range(5):
        tic = time.time()
        model.train()
        # Running sum of per-sample loss and number of samples on this rank.
        loss_sum = torch.tensor(0, dtype=torch.float, device=device)
        seen = 0
        # Join lets DDP cope with ranks that see different numbers of
        # batches (uneven inputs) instead of erroring or hanging.
        with Join([model]):
            batches = tqdm(train_loader) if is_main else train_loader
            for minibatch in batches:
                # Inputs are the first layer's source-node features; targets
                # are the last layer's destination-node labels.
                feats = minibatch.node_features["feat"]
                labels = minibatch.labels
                logits = model(minibatch.blocks, feats)
                loss = F.cross_entropy(logits, labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                batch_size = labels.size(0)
                loss_sum += loss.detach() * batch_size
                seen += batch_size

        if is_main:
            print("Validating...")
        # All ranks must evaluate and take part in both reductions, in the
        # same order, for the collectives to match up.
        acc, num_val_items = evaluate(
            rank,
            model,
            graph,
            features,
            valid_set,
            num_classes,
            device,
        )
        avg_loss = weighted_reduce(loss_sum, seen)
        acc = weighted_reduce(acc * num_val_items, num_val_items)

        # Wait for queued GPU work so the epoch time is accurate.
        torch.cuda.synchronize()
        elapsed = time.time() - tic
        if is_main:
            print(
                f"Epoch {epoch:05d} | "
                f"Average Loss {avg_loss.item():.4f} | "
                f"Accuracy {acc.item():.4f} | "
                f"Time {elapsed:.4f}"
            )
定义训练和评估程序
以下代码定义了每个进程的主函数。它与之前的教程类似,只是我们需要使用 torch.distributed 初始化分布式训练上下文,并使用 torch.nn.parallel.DistributedDataParallel 包装模型。
def run(rank, world_size, devices, dataset):
    """Per-process worker launched by ``mp.spawn``: set up DDP, train, test.

    Args:
        rank: Index of this process in the group; selects ``devices[rank]``.
        world_size: Total number of processes in the group.
        devices: List of CUDA devices, one per process.
        dataset: Loaded GraphBolt dataset. ``tasks[0]`` supplies the
            train/validation/test item sets and ``metadata["num_classes"]``.
    """
    # Set up multiprocessing environment.
    device = devices[rank]
    torch.cuda.set_device(device)
    dist.init_process_group(
        backend="nccl",  # Use NCCL backend for distributed GPU training
        init_method="tcp://127.0.0.1:12345",
        world_size=world_size,
        rank=rank,
    )

    # Pin the graph and features in-place to enable GPU access.
    graph = dataset.graph.pin_memory_()
    features = dataset.feature.pin_memory_()

    task = dataset.tasks[0]
    train_set = task.train_set
    valid_set = task.validation_set
    num_classes = task.metadata["num_classes"]
    # NOTE(review): presumably ``size`` returns the per-node shape of the
    # "feat" feature, so index 0 is the input feature dimension -- confirm.
    in_size = features.size("node", None, "feat")[0]
    hidden_size = 256
    out_size = num_classes

    # Create GraphSAGE model. It should be copied onto a GPU as a replica.
    model = SAGE(in_size, hidden_size, out_size).to(device)
    model = DDP(model)

    # Model training.
    if rank == 0:
        print("Training...")
    train(
        rank,
        graph,
        features,
        train_set,
        valid_set,
        num_classes,
        model,
        device,
    )

    # Test the model. Every rank evaluates its shard so that all processes
    # take part in the reduction below.
    if rank == 0:
        print("Testing...")
    test_set = task.test_set
    test_acc, num_test_items = evaluate(
        rank,
        model,
        graph,
        features,
        itemset=test_set,
        num_classes=num_classes,
        device=device,
    )
    test_acc = weighted_reduce(test_acc * num_test_items, num_test_items)
    if rank == 0:
        print(f"Test Accuracy {test_acc.item():.4f}")

    # Tear down the process group on normal exit. PyTorch asks applications
    # to call destroy_process_group so pending NCCL work is finished; recent
    # versions warn when the group is left for interpreter shutdown.
    dist.destroy_process_group()
生成训练进程
以下代码为每个 GPU 生成一个进程,并调用上面定义的 run 函数。
def main():
    """Launch one training process per visible CUDA device.

    Prints "No GPU found!" and returns early when CUDA is unavailable.
    Otherwise it loads ogbn-arxiv, limits OpenMP threads per worker, and
    spawns ``world_size`` processes running ``run``. ``mp.spawn`` is called
    with ``join=True``, so this waits for them to finish.
    """
    if not torch.cuda.is_available():
        print("No GPU found!")
        return
    devices = [
        torch.device(f"cuda:{i}") for i in range(torch.cuda.device_count())
    ]
    world_size = len(devices)
    print(f"Training with {world_size} gpus.")

    # Load and preprocess dataset.
    dataset = gb.BuiltinDataset("ogbn-arxiv").load()

    # Thread limiting to avoid resource competition: share half the CPU
    # cores among the worker processes. Clamp to at least 1. Integer
    # division gives 0 when there are more GPUs than half the cores, and
    # OMP_NUM_THREADS must be a positive integer (OpenMP runtimes reject or
    # ignore 0), which would silently remove the limit.
    threads_per_process = max(1, mp.cpu_count() // 2 // world_size)
    os.environ["OMP_NUM_THREADS"] = str(threads_per_process)
    mp.set_sharing_strategy("file_system")
    mp.spawn(
        run,
        args=(world_size, devices, dataset),
        nprocs=world_size,
        join=True,
    )
# Only launch training when this file is executed as a script. mp.spawn
# starts fresh interpreters that re-import this module, and they must not
# start another round of training.
if __name__ == "__main__":
    main()
No GPU found!
脚本的总运行时间: (0 分钟 0.594 秒)