使用TensorPipe CUDA RPC进行直接设备到设备通信¶
创建于:2021年3月19日 | 最后更新:2021年3月19日 | 最后验证:2024年11月5日
注意
PyTorch 1.8 引入了直接设备到设备的 RPC(CUDA RPC)作为原型功能。此 API 可能会发生变化。
在本教程中,您将学习:
CUDA RPC的高级概念。
如何使用CUDA RPC。
需求¶
PyTorch 1.8+
什么是CUDA RPC?¶
CUDA RPC supports directly sending Tensors from local CUDA memory to remote
CUDA memory. Prior to v1.8 release, PyTorch RPC only accepts CPU Tensors. As a
result, when an application needs to send a CUDA Tensor through RPC, it has
to first move the Tensor to CPU on the caller, send it via RPC, and then move
it to the destination device on the callee, which incurs both unnecessary
synchronizations and D2H and H2D copies. Since v1.8, RPC allows users to
configure a per-process global device map using the
set_device_map
API, specifying how to map local devices to remote devices. More specifically,
if worker0’s device map has an entry "worker1" : {"cuda:0" : "cuda:1"},
all RPC arguments on "cuda:0" from worker0 will be directly sent to
"cuda:1" on worker1. The response of an RPC will use the inverse of
the caller device map, i.e., if worker1 returns a Tensor on "cuda:1",
it will be directly sent to "cuda:0" on worker0. All intended
device-to-device direct communication must be specified in the per-process
device map. Otherwise, only CPU tensors are allowed.
在底层,PyTorch RPC 依赖于 TensorPipe 作为通信后端。PyTorch RPC 从每个请求或响应中提取所有张量到一个列表中,并将其他所有内容打包成一个二进制负载。然后,TensorPipe 将根据张量的设备类型以及调用方和被调用方上的通道可用性,自动为每个张量选择通信通道。现有的 TensorPipe 通道涵盖了 NVLink、InfiniBand、SHM、CMA、TCP 等。
如何使用CUDA RPC?¶
下面的代码展示了如何使用CUDA RPC。该模型包含两个线性层,并被分成两个分片。这两个分片分别放置在worker0和worker1上,并且worker0作为主节点驱动前向和后向传递。请注意,我们有意跳过了DistributedOptimizer,以突出使用CUDA RPC时的性能改进。实验重复前向和后向传递10次,并测量总执行时间。它将使用CUDA RPC与手动暂存到CPU内存并使用CPU RPC进行了比较。
import torch
import torch.distributed.autograd as autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import os
import time
class MyModule(nn.Module):
def __init__(self, device, comm_mode):
super().__init__()
self.device = device
self.linear = nn.Linear(1000, 1000).to(device)
self.comm_mode = comm_mode
def forward(self, x):
# x.to() is a no-op if x is already on self.device
y = self.linear(x.to(self.device))
return y.cpu() if self.comm_mode == "cpu" else y
def parameter_rrefs(self):
return [rpc.RRef(p) for p in self.parameters()]
def measure(comm_mode):
# local module on "worker0/cuda:0"
lm = MyModule("cuda:0", comm_mode)
# remote module on "worker1/cuda:1"
rm = rpc.remote("worker1", MyModule, args=("cuda:1", comm_mode))
# prepare random inputs
x = torch.randn(1000, 1000).cuda(0)
tik = time.time()
for _ in range(10):
with autograd.context() as ctx:
y = rm.rpc_sync().forward(lm(x))
autograd.backward(ctx, [y.sum()])
# synchronize on "cuda:0" to make sure that all pending CUDA ops are
# included in the measurements
torch.cuda.current_stream("cuda:0").synchronize()
tok = time.time()
print(f"{comm_mode} RPC total execution time: {tok - tik}")
def run_worker(rank):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128)
if rank == 0:
options.set_device_map("worker1", {0: 1})
rpc.init_rpc(
f"worker{rank}",
rank=rank,
world_size=2,
rpc_backend_options=options
)
measure(comm_mode="cpu")
measure(comm_mode="cuda")
else:
rpc.init_rpc(
f"worker{rank}",
rank=rank,
world_size=2,
rpc_backend_options=options
)
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, nprocs=world_size, join=True)
输出结果显示如下,表明在此实验中,CUDA RPC 相比 CPU RPC 可以实现 34 倍的加速。
cpu RPC total execution time: 2.3145179748535156 Seconds
cuda RPC total execution time: 0.06867480278015137 Seconds