DistributedItemSampler
- class dgl.graphbolt.DistributedItemSampler(item_set: ~dgl.graphbolt.itemset.ItemSet | ~dgl.graphbolt.itemset.ItemSetDict, batch_size: int, minibatcher: ~typing.Callable | None = <function minibatcher_default>, drop_last: bool | None = False, shuffle: bool | None = False, drop_uneven_inputs: bool | None = False, buffer_size: int | None = -1)[source]
Bases:
ItemSampler
A sampler that iterates over the input items and creates subsets distributedly.
This sampler creates a distributed subset of items from the given data set, which can be used for training with PyTorch's Distributed Data Parallel (DDP). The items can be node IDs, node pairs with or without labels, node pairs with negative sources/destinations, DGLGraphs, or heterogeneous counterparts. The original item set is split such that each replica (process) receives an exclusive subset.
Note: The items will be split onto each replica first, then shuffled (if needed) and batched. Therefore, each replica will always get the same set of items.
Note: The class DistributedItemSampler is deliberately not decorated with torchdata.datapipes.functional_datapipe. This indicates that it does not support function-like calls. However, any iterable datapipes from torchdata can be further appended.
- Parameters:
item_set (Union[ItemSet, ItemSetDict]) – Data to be sampled.
batch_size (int) – The size of each batch.
minibatcher (Optional[Callable]) – A callable that takes in a list of items and returns a MiniBatch.
drop_last (bool) – Option to drop the last batch if it’s not full.
shuffle (bool) – Option to shuffle before sampling.
num_replicas (int) – The number of model replicas that will be created during Distributed Data Parallel (DDP) training. It should be the same as the real world size, otherwise it could cause errors. By default, it is retrieved from the current distributed group.
drop_uneven_inputs (bool) – Option to make sure the numbers of batches for each replica are the same. If some replicas have more batches than the others, the excess batches of those replicas will be dropped. If the drop_last parameter is also set to True, the last batch will be dropped before the excess batches are dropped. Note: When using Distributed Data Parallel (DDP) training, the program may hang or error if one of the replicas has fewer inputs. It is recommended to use the Join Context Manager provided by PyTorch to solve this problem; please refer to https://pytorch.org/tutorials/advanced/generic_join.html. However, this option can be used if the Join Context Manager cannot be used for any reason (see the sketch after this parameter list).
buffer_size (int) – The size of the buffer used to store items sliced from the ItemSet or ItemSetDict. By default, it is set to -1, which means the buffer size will be set to the total number of items in the item set. If the item set is too large, it is recommended to set a smaller buffer size to avoid an out-of-memory error. As items are shuffled only within each buffer, a smaller buffer size may incur less randomness, and this less randomness can further affect training performance such as convergence speed and accuracy. Therefore, it is recommended to set a larger buffer size whenever possible.
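When drop_uneven_inputs is left as False, uneven batch counts across replicas can instead be handled with PyTorch's Join Context Manager, as noted above. A minimal sketch, assuming it runs inside a worker process where a DDP-wrapped model and a data_loader built from this sampler already exist (model, optimizer, and loss_fn are placeholder names):
>>> from torch.distributed.algorithms.join import Join
>>> # Join shadows the collective communications of replicas that run out
>>> # of input early, so replicas with more batches do not hang.
>>> with Join([model]):
>>>     for minibatch in data_loader:
>>>         optimizer.zero_grad()
>>>         loss = loss_fn(model(minibatch))
>>>         loss.backward()
>>>         optimizer.step()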
Examples
0. Preparation: DistributedItemSampler needs a multi-processing environment to work. You need to spawn child processes and initialize the process group before executing the following examples. Due to randomness, the output is not always the same as listed below.
>>> import torch
>>> from dgl import graphbolt as gb
>>> item_set = gb.ItemSet(torch.arange(15))
>>> num_replicas = 4
>>> batch_size = 2
>>> mp.spawn(...)
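A minimal sketch of spawning the workers and initializing the process group is given below; the worker body, the gloo backend, and the rendezvous address/port are assumptions for illustration only:
>>> import torch.distributed as dist
>>> import torch.multiprocessing as mp
>>> def worker(proc_id, num_replicas):
>>>     # Each spawned process joins the default process group; the backend
>>>     # and rendezvous address here are placeholders.
>>>     dist.init_process_group(
>>>         backend="gloo",
>>>         init_method="tcp://127.0.0.1:29500",
>>>         rank=proc_id,
>>>         world_size=num_replicas,
>>>     )
>>>     # ... build the DistributedItemSampler and DataLoader here ...
>>> mp.spawn(worker, args=(num_replicas,), nprocs=num_replicas)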
shuffle = False, drop_last = False, drop_uneven_inputs = False.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
shuffle = False, drop_last = True, drop_uneven_inputs = False.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13])]
shuffle = False, drop_last = False, drop_uneven_inputs = True.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=False,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1]), tensor([2, 3])]
Replica#1: [tensor([4, 5]), tensor([6, 7])]
Replica#2: [tensor([8, 9]), tensor([10, 11])]
Replica#3: [tensor([12, 13]), tensor([14])]
shuffle = False, drop_last = True, drop_uneven_inputs = True.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=False, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
Replica#0: [tensor([0, 1])]
Replica#1: [tensor([4, 5])]
Replica#2: [tensor([8, 9])]
Replica#3: [tensor([12, 13])]
shuffle = True, drop_last = True, drop_uneven_inputs = False.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=False
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([3, 2]), tensor([0, 1])]
Replica#1: [tensor([6, 5]), tensor([7, 4])]
Replica#2: [tensor([8, 10])]
Replica#3: [tensor([14, 12])]
shuffle = True, drop_last = True, drop_uneven_inputs = True.
>>> item_sampler = gb.DistributedItemSampler(
>>>     item_set, batch_size=2, shuffle=True, drop_last=True,
>>>     drop_uneven_inputs=True
>>> )
>>> data_loader = gb.DataLoader(item_sampler)
>>> print(f"Replica#{proc_id}: {list(data_loader)}")
(One possible output:)
Replica#0: [tensor([1, 3])]
Replica#1: [tensor([7, 5])]
Replica#2: [tensor([11, 9])]
Replica#3: [tensor([13, 14])]