speechbrain.dataio.sampler 模块

PyTorch 兼容的采样器。

这些决定了数据集的迭代顺序。

Authors:
  • 阿库·柔赫 2020

  • 萨穆埃莱·科内尔 2020

  • 拉尔夫·莱博尔德 2020

  • 阿尔乔姆·普洛日尼科夫 2021

  • 安德烈亚斯·诺奇 2021, 2023

  • 阿德尔·穆门 2023

摘要

类:

BalancingDataSampler

一种数据采样器,从数据集中获取单个键,并确保按该键进行大致相等的分布

ConcatDatasetBatchSampler

此采样器设计用于与标准的Pytorch ConcatDataset一起使用。

DistributedSamplerWrapper

此包装器允许正确使用任何采样器(例如批量)与分布式数据并行(DDP)。

DynamicBatchSampler

这个BatchSampler通过将样本按长度分组来将它们批量处理。

ReproducibleRandomSampler

RandomSampler的修改版本,总是返回相同的值。

ReproducibleWeightedRandomSampler

WeightedRandomSampler的可重复修改版本。

参考

class speechbrain.dataio.sampler.ReproducibleRandomSampler(data_source, seed=563375142, epoch=0, **kwargs)[source]

基础:RandomSampler

对RandomSampler的修改,始终返回相同的值。

也可以看看 torch.utils.data.RandomSampler。这具有大致相同的行为和参数,除了增加了 'seed' 和 'epoch' 并且不支持 'generator'。

注意

在每个epoch之前调用set_epoch。否则,采样器将在每个epoch生成相同的索引序列。

Parameters:
  • data_source (Dataset) – 用于采样索引的数据源。

  • seed (int) – 用于随机数生成器的基本种子。建议使用一个0和1位混合良好的值。

  • epoch (int) – 开始的时期。

  • **kwargs (dict) – 传递给父类的参数。

Example

>>> import torch
>>> from speechbrain.utils.checkpoints import Checkpointer
>>> from speechbrain.dataio.dataloader import SaveableDataLoader
>>> # An example "dataset"
>>> dataset = torch.arange(10).unsqueeze(1)
>>> # Create the random sampler:
>>> sampler = ReproducibleRandomSampler(dataset)
>>> dataloader = SaveableDataLoader(dataset, sampler = sampler,
...     num_workers = 3)
>>> # Setup the checkpointer.
>>> # Note that the sampler doesn't need to be saved itself.
>>> tmpdir = getfixture('tmpdir')
>>> checkpointer = Checkpointer(tmpdir, {"dataloader": dataloader})
>>> # Iterate:
>>> subset = []
>>> for i, data_point in enumerate(dataloader):
...     # Say you save a checkpoint on the fourth batch:
...     if i == 3:
...         _ = checkpointer.save_checkpoint(end_of_epoch = False)
...     # So let's save the numbers you would get if you continue
...     if i >= 4:
...         subset.append(data_point.item())
>>> # What if instead you had to restart the experiment?
>>> new_sampler = ReproducibleRandomSampler(dataset)
>>> new_dataloader = SaveableDataLoader(dataset, sampler = new_sampler,
...        num_workers = 3)
>>> new_checkpointer = Checkpointer(tmpdir, {"dataloader": new_dataloader})
>>> _ = new_checkpointer.recover_if_possible()
>>> # You'll get the same random order again:
>>> new_subset = [data_point.item() for data_point in new_dataloader]
>>> assert subset == new_subset
set_epoch(epoch)[source]

你也可以直接访问 self.epoch,但我们维护这个接口以镜像 torch.utils.data.distributed.DistributedSampler

class speechbrain.dataio.sampler.ReproducibleWeightedRandomSampler(weights, num_samples, replacement, seed=129491412, epoch=0, **kwargs)[source]

基础:WeightedRandomSampler

对WeightedRandomSampler的可重复修改。

