流水线并行¶
流水线并行最初在Gpipe论文中提出,是一种在多个GPU上高效训练大型模型的技术。
警告
torch.distributed.pipeline 已被弃用,因此本文档也不再更新。如需了解最新的流水线并行实现,请参考 PyTorch 组织下的 PiPPy 库(PyTorch 的流水线并行)。
使用多GPU的模型并行¶
通常对于不适合单个GPU的大型模型,会采用模型并行化,将模型的某些部分放置在不同的GPU上。尽管如此,如果对顺序模型进行简单的并行化处理,训练过程会因为GPU利用率不足而受到影响,因为如下面图所示,在某一时刻只有一个GPU处于活动状态:
流水线执行¶
为了缓解这个问题,流水线并行将输入的小批次分割成多个微批次,并在多个GPU上流水线执行这些微批次。如下图所示:
PyTorch 中的管道 API¶
- class torch.distributed.pipeline.sync.Pipe(module, chunks=1, checkpoint='except_last', deferred_batch_norm=False)[源代码]¶
将任意
nn.Sequential
模块包装起来,以便使用同步管道并行进行训练。如果模块需要大量内存且无法容纳在单个GPU上,管道并行是一种有用的训练技术。该实现基于torchgpipe论文。
Pipe 结合了流水线并行与检查点技术,以减少训练所需的峰值内存,同时最小化设备利用不足的情况。
你应该将所有模块放置在适当的设备上,并将它们包装到一个定义所需执行顺序的
nn.Sequential
模块中。如果一个模块不包含任何参数/缓冲区,则假定该模块应在CPU上执行,并且在执行之前会将模块的适当输入张量移动到CPU。此行为可以通过WithDevice
包装器来覆盖,该包装器可用于明确指定模块应在哪个设备上运行。- Parameters
模块 (
nn.Sequential
) – 使用流水线并行化的顺序模块。序列中的每个模块都必须将其所有参数放在单个设备上。序列中的每个模块必须是 nn.Module 或nn.Sequential
(以将多个顺序模块组合在单个设备上)chunks (int) – 微批次的数量(默认值:
1
)checkpoint (str) – 启用检查点的时间,可以是
'always'
、'except_last'
或'never'
(默认值:'except_last'
)。'never'
完全禁用检查点,'except_last'
启用除最后一个微批次外的所有微批次的检查点 而'always'
为所有微批次启用检查点。deferred_batch_norm (bool) – 是否使用延迟的
BatchNorm
移动统计信息(默认值:False
)。如果设置为True
,我们会在多个微批次中跟踪统计信息,以更新每个小批次的运行统计信息。
- Raises
TypeError – 该模块不是一个
nn.Sequential
。ValueError – 无效参数
- Example::
在GPU 0 和 GPU 1 之间进行两层全连接层的流水线操作。
>>> # 需要先初始化RPC框架。 >>> os.environ['MASTER_ADDR'] = 'localhost' >>> os.environ['MASTER_PORT'] = '29500' >>> torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) >>> >>> # 构建管道。 >>> fc1 = nn.Linear(16, 8).cuda(0) >>> fc2 = nn.Linear(8, 4).cuda(1) >>> model = nn.Sequential(fc1, fc2) >>> model = Pipe(model, chunks=8) >>> input = torch.rand(16, 16).cuda(0) >>> output_rref = model(input)
注意
你可以将一个
Pipe
模型与torch.nn.parallel.DistributedDataParallel
一起使用,仅当Pipe
的 checkpoint 参数为'never'
时。注意
Pipe
目前仅支持节点内流水线,但未来将扩展以支持节点间流水线。 前向函数返回一个RRef
,以便在未来实现节点间流水线时,输出可能位于远程主机上。对于节点内流水线,您可以使用local_value()
在本地检索输出。警告
Pipe
是实验性的,可能会发生变化。- forward(*inputs)[源代码]¶
通过管道处理单个输入小批次并返回一个指向输出的
RRef
。Pipe
是一个相当透明的模块包装器。它不会修改底层模块的输入和输出签名。但是存在类型限制。输入和输出必须至少包含一个张量。此限制也适用于分区边界。输入序列被送入管道的第一个阶段,作为
*inputs
。因此,此函数的位置参数应与管道第一个阶段的位置参数匹配。同样的条件适用于管道一个阶段的输出,该输出是下一个阶段的输入。输入张量根据用于初始化
Pipe
的chunks
参数被分割成多个微批次。批次大小假设为张量的第一个维度,如果批次大小小于chunks
,则微批次的数量等于批次大小。只有张量被分割成多个微批次,非张量输入在每个微批次中按原样复制。对于管道最后一个阶段的非张量输出,它们被聚合为一个
列表
并返回给用户。例如,如果你有2个微批次返回整数5,用户将收到合并的输出[5, 5]。所有输入张量需要与管道的第一个分区位于同一设备上。
如果一个张量被
NoChunk
包装器包装,该张量不会在微批次之间拆分,并且会像非张量一样被复制。- Parameters
输入 – 输入小批量
- Returns
RRef
到小批量输出的引用- Raises
TypeError – 输入不包含至少一个张量
- Return type
RRef
跳跃连接¶
某些模型,如ResNeXt, 并不是完全顺序的,并且在层之间有跳跃连接。 如果简单地将其作为流水线并行的一部分来实现,则意味着 我们需要在最终到达跳跃连接所在的GPU之前,通过多个GPU复制某些层的输出。 为了避免这种复制开销,我们提供了以下API来在模型的不同层中存储和弹出张量。
- torch.distributed.pipeline.sync.skip.skippable.skippable(stash=(), pop=())[源代码]¶
定义一个装饰器来创建带有跳跃连接的
nn.Module
。这些装饰的模块被称为“可跳过”。即使模块没有被
Pipe
包裹,此功能也能完美工作。每个跳跃张量都由其名称管理。在操作跳跃张量之前,跳跃模块必须通过stash和/或pop参数静态声明跳跃张量的名称。具有预声明名称的跳跃张量可以通过
yield stash(name, tensor)
存储,或者通过tensor = yield pop(name)
弹出。这里是一个包含三层的示例。一个名为“1to3”的跳跃张量分别在第一层和最后一层被存储和弹出:
@skippable(stash=['1to3']) class Layer1(nn.Module): def forward(self, input): yield stash('1to3', input) return f1(input) class Layer2(nn.Module): def forward(self, input): return f2(input) @skippable(pop=['1to3']) class Layer3(nn.Module): def forward(self, input): skip_1to3 = yield pop('1to3') return f3(input) + skip_1to3 model = nn.Sequential(Layer1(), Layer2(), Layer3())
一个可跳过的模块可以存储或弹出多个跳过张量:
@skippable(stash=['alice', 'bob'], pop=['carol']) class StashStashPop(nn.Module): def forward(self, input): yield stash('alice', f_alice(input)) yield stash('bob', f_bob(input)) carol = yield pop('carol') return input + carol
每个跳过张量必须与恰好一对存储和弹出相关联。
Pipe
在包装模块时会自动检查此限制。您也可以在不使用Pipe
的情况下,通过verify_skippables()
来检查此限制。
- class torch.distributed.pipeline.sync.skip.skippable.stash(name, tensor)[源代码]¶
存储一个跳过张量的命令。
def forward(self, input): yield stash('name', input) return f(input)
- Parameters
名称 (str) – 跳过张量的名称
输入 (torch.Tensor 或 None) – 传递给跳跃连接的张量
- class torch.distributed.pipeline.sync.skip.skippable.pop(name)[源代码]¶
弹出跳过张量的命令。
def forward(self, input): skip = yield pop('name') return f(input) + skip
- Parameters
名称 (str) – 跳过张量的名称
- Returns
之前由另一层以相同名称存储的跳过张量
- Return type
无
- torch.distributed.pipeline.sync.skip.skippable.verify_skippables(module)[源代码]¶
验证底层可跳过模块是否满足完整性。
每个跳过张量必须只有一对 stash 和 pop。如果存在一个或多个不匹配的对,将会引发
TypeError
,并附带详细信息。以下是几个失败案例。
verify_skippables()
会报告这些案例的失败:# Layer1 存储 "1to3"。 # Layer3 弹出 "1to3"。 nn.Sequential(Layer1(), Layer2()) # └──── ? nn.Sequential(Layer2(), Layer3()) # ? ────┘ nn.Sequential(Layer1(), Layer2(), Layer3(), Layer3()) # └───────────────────┘ ^^^^^^ nn.Sequential(Layer1(), Layer1(), Layer2(), Layer3()) # ^^^^^^ └───────────────────┘
要为多个跳过张量使用相同名称,它们必须通过不同的命名空间进行隔离。请参阅
isolate()
。- Raises
TypeError – 一个或多个 stash 和 pop 对不匹配。
致谢¶
流水线并行性的实现基于fairscale的管道实现和 torchgpipe。我们感谢这两个团队对将流水线并行性引入PyTorch所做的贡献和指导。