管道并行

模型规范

class deepspeed.pipe.PipelineModule(layers, num_stages=None, topology=None, loss_fn=None, seed_layers=False, seed_fn=None, base_seed=1234, partition_method='parameters', activation_checkpoint_interval=0, activation_checkpoint_func=<function checkpoint>, checkpointable_layers=None, dynamic_shape=False)[来源]

使用管道并行化的模块。

实现管道并行的关键约束是将前向传播表示为一层层的序列,并在它们之间强制执行一个简单的接口。前向传播由模块layers隐式定义。关键假设是每一层的输出可以直接作为下一层的输入,就像torch.nn.Sequence一样。前向传播是隐式的:

def forward(self, inputs):
    x = inputs
    for layer in self.layers:
        x = layer(x)
    return x

注意

管道并行与ZeRO-2和ZeRO-3不兼容。

Parameters
  • layers (Iterable) – 定义管道结构的层序列。可以是torch.nn.Sequential模块。

  • num_stages (int, optional) – 管道并行的程度。如果未指定,必须提供topology

  • 拓扑结构 (deepspeed.runtime.pipe.ProcessTopology, 可选) – 定义训练的并行轴。如果 num_stagesNone,则必须提供。

  • loss_fn (可调用的, 可选的) – 损失计算为 loss = loss_fn(outputs, label)

  • seed_layers (bool, optional) – 为每一层使用不同的种子。默认为 False。

  • seed_fn (类型, 可选) – 自定义的种子生成函数。默认为随机种子生成器。

  • base_seed (int, optional) – 起始种子。默认为1234。

  • partition_method (str, optional) – 用于分层的方法。默认为‘parameters’。

  • activation_checkpoint_interval (int, optional) – 激活检查点的粒度,以层数为单位。0 表示禁用激活检查点。

  • activation_checkpoint_func (callable, optional) – 用于激活检查点的函数。默认为 deepspeed.checkpointing.checkpoint

  • checkpointable_layers (list[str], optional) – 可进行检查点的层类名称列表。对于GPT模型,无论此列表如何,ParallelTransformerLayerPipe始终会被检查点。如果为None,则所有带有参数的层都被视为可检查点。默认为None。

  • dynamic_shape – 允许输入具有动态形状。这可能会影响性能。

forward(forward_input)[来源]

定义每次调用时执行的计算。

应该由所有子类覆盖。

注意

尽管前向传递的配方需要在此函数内定义,但之后应该调用Module实例而不是这个,因为前者负责运行已注册的钩子,而后者则默默地忽略它们。

allreduce_tied_weight_gradients()[来源]

减少绑定阶段之间绑定权重的梯度

topology()[来源]

用于查询进程映射的ProcessTopology对象。

ckpt_prefix(checkpoints_path, tag)[来源]

为这个模块写入的所有检查点文件构建一个前缀。

ckpt_layer_path(ckpt_dir, local_layer_idx)[来源]

为特定的管道模块层自定义前缀。

ckpt_layer_path_list(ckpt_dir, local_layer_idx)[来源]

获取特定管道模块层的所有ckpt文件列表。

get_additional_losses()[来源]

返回模型特定的额外损失以供报告

返回一个字典 {“损失名称”: 损失值},如果没有额外的损失则返回 None。

compile(*args, **kwargs)[来源]

使用torch.compile()编译此模块的前向传播。

此模块的__call__方法被编译,所有参数都原封不动地传递给torch.compile()

有关此函数参数的详细信息,请参见 torch.compile()

class deepspeed.pipe.LayerSpec(typename, *module_args, **module_kwargs)[来源]

用于指定管道并行模块的构建块。

LayerSpec 存储了 PipelineModule 中每个阶段的类型信息和参数。例如:

nn.Sequence(
    torch.nn.Linear(self.in_dim, self.hidden_dim, bias=False),
    torch.nn.Linear(self.hidden_hidden, self.out_dim)
)

变成

layer_specs = [
    LayerSpec(torch.nn.Linear, self.in_dim, self.hidden_dim, bias=False),
    LayerSpec(torch.nn.Linear, self.hidden_hidden, self.out_dim)]
]
build(log=False)[来源]

构建存储规范。

class deepspeed.pipe.TiedLayerSpec(key, typename, *module_args, forward_fn=None, tied_weight_attr=['weight'], **module_kwargs)[来源]
class deepspeed.runtime.pipe.ProcessTopology(axes, dims)[来源]

管理n维笛卡尔坐标到线性索引的映射。此映射用于将进程的等级映射到网格,以支持各种形式的并行性。

张量的每个轴都通过其名称访问。提供的轴顺序定义了拓扑的布局。ProcessTopology使用张量轴的“行优先”布局,因此axes=['x', 'y']会将坐标(x,y)和(x,y+1)映射到相邻的线性索引。如果使用axes=['y', 'x'],则坐标(x,y)和(x+1,y)将是相邻的。