也可以看看 torch.utils.data.WeightedRandomSampler。它具有相同的行为和参数,除了增加了‘seed’和‘epoch’并且不支持‘generator’。

注意

在每个epoch之前调用set_epoch。否则,采样器将在每个epoch生成相同的索引序列。

Parameters:
  • weights (序列float) – 每个索引的权重。不需要总和为一。

  • num_samples (int) – 要抽取的样本数量

  • replacement (bool) – 是否进行替换抽样(在num_samples的一个周期内)。

  • seed (int) – 用于随机数生成器的基本种子。建议使用一个具有良好0和1位混合的值。

  • epoch (int) – 开始的时期。

  • **kwargs (dict) – 传递给父类的参数。

Example

>>> a = ReproducibleWeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True)
>>> b = ReproducibleWeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True)
>>> list(a)
[3, 1, 4, 4, 4]
>>> list(b)
[3, 1, 4, 4, 4]
>>> a.set_epoch(1)
>>> list(a)
[4, 5, 4, 4, 3]
>>> b.set_epoch(1)
>>> list(b)
[4, 5, 4, 4, 3]
set_epoch(epoch)[source]

你也可以直接访问 self.epoch,但我们维护这个接口以镜像 torch.utils.data.distributed.DistributedSampler

class speechbrain.dataio.sampler.ConcatDatasetBatchSampler(samplers, batch_sizes: (<class 'tuple'>, <class 'list'>), epoch=0)[source]

基础:Sampler

这个采样器是为了与标准的Pytorch ConcatDataset一起工作而构建的。

它用于从不同的连接数据集中检索元素,将它们放在同一批次中,比例由batch_sizes指定,例如8, 16表示每个批次将有24个元素,其中前8个属于ConcatDataset对象中的第一个数据集,最后16个属于第二个数据集。支持两个以上的数据集,在这种情况下,您需要提供3个批次大小。

注意

批次从数据集中抽取,直到长度最小的数据集被耗尽。 因此,您的训练周期中的示例数量由长度最小的数据集决定。

Parameters:
  • 采样器 (列表元组) – 一个包含pytorch采样器的列表或元组

  • batch_sizes (list) – 批量大小。

  • epoch (int) – 开始的时期。

Example

>>> import torch
>>> from speechbrain.dataio.sampler import ConcatDatasetBatchSampler, ReproducibleRandomSampler
>>> from speechbrain.dataio.sampler import ReproducibleRandomSampler
>>> from speechbrain.dataio.dataloader import SaveableDataLoader
>>> # example "datasets"
>>> dataset1 = torch.arange(0, 10).unsqueeze(1)
>>> dataset2 = torch.arange(20, 40).unsqueeze(1)
>>> tot_dataset = torch.utils.data.ConcatDataset([dataset1, dataset2])
>>> sampler1 = ReproducibleRandomSampler(dataset1)
>>> sampler2 = ReproducibleRandomSampler(dataset2)
>>> tot_sampler = ConcatDatasetBatchSampler([sampler1, sampler2], [2, 4])
>>> dataloader = SaveableDataLoader(tot_dataset, batch_sampler = tot_sampler,
...     num_workers = 3)
>>> for data_point in dataloader:
...      assert len(data_point) == 6
...      for i in range(2):
...         assert data_point[i] in [x for x in range(0, 10)]
...      for i in range(2, 4):
...         assert data_point[i] in [x for x in range(10, 40)]
set_epoch(epoch)[source]

你也可以直接访问 self.epoch,但我们维护这个接口以镜像 torch.utils.data.distributed.DistributedSampler

class speechbrain.dataio.sampler.DynamicBatchSampler(dataset, max_batch_length: int, num_buckets: int = None, length_func=<function DynamicBatchSampler.<lambda>>, shuffle: bool = True, batch_ordering: str = 'random', max_batch_ex: int = None, bucket_boundaries: ~typing.List[int] = [], lengths_list: ~typing.List[int] = None, seed: int = 42, epoch: int = 0, drop_last: bool = False, verbose: bool = False)[source]

