管道并行
DeepSpeed v0.3 新增了对管道并行的支持!管道并行通过将模型的层划分为可以并行处理的阶段,提高了深度学习训练的内存和计算效率。 DeepSpeed 的训练引擎提供了混合数据和管道并行,并且可以进一步与模型并行(如 Megatron-LM)结合使用。 下面展示了3D并行的示意图。我们最新的 结果 表明,这种3D并行能够训练具有超过 万亿 参数的模型。

DeepSpeed 使用梯度累积来提取管道并行性(如下所示)。每批训练数据被划分为可以在管道阶段并行处理的微批次。一旦一个阶段完成了一个微批次的前向传递,激活内存就会被传递到管道中的下一个阶段。同样,当下一阶段完成其对微批次的反向传递时,关于激活的梯度会通过管道向后传递。每个反向传递都会在本地累积梯度。接下来,所有数据并行组并行执行梯度的归约。最后,优化器更新模型权重。
以下是DeepSpeed如何使用混合双向数据并行和两阶段管道并行训练一个包含八个微批次的批次的示意图。GPU 0和2被安排在一个管道中,将交替进行前向(F)和后向(B)传递。然后,它们将分别与它们的数据并行对应物GPU 1和3进行梯度全归约(AR)。最后,两个管道阶段更新它们的模型权重。