一些方法返回名为ProcessCoord的命名元组。

get_rank(**coord_kwargs)[来源]

通过其坐标返回进程的全局排名。

坐标被指定为关键字参数。例如:

>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> X.get_rank(x=0, y=1)
1
get_axis_names()[来源]

返回拓扑结构中轴名称的列表。

get_rank_repr(rank, omit_axes=['data', 'pipe'], inner_sep='_', outer_sep='-')[来源]

返回一个表示等级的字符串。

此方法主要用于检查点模型数据。

For example:
>>> topo = Topo(axes=['a', 'b'], dims=[2, 2])
>>> topo.get_rank_repr(rank=3)
'a_01-b_01'
>>> topo.get_rank_repr(rank=3, omit_axes=['a'])
'b_01'
Parameters
  • rank (int) – 拓扑结构中的一个等级。

  • omit_axes (list, optional) – 不应包含在表示中的轴。默认为 [‘data’, ‘pipe’]。

  • inner_sep (str, 可选) – [描述]. 默认为‘_’.

  • outer_sep (str, 可选) – [描述]. 默认为‘-‘.

Returns

rank拥有的坐标的字符串表示。

Return type

字符串

get_dim(axis)[来源]

返回沿给定轴的进程数。

For example:
>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> X.get_dim('y')
3
get_coord(rank)[来源]

返回由进程等级拥有的坐标。

返回的命名元组的轴可以直接作为成员访问。例如

>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> coord = X.get_coord(rank=1)
>>> coord.x
0
>>> coord.y
1
get_axis_comm_lists(axis)[来源]

沿着轴 axis 构建适合通信器组的列表。

示例

>>> topo = Topo(axes=['pipe', 'data', 'model'], dims=[2, 2, 2])
>>> topo.get_axis_comm_lists('pipe')
[
    [0, 4], # data=0, model=0
    [1, 5], # data=0, model=1
    [2, 6], # data=1, model=0
    [3, 7], # data=1, model=1
]
Returns

一个列表的列表,其坐标在所有轴上匹配,除了 axis

filter_match(**filter_kwargs)[来源]

返回坐标符合提供条件的排名列表。

示例

>>> X = ProcessTopology(axes=['pipe', 'data', 'model'], dims=[2, 2, 2])
>>> X.filter_match(pipe=0, data=1)
[2, 3]
>>> [X.get_coord(rank) for rank in X.filter_match(pipe=0, data=1)]
[ProcessCoord(pipe=0, data=1, model=0), ProcessCoord(pipe=0, data=1, model=1)]
Parameters

**filter_kwargs (dict) – 用于选择坐标的标准。

Returns

坐标与 filter_kwargs 匹配的等级列表。

get_axis_list(axis, idx)[来源]

返回在某个轴上坐标为idx的全局排名列表。

For example:
>>> X = ProcessTopology(axes=['x', 'y'], dims=[2,3])
>>> X.get_axis_list(axis='x', idx=0)
[0, 1, 2]
>>> X.get_axis_list(axis='y', idx=0)
[0, 3]

培训

扩展管道并行性

class deepspeed.runtime.pipe.schedule.PipeSchedule(micro_batches, stages, stage_id)[来源]

通过生成PipeInstruction的序列来指导管道引擎的执行。

调度器是生成器,它们生成一系列PipeInstruction来处理一个批次中的微批次。每个生成的步骤在原子性方面是独立的,这意味着可以在连续的步骤之间放置屏障同步而不会导致死锁。

以下是一个实现数据并行与梯度累积的示例计划:

class DataParallelSchedule(PipeSchedule):
    def steps(self):
        for step_id in range(self.micro_batches):
            cmds = [
                LoadMicroBatch(buffer_id=0),
                ForwardPass(buffer_id=0),
                BackwardPass(buffer_id=0),
            ]
            if step_id == self.micro_batches - 1:
                cmds.extend([
                    ReduceGrads(),
                    OptimizerStep(),
                ])
            yield cmds

    def num_pipe_buffers(self):
        return 1
Parameters
  • micro_batches (int) – 组成一个批次的微批次数量。

  • stages (int) – 管道阶段的数量。

  • stage_id (int) – 将执行生成计划的管道阶段。

abstract steps()[来源]

为计划中的每个步骤生成一个PipeInstruction的列表。

注意

计划必须实现steps()来定义计划。

Returns

作为管道的一个步骤执行的指令

num_pipe_buffers()[来源]

此阶段将使用的管道缓冲区数量。

注意

调度应专门化num_pipe_buffers()以在大规模时节省内存。

Returns

引擎分配的缓冲区数量。

property stage

用于配置此计划的阶段索引。

property num_stages