基础:Sampler

这个BatchSampler通过将示例按长度分组来将它们批量处理在一起。

批次中的每个示例都具有大致相同的长度,因此填充被最小化。 这使得在示例长度可能显著变化的数据集(例如Librispeech)上训练更快。 灵感来源:https://www.tensorflow.org/api_docs/python/tf/data/experimental/bucket_by_sequence_length

动态批处理是通过指定一个max_batch_length来执行的,这是批次中示例长度总和的上限: 例如,如果ex1的长度为4,ex2的长度为5,并且max_batch_length设置为6, ex1和ex2将分别放置在两个不同的批次中。

每个示例的长度可以通过两种方式获得。 如果输入数据集是DynamicItemDataset,可以通过指定length_func来获得。默认情况下假设注释中存在“duration”条目。 每个示例的长度也可以在实例化时通过指定包含每个示例长度的列表并将其传递给lengths_list来传递给此类。

示例通过定义一组可能的离散区间(桶)进行分组。长度落在这些区间内的示例可以一起批处理。

可以通过使用参数 num_buckets 来指定桶的数量。 通常这个参数的值有一个最佳范围。

如果 num_buckets == 1,所有示例可以一起批处理。您将获得最大的随机化,但由于长示例和短示例可以一起批处理,训练速度会变慢,因为会有大量的填充值。 随着桶数量的增加,只有长度相似的示例可以分组在一起。 这是在速度和随机化之间进行权衡。 TLDR:低数量 -> 更好的随机化,高数量 -> 更快的训练。 请注意:如果设置过高,训练速度会降低。如果 num_buckets -> 数据集中的示例数量,批处理大小将很小,影响训练速度和可能的性能。

也可以通过将列表传递给bucket_boundaries参数来指定桶,而不是指定left_bucket_length和bucket_length_multiplier。

Example

>>> import torch
>>> import speechbrain as sb
>>> from speechbrain.dataio.sampler import DynamicBatchSampler
>>> from speechbrain.dataio.dataset import DynamicItemDataset
>>> from speechbrain.dataio.dataloader import SaveableDataLoader
>>> from speechbrain.dataio.batch import PaddedBatch
>>> import numpy as np
>>> item_lengths = sorted([np.random.randint(10, 100) for x in range(20)])
>>> dataset = {"ex_{}".format(x) : {"wav" :torch.randn(x)} for x in item_lengths}
>>> dataset = DynamicItemDataset(dataset)
>>> dataset.set_output_keys(["wav"])
>>> length_func = lambda x : len(x) # trivial in this example
>>> bsampler = DynamicBatchSampler(dataset, 20, 4, length_func, shuffle=False, batch_ordering='descending')
>>> dataloader = SaveableDataLoader(dataset, batch_sampler=bsampler, collate_fn=PaddedBatch)
>>> for i, b in enumerate(dataloader):
...     data, length = b["wav"]
>>> assert data.shape[-1] == max(item_lengths)
Parameters:
  • dataset (torch.utils.data.Dataset) – 从中采样元素的Pytorch数据集。

  • max_batch_length (int) – 批次中示例长度总和的上限。 应根据您的GPU内存进行选择。

  • num_buckets (int) – 用于将示例分组在一起的离散桶的数量。 如果 num_buckets == 1,所有示例可以一起批处理。随着桶数量的增加,只有长度相似的示例才能被分组在一起。这是在速度和随机化之间进行权衡。 低数量 -> 更好的随机化,高数量 -> 更快的训练。 然而,如果设置得太高,训练速度会降低。如果 num_buckets -> 数据集中的示例数量,批处理大小将很小,影响训练速度和可能的性能。 注意:您必须手动指定 bucket_boundaries 或桶的数量。

  • length_func (callable) – 用于从数据集中获取每个示例长度的函数。 此参数仅在数据集是Speechbrain DynamicItemDataset对象时使用。 可以是任何内容:例如,lambda x: x[“duration”]*16000 返回样本数 如果注释中的duration键以秒为单位,并且文件的采样频率为16kHz。

  • shuffle (bool) – 是否在每个epoch之间打乱示例。

  • batch_ordering (string) – 如果为 random,则批次随机排列;否则按长度 ascendingdescending 排序。

  • max_batch_ex (int) – 如果设置,它将限制一个批次中可以包含的最大示例数量,覆盖max_batch_length 在示例数量将超过此处指定的值的情况下。 例如,如果你有很多短示例,这些示例的批次大小会太大,你可以使用这个参数 来限制这些短示例的批次大小。

  • bucket_boundaries (list) – 通过手动指定桶的右边界来覆盖 bucket_length_multiplier 和 left_bucket_length。

  • lengths_list (list) – 通过传递一个包含数据集中每个样本长度的列表来覆盖 length_func。当数据集是一个普通的 Pytorch Dataset 对象而不是 DynamicItemDataset 对象时,必须设置此参数,因为 length_func 不能在 Pytorch Datasets 上使用。

  • seed (int) – 随机种子。

  • epoch (int) – 开始的时期。

  • drop_last (bool) – 如果 True,采样器将丢弃未分组的最后几个示例。

  • verbose (bool) – 如果 True,在第一个 epoch 时也会记录每个批次的统计信息。

