Shortcuts

流水线并行

流水线并行最初在Gpipe论文中提出,是一种在多个GPU上高效训练大型模型的技术。

警告

torch.distributed.pipeline 已被弃用,因此本文档也不再更新。如需了解最新的流水线并行实现,请参考 PyTorch 组织下的 PiPPy 库(PyTorch 的流水线并行)。

使用多GPU的模型并行

通常对于不适合单个GPU的大型模型,会采用模型并行化,将模型的某些部分放置在不同的GPU上。尽管如此,如果对顺序模型进行简单的并行化处理,训练过程会因为GPU利用率不足而受到影响,因为如下面图所示,在某一时刻只有一个GPU处于活动状态:

_images/no_pipe.png

该图展示了一个具有4层的模型,分别放置在4个不同的GPU上(纵轴)。横轴表示通过时间训练该模型,表明在某一时刻仅使用1个GPU (图片来源)。

流水线执行

为了缓解这个问题,流水线并行将输入的小批次分割成多个微批次,并在多个GPU上流水线执行这些微批次。如下图所示:

_images/pipe.png

该图展示了一个具有4层的模型,分别放置在4个不同的GPU上(纵轴)。横轴表示通过时间训练该模型,展示了GPU的利用率得到了更高效的提升。然而,图中仍然存在一个气泡(如图所示),其中某些GPU未被充分利用。 (图片来源).

PyTorch 中的管道 API

class torch.distributed.pipeline.sync.Pipe(module, chunks=1, checkpoint='except_last', deferred_batch_norm=False)[源代码]

将任意 nn.Sequential 模块包装起来,以便使用同步管道并行进行训练。如果模块需要大量内存且无法容纳在单个GPU上,管道并行是一种有用的训练技术。

该实现基于torchgpipe论文。

Pipe 结合了流水线并行与检查点技术,以减少训练所需的峰值内存,同时最小化设备利用不足的情况。

你应该将所有模块放置在适当的设备上,并将它们包装到一个定义所需执行顺序的nn.Sequential模块中。如果一个模块不包含任何参数/缓冲区,则假定该模块应在CPU上执行,并且在执行之前会将模块的适当输入张量移动到CPU。此行为可以通过WithDevice包装器来覆盖,该包装器可用于明确指定模块应在哪个设备上运行。

Parameters
  • 模块 (nn.Sequential) – 使用流水线并行化的顺序模块。序列中的每个模块都必须将其所有参数放在单个设备上。序列中的每个模块必须是 nn.Module 或 nn.Sequential(以将多个顺序模块组合在单个设备上)

  • chunks (int) – 微批次的数量(默认值:1

  • checkpoint (str) – 启用检查点的时间,可以是 'always''except_last''never'(默认值:'except_last')。 'never' 完全禁用检查点,'except_last' 启用除最后一个微批次外的所有微批次的检查点 而 'always' 为所有微批次启用检查点。

  • deferred_batch_norm (bool) – 是否使用延迟的 BatchNorm 移动统计信息(默认值: False)。如果设置为 True,我们会在多个微批次中跟踪统计信息,以更新每个小批次的运行统计信息。

Raises
Example::

在GPU 0 和 GPU 1 之间进行两层全连接层的流水线操作。

>>> # 需要先初始化RPC框架。
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>> torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1)
>>>
>>> # 构建管道。
>>> fc1 = nn.Linear(16, 8).cuda(0)
>>> fc2 = nn.Linear(8, 4).cuda(1)
>>> model = nn.Sequential(fc1, fc2)
>>> model = Pipe(model, chunks=8)
>>> input = torch.rand(16, 16).cuda(0)
>>> output_rref = model(input)

注意

你可以将一个 Pipe 模型与 torch.nn.parallel.DistributedDataParallel 一起使用,仅当 Pipe 的 checkpoint 参数为 'never' 时。

注意

Pipe 目前仅支持节点内流水线,但未来将扩展以支持节点间流水线。 前向函数返回一个 RRef,以便在未来实现节点间流水线时,输出可能位于远程主机上。对于节点内流水线,您可以使用 local_value() 在本地检索输出。

警告

Pipe 是实验性的,可能会发生变化。

forward(*inputs)[源代码]

通过管道处理单个输入小批次并返回一个指向输出的 RRefPipe 是一个相当透明的模块包装器。它不会修改底层模块的输入和输出签名。但是存在类型限制。输入和输出必须至少包含一个张量。此限制也适用于分区边界。

输入序列被送入管道的第一个阶段,作为 *inputs。因此,此函数的位置参数应与管道第一个阶段的位置参数匹配。同样的条件适用于管道一个阶段的输出,该输出是下一个阶段的输入。

输入张量根据用于初始化Pipechunks参数被分割成多个微批次。批次大小假设为张量的第一个维度,如果批次大小小于chunks,则微批次的数量等于批次大小。

