DistributedItemSampler

class dgl.graphbolt.DistributedItemSampler(item_set: ~dgl.graphbolt.itemset.ItemSet | ~dgl.graphbolt.itemset.ItemSetDict, batch_size: int, minibatcher: ~typing.Callable | None = <function minibatcher_default>, drop_last: bool | None = False, shuffle: bool | None = False, drop_uneven_inputs: bool | None = False, buffer_size: int | None = -1)[source]

Bases: ItemSampler

A sampler that iterates over input items and creates distributed subsets.

This sampler creates a distributed subset of items from the given data set, which can be used for training with PyTorch's Distributed Data Parallel (DDP). The items can be node IDs, node pairs with or without labels, node pairs with negative sources/destinations, DGLGraphs, or heterogeneous counterparts. The original item set is split such that each replica (process) receives an exclusive subset.

Note: The items are first split onto each replica, then shuffled (if enabled) and batched. Therefore each replica always gets the same set of items.
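The split-then-shuffle order can be mimicked with plain PyTorch. The sketch below is purely illustrative (it is not the library's implementation); per_replica_batches is a hypothetical helper, and the even split via tensor_split is an assumption that happens to match the example outputs further down.

>>> import torch
>>> def per_replica_batches(items, rank, num_replicas, batch_size, shuffle):
>>>     part = items.tensor_split(num_replicas)[rank]  # exclusive slice for this replica
>>>     if shuffle:
>>>         part = part[torch.randperm(len(part))]     # shuffling happens after the split
>>>     return list(part.split(batch_size))
>>> per_replica_batches(torch.arange(15), rank=3, num_replicas=4, batch_size=2, shuffle=False)
[tensor([12, 13]), tensor([14])]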

Note: The class DistributedItemSampler is deliberately not decorated with torchdata.datapipes.functional_datapipe, indicating that it does not support function-like calls. However, any iterable datapipe from torchdata can be further appended to it.
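For example, a further torchdata datapipe can be attached to the sampler. The sketch below is a minimal, non-authoritative example: Mapper and the passthrough function are assumptions, and it presumes a process group has already been initialized as in the preparation step under Examples.

>>> import torch
>>> from dgl import graphbolt as gb
>>> from torchdata.datapipes.iter import Mapper
>>> item_sampler = gb.DistributedItemSampler(
>>>     gb.ItemSet(torch.arange(15)), batch_size=2
>>> )
>>> def passthrough(minibatch):
>>>     # Placeholder transform; a real pipeline might move tensors to GPU here.
>>>     return minibatch
>>> pipeline = Mapper(item_sampler, passthrough)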

Parameters:
  • item_set (Union[ItemSet, ItemSetDict]) – Data to be sampled.

  • batch_size (int) – The size of each batch.

  • minibatcher (Optional[Callable]) – A callable that takes in a list of items and returns a MiniBatch.

  • drop_last (bool) – Option to drop the last batch if it’s not full.

  • shuffle (bool) – Option to shuffle before sampling.

  • num_replicas (int) – The number of model replicas that will be created during Distributed Data Parallel (DDP) training. It should be the same as the real world size, otherwise it could cause errors. By default, it is retrieved from the current distributed group.

  • drop_uneven_inputs (bool) – Option to make sure the number of batches is the same for every replica. If some replicas end up with more batches than others, their redundant batches are dropped. If the drop_last parameter is also set to True, the last (partial) batch is dropped before the redundant batches are dropped. Note: When training with Distributed Data Parallel (DDP), the program may hang or raise an error if one of the replicas has fewer inputs than the others. It is recommended to use the Join Context Manager provided by PyTorch to solve this problem; please refer to https://pytorch.org/tutorials/advanced/generic_join.html. However, this option can be used if the Join Context Manager is not usable for any reason. A hedged sketch of the Join Context Manager follows this parameter list.

  • buffer_size (int) – The size of the buffer that stores items sliced from the ItemSet or ItemSetDict. By default it is set to -1, which means the buffer size equals the total number of items in the item set. If the item set is too large, setting a smaller buffer size is recommended to avoid out-of-memory errors. Because only the items within a buffer are shuffled, a smaller buffer size yields less randomness, which may in turn affect training performance such as convergence speed and accuracy. Therefore, a larger buffer size is recommended whenever possible.
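As mentioned for drop_uneven_inputs, the following is a minimal sketch of the Join Context Manager, assuming model is a torch.nn.parallel.DistributedDataParallel module and compute_loss is a hypothetical helper; neither is part of this class's API.

>>> from torch.distributed.algorithms.join import Join
>>> with Join([model]):
>>>     for minibatch in data_loader:
>>>         loss = compute_loss(model, minibatch)  # hypothetical forward pass
>>>         loss.backward()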

Examples

0. Preparation: DistributedItemSampler needs a multi-processing environment to work. You need to spawn the subprocesses and initialize the process group before executing the following examples. Due to randomness, the output is not always the same as listed below.

>>> import torch
>>> import torch.multiprocessing as mp
>>> from dgl import graphbolt as gb
>>> item_set = gb.ItemSet(torch.arange(15))
>>> num_replicas = 4
>>> batch_size = 2
>>> mp.spawn(...)
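A fuller, hedged sketch of the worker function passed to mp.spawn is shown below; the backend, address/port, and the exact loop body are assumptions rather than requirements of DistributedItemSampler.

>>> import torch.distributed as dist
>>> def worker(proc_id, num_replicas):
>>>     # Every spawned process joins the same group so that the sampler can
>>>     # discover its rank and the world size.
>>>     dist.init_process_group(
>>>         backend="gloo", init_method="tcp://127.0.0.1:29500",
>>>         rank=proc_id, world_size=num_replicas,
>>>     )
>>>     item_set = gb.ItemSet(torch.arange(15))
>>>     item_sampler = gb.DistributedItemSampler(
>>>         item_set, batch_size=2, shuffle=False, drop_last=False,
>>>         drop_uneven_inputs=False,
>>>     )
>>>     data_loader = gb.DataLoader(item_sampler)
>>>     print(f"Replica#{proc_id}: {list(data_loader)}")
>>>     dist.destroy_process_group()
>>> mp.spawn(worker, args=(num_replicas,), nprocs=num_replicas)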
  1. shuffle = False, drop_last = False, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
  2. shuffle = False, drop_last = True, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13])]
  3. shuffle = False, drop_last = False, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
  4. shuffle = False, drop_last = True, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1])]
Replica#1: [tensor([4, 5])]
Replica#2: [tensor([8, 9])]
Replica#3: [tensor([12, 13])]
  5. shuffle = True, drop_last = True, drop_uneven_inputs = False.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([3, 2]), tensor([0, 1])]
Replica#1: [tensor([6, 5]), tensor([7, 4])]
Replica#2: [tensor([8, 10])]
Replica#3: [tensor([14, 12])]
  6. shuffle = True, drop_last = True, drop_uneven_inputs = True.

>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([1, 3])]
Replica#1: [tensor([7, 5])]
Replica#2: [tensor([11, 9])]
Replica#3: [tensor([13, 14])]