开始使用管道并行
DeepSpeed 致力于加速并简化管道并行训练的过程。本节通过准备torchvision的AlexNet模型,提供了混合数据和管道并行训练的初步步骤。
表达管道模型
管道并行要求模型被表达为一系列层。在前向传播中,每一层消耗前一层的输出。实际上,不需要为管道并行模型指定forward()!管道并行模型的前向传播隐式地采用以下形式:
def forward(self, inputs):
x = inputs
for layer in self.layers:
x = layer(x)
return x
PyTorch的
torch.nn.Sequential
是一个方便的容器,用于表达管道并行模型,并且可以通过DeepSpeed进行并行化而无需修改:
net = nn.Sequential(
nn.Linear(in_features, hidden_dim),
nn.ReLU(inplace=True),
nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)
PipelineModule 使用其 layers 参数作为构成模型的层序列。初始化后,net 被分为两个管道阶段,并将其层移动到相应的 GPU 上。如果存在两个以上的 GPU,DeepSpeed 还将使用混合数据并行。
注意: GPU的总数必须能被流水线阶段的数量整除。
注意: 对于大型模型训练,请参阅 内存高效模型构建。
AlexNet
让我们看一下torchvision的AlexNet的简化实现:
class AlexNet(nn.Module):
def __init__(self, num_classes=1000):
super(AlexNet, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
...
nn.MaxPool2d(kernel_size=3, stride=2),
)
self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
self.classifier = nn.Sequential(
nn.Dropout(),
...
nn.Linear(4096, num_classes),
)
def forward(self, x):
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
AlexNet 主要由几个 Sequential 子模块组成。我们可以通过将其子模块展平为单一层序列,将其转换为 PipelineModule:
class AlexNetPipe(AlexNet):
def to_layers(self):
layers = [
*self.features,
self.avgpool,
lambda x: torch.flatten(x, 1),
*self.classifier
]
return layers
from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)
注意:
上面layers中间的lambda不是torch.nn.Module
类型。任何实现了__call__()的对象都可以作为PipelineModule中的一层:这允许在管道中进行方便的数据转换。
输入和输出
遵循torch.nn.Sequential,每一层的输入和输出必须是单个torch.Tensor或张量的tuple。实际上,一些模型可能需要修改其前向传递以打包和解包forward()的参数。考虑一个简化的Transformer块堆栈实现:
class TransformerBlock(nn.Module)
...
def forward(self, hidden, mask):
output = self.compute(hidden, mask)
return output
...
stack = [ TransformerBlock() for _ in range(num_layers) ]
需要对TransformerBlock进行两项修改:
- 参数必须收集到一个
tuple中。 mask也必须从forward()返回,以便传递给下一层。
这些修改可以通过一个简短的子类来完成:
class TransformerBlockPipe(TransformerBlock)
def forward(self, inputs):
hidden, mask = inputs
output = super().forward(hidden, mask)
return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]
训练循环
管道并行性交错进行前向和后向传递,因此训练循环不能分为单独的forward()、backward()和step()阶段。相反,DeepSpeed的管道引擎提供了一个train_batch()方法,该方法推进管道引擎,直到消耗下一批训练数据并更新模型权重。
train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)
上面的 train_batch() 示例等同于以下使用传统数据并行 DeepSpeed 的情况:
train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
batch = next(data_iter)
loss = engine(batch)
engine.backward(loss)
engine.step()
处理数据
数据并行训练通常在每个批次的开始时,每个工作节点独立执行IO操作。然而,在管道并行环境中,只有第一个阶段使用输入数据,只有最后一个阶段使用标签进行损失计算。
注意:
管道引擎期望数据加载器返回一个包含两个项目的tuple。第一个返回的项目是输入批次数据,第二个项目是用于损失计算的数据。和之前一样,输入和标签应该是torch.Tensor类型或一个包含张量的tuple。
为了方便起见,当向deepspeed.initialize()提供数据集时,DeepSpeed管道引擎可以构建一个分布式数据加载器。DeepSpeed处理数据加载的其余复杂性,因此管道训练循环变为:
engine, _, _, _ = deepspeed.initialize(
args=args,
model=net,
model_parameters=[p for p in net.parameters() if p.requires_grad],
training_data=cifar_trainset())
for step in range(args.steps):
loss = engine.train_batch()
当然,DeepSpeed 将与您希望使用的任何数据加载器一起工作。
数据加载器应由管道中的第一个和最后一个阶段构建。
每个工作器应加载大小为 engine.train_micro_batch_size_per_gpu() 的微批次,并且每个 train_batch() 将被查询总共 engine.gradient_accumulation_steps() 次。
注意!
管道引擎从迭代器中拉取数据,而不是对其进行迭代。在训练批次中间,数据流不能为空,这一点至关重要。每次调用train_batch()时,都会从数据迭代器中拉取总共engine.gradient_accumulation_steps()个微批次的数据。
DeepSpeed 提供了一个便利类 deepspeed.utils.RepeatingLoader,它简单地包装了一个可迭代对象,例如数据加载器,并在达到末尾时重新启动它:
train_loader = deepspeed.utils.RepeatingLoader(train_loader)
train_iter = iter(train_loader)
for step in range(args.steps):
loss = engine.train_batch(data_iter=train_iter)
高级主题
负载均衡管道模块
管道并行训练的性能在很大程度上依赖于负载平衡。DeepSpeed 提供了几种机制来在 GPU 之间划分模型。这些策略可以通过 partition_method 关键字参数设置到 PipelineModule。以下是 DeepSpeed 目前提供的划分方法:
partition_method="parameters"(默认) 在每个管道阶段平衡可训练参数的数量。这在内存受限的环境中以及当层的大小与计算时间成正比时特别有用。partition_method="type:[regex]"平衡那些类名匹配[regex]的层。正则表达式不区分大小写。例如,partition_method="type:transformer"将平衡每个阶段的transformer层数量。partition_method="uniform"平衡每个阶段的层数。
内存高效的模型构建
构建一个Sequential容器并将其提供给PipelineModule是指定管道并行模型的一种便捷方式。然而,这种方法在大型模型中会遇到可扩展性问题,因为每个工作进程都会在CPU内存中复制整个模型。例如,一台拥有16个GPU的机器必须拥有相当于模型大小16倍的本地CPU内存。
DeepSpeed 提供了一个 LayerSpec 类,它延迟了模块的构建,直到模型层在工作者之间进行了分区。然后每个工作者只会分配给它被分配的层。因此,与上一段的例子相比,使用 LayerSpec 的拥有 16 个 GPU 的机器将只需要在其 CPU 内存中分配 1 倍模型大小,而不是 16 倍。
这里是一个简化的AlexNet模型的示例,但仅使用LayerSpec表示。请注意,语法几乎未变:nn.ReLU(inplace=True) 简单地变成了 LayerSpec(nn.ReLU, inplace=True)。
from deepspeed.pipe import PipelineModule, LayerSpec
class AlexNetPipe(PipelineModule):
def __init__(self, num_classes=10, **kwargs):
self.num_classes = num_classes
specs = [
LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=2),
LayerSpec(nn.ReLU, inplace=True),
...
LayerSpec(nn.ReLU, inplace=True),
LayerSpec(nn.Linear, 4096, self.num_classes),
]
super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)
绑定层
有些模型不能完全表示为管道并行模型,因为某些层在管道中被重复使用。例如,基于Transformer的语言模型通常在管道的早期使用嵌入层将词汇映射到隐藏状态,然后在管道的末尾使用嵌入将隐藏状态映射回词汇。如果模型仅限于纯管道并行,这种嵌入重用将禁止管道并行。
DeepSpeed 提供了一个 TiedLayerSpec,它是 LayerSpec 的扩展。TiedLayerSpec 需要一个额外的参数:key。每次重用层时都会使用 TiedLayerSpec 来指定,而 key 字段用于标识层的重用位置。
绑定的层在每个拥有重用实例的管道阶段上被复制。然后训练正常进行,但在所有反向传播完成后,会添加一个额外的全归约操作来同步绑定的梯度。全归约确保绑定层的权重在管道阶段之间保持同步。