只有张量被分割成多个微批次,非张量输入在每个微批次中按原样复制。对于管道最后一个阶段的非张量输出,它们被聚合为一个列表并返回给用户。例如,如果你有2个微批次返回整数5,用户将收到合并的输出[5, 5]

所有输入张量需要与管道的第一个分区位于同一设备上。

如果一个张量被 NoChunk 包装器包装,该张量不会在微批次之间拆分,并且会像非张量一样被复制。

Parameters

输入 – 输入小批量

Returns

RRef 到小批量输出的引用

Raises

TypeError – 输入不包含至少一个张量

Return type

RRef

跳跃连接

某些模型,如ResNeXt, 并不是完全顺序的,并且在层之间有跳跃连接。 如果简单地将其作为流水线并行的一部分来实现,则意味着 我们需要在最终到达跳跃连接所在的GPU之前,通过多个GPU复制某些层的输出。 为了避免这种复制开销,我们提供了以下API来在模型的不同层中存储和弹出张量。

torch.distributed.pipeline.sync.skip.skippable.skippable(stash=(), pop=())[源代码]

定义一个装饰器来创建带有跳跃连接的 nn.Module

这些装饰的模块被称为“可跳过”。即使模块没有被Pipe包裹,此功能也能完美工作。

每个跳跃张量都由其名称管理。在操作跳跃张量之前,跳跃模块必须通过stash和/或pop参数静态声明跳跃张量的名称。具有预声明名称的跳跃张量可以通过yield stash(name, tensor)存储,或者通过tensor = yield pop(name)弹出。

这里是一个包含三层的示例。一个名为“1to3”的跳跃张量分别在第一层和最后一层被存储和弹出:

@skippable(stash=['1to3'])
class Layer1(nn.Module):
    def forward(self, input):
        yield stash('1to3', input)
        return f1(input)

class Layer2(nn.Module):
    def forward(self, input):
        return f2(input)

@skippable(pop=['1to3'])
class Layer3(nn.Module):
    def forward(self, input):
        skip_1to3 = yield pop('1to3')
        return f3(input) + skip_1to3

model = nn.Sequential(Layer1(), Layer2(), Layer3())

一个可跳过的模块可以存储或弹出多个跳过张量:

@skippable(stash=['alice', 'bob'], pop=['carol'])
class StashStashPop(nn.Module):
    def forward(self, input):
        yield stash('alice', f_alice(input))
        yield stash('bob', f_bob(input))
        carol = yield pop('carol')
        return input + carol

每个跳过张量必须与恰好一对存储弹出相关联。Pipe在包装模块时会自动检查此限制。您也可以在不使用Pipe的情况下,通过verify_skippables()来检查此限制。

Return type

可调用[[类型[模块]], 类型[可跳过]]

class torch.distributed.pipeline.sync.skip.skippable.stash(name, tensor)[源代码]

存储一个跳过张量的命令。

def forward(self, input):
    yield stash('name', input)
    return f(input)
Parameters
  • 名称 (str) – 跳过张量的名称

  • 输入 (torch.TensorNone) – 传递给跳跃连接的张量

class torch.distributed.pipeline.sync.skip.skippable.pop(name)[源代码]

弹出跳过张量的命令。

def forward(self, input):
    skip = yield pop('name')
    return f(input) + skip
Parameters

名称 (str) – 跳过张量的名称

Returns

之前由另一层以相同名称存储的跳过张量

Return type

torch.distributed.pipeline.sync.skip.skippable.verify_skippables(module)[源代码]

验证底层可跳过模块是否满足完整性。

每个跳过张量必须只有一对 stashpop。如果存在一个或多个不匹配的对,将会引发 TypeError,并附带详细信息。

以下是几个失败案例。verify_skippables() 会报告这些案例的失败:

# Layer1 存储 "1to3"。
# Layer3 弹出 "1to3"。

nn.Sequential(Layer1(), Layer2())
#               └──── ?

nn.Sequential(Layer2(), Layer3())
#                   ? ────┘

nn.Sequential(Layer1(), Layer2(), Layer3(), Layer3())
#               └───────────────────┘       ^^^^^^

nn.Sequential(Layer1(), Layer1(), Layer2(), Layer3())
#             ^^^^^^      └───────────────────┘

要为多个跳过张量使用相同名称,它们必须通过不同的命名空间进行隔离。请参阅 isolate()

Raises

TypeError – 一个或多个 stashpop 对不匹配。

教程

以下教程概述了如何使用Pipe API 与 PyTorch 提供的其他组件一起训练您的模型:

致谢

流水线并行性的实现基于fairscale的管道实现torchgpipe。我们感谢这两个团队对将流水线并行性引入PyTorch所做的贡献和指导。