get_durations(batch)[source]

获取批次中元素的持续时间。

set_epoch(epoch)[source]

你也可以直接访问 self.epoch,但我们维护这个接口以镜像 torch.utils.data.distributed.DistributedSampler

class speechbrain.dataio.sampler.DistributedSamplerWrapper(sampler, *args, **kwargs)[source]

基础:DistributedSampler

这个包装器允许正确地将任何采样器(例如批量)与分布式数据并行(DDP)一起使用。

盲目地将采样器传递给每个DDP进程将导致每个进程访问数据集中的所有数据,而不是每个进程独有的子集。这个包装器防止了这种情况,并允许每个进程仅使用原始数据的一个子集。

注意

当使用DDP训练时,这会自动应用于Brain类中的任何采样器。

set_epoch(epoch)[source]

将 set_epoch() 传递给 DistributedSampler 和包装器

class speechbrain.dataio.sampler.BalancingDataSampler(dataset, key, num_samples=None, replacement=True, seed=563375142, epoch=0, **kwargs)[source]

基础类: ReproducibleWeightedRandomSampler

一个数据采样器,从数据集中获取单个键,并确保按该键进行大致均匀的分布

Parameters:
  • dataset (DynamicItemDataset) – 从中抽取样本的数据集

  • key (str) – 从中抽取样本的键

  • num_samples (int) – 要抽取的样本数量

  • replacement (bool) – 是否进行替换抽样(在num_samples的一个周期内)。

  • seed (int) – 用于随机数生成器的基本种子。建议使用一个具有良好0和1位混合的值。

  • epoch (int) – 开始的时期。

  • **kwargs (dict) – 传递给父类的参数。

Example

>>> from speechbrain.dataio.sampler import BalancingDataSampler
>>> from speechbrain.dataio.dataset import DynamicItemDataset
>>> sample_data = {
...   1: {"category": "A",
...       "text": "This is a test"},
...   2: {"category": "A",
...       "text": "This is a second test"},
...   3: {"category": "B",
...       "text": "This is a third test"}
...  }
>>> dataset = DynamicItemDataset(data=sample_data)
>>> sampler = BalancingDataSampler(
...     dataset=dataset,
...     key="category",
...     num_samples=10
... )
>>> sampler.weights
tensor([0.5000, 0.5000, 1.0000], dtype=torch.float64)
>>> it = iter(sampler)
>>> [next(it) for _ in range(10)]
[2, 2, 1, 2, 2, 0, 1, 1, 1, 2]