用于配置此调度的总管道阶段数。

property num_micro_batches

用于配置此计划的微批次总数。

property is_first_stage

如果配置的stage_id是管道中的第一个阶段,则为True。

property is_last_stage

如果配置的stage_id是管道中的最后一个阶段,则为True。

class deepspeed.runtime.pipe.schedule.InferenceSchedule(micro_batches, stages, stage_id)[来源]

使用管道并行进行推理批次的计划。

num_pipe_buffers()[来源]

推理只需要两个管道缓冲区。

Returns

2

class deepspeed.runtime.pipe.schedule.TrainSchedule(micro_batches, stages, stage_id)[来源]

使用混合并行训练批次的计划。

管道并行性通过梯度累积提取,因此收敛性与具有相同批量大小的数据并行方法相同。

num_pipe_buffers()[来源]

返回此阶段所需的管道缓冲区数量。

这相当于最大数量的在途前向传递,因为我们需要记住前向传递的激活以便运行反向传播。对于同步1F1B,这相当于此阶段与最后阶段之间的索引差异。

class deepspeed.runtime.pipe.schedule.DataParallelSchedule(micro_batches, stages, stage_id)[来源]

一个使用传统数据并行和梯度累积进行训练的示例计划。

num_pipe_buffers()[来源]

只需要一个管道缓冲区。

class deepspeed.runtime.pipe.schedule.PipeInstruction(**kwargs)[来源]

所有由管道引擎执行的指令的基类。

所有关键字参数都存储为类似于namedtuple的成员。这些参数在执行期间对PipeEngine是可访问的。

Parameters

kwargs (可选) – 作为成员存储的关键字参数

class deepspeed.runtime.pipe.schedule.OptimizerStep(**kwargs)[来源]

使用优化器执行一步并清零梯度。

注意

应在ReduceGradsReduceTiedGrads之后发布。

注意

可以作为数据并行等级之间的同步点。

class deepspeed.runtime.pipe.schedule.ReduceGrads(**kwargs)[来源]

在阶段内减少数据并行进程之间计算出的梯度。

class deepspeed.runtime.pipe.schedule.ReduceTiedGrads(**kwargs)[来源]

减少管道并行组内绑定模块的计算梯度。

警告

在此同步点中包含的阶段在模型被分配到流水线阶段之前是未知的。在最坏的情况下,它包括所有流水线阶段。应仔细安排此指令以避免死锁。

class deepspeed.runtime.pipe.schedule.BufferOpInstruction(buffer_id, **kwargs)[来源]

一个对管道缓冲区进行操作的管道指令。

Parameters

buffer_id (int) – 要修改的管道缓冲区的索引。

class deepspeed.runtime.pipe.schedule.LoadMicroBatch(buffer_id, **kwargs)[来源]

将微批次加载到缓冲区中。

大致上:

buffers['inputs'][buffer_id] = next(data_iter)
class deepspeed.runtime.pipe.schedule.ForwardPass(buffer_id, **kwargs)[来源]

计算前向传播。

大致上:

buffers['outputs'][buffer_id] = forward(buffers['inputs'][buffer_id])
class deepspeed.runtime.pipe.schedule.BackwardPass(buffer_id, **kwargs)[来源]

计算反向传播并累积梯度。

大致上:

outputs = buffers['outputs'][buffer_id]
gradients = buffers['gradients'][buffer_id]
torch.autograd.backward(tensors=outputs,
                        grad_tensors=gradients)
class deepspeed.runtime.pipe.schedule.SendActivation(buffer_id, **kwargs)[来源]

将激活发送到管道中的下一个阶段。

大致上:

send(buffers['outputs'][buffer_id])

注意

通信是阻塞的,必须与下一个管道阶段的RecvActivation配对,以避免死锁。

class deepspeed.runtime.pipe.schedule.RecvActivation(buffer_id, **kwargs)[来源]

从管道的前一阶段接收激活。

大致上:

buffers['inputs'][buffer_id] = recv()

注意

通信是阻塞的,必须与前一管道阶段的SendActivation配对,以避免死锁。

class deepspeed.runtime.pipe.schedule.SendGrad(buffer_id, **kwargs)[来源]

将计算出的梯度发送到前一个管道阶段。 关于接收到的激活

注意

只有接收到requires_grad==True的张量才会产生梯度。 在接收阶段,缺失的梯度将被替换为None

注意

通信是阻塞的,必须与前一管道阶段的RecvGrad配对,以避免死锁。

class deepspeed.runtime.pipe.schedule.RecvGrad(buffer_id, **kwargs)[来源]

接收下一个管道阶段计算出的梯度。

注意

只有requires_grad==True的激活才会产生梯度。 缺失的梯度将被None替换。

注意

通信是阻塞的,必须与下一个管道阶段的SendGrad配对,以避免死锁。