Open In Colab 要在GitHub上执行或查看/下载此笔记本

大数据集和共享文件系统的数据加载

你是否有一个存储在共享文件系统中的大型数据集,并且你想用它来训练神经网络?这个数据集是否大到甚至无法放入计算节点的本地SSD中?如果是这样,本教程将带你完成从共享文件系统读取大型文件所需的所有步骤。

在许多计算集群中,主要的数据存储是网络文件系统(NFS),例如Lustre NFS可以同时为许多用户提供服务,并从单个文件中提供高数据吞吐量。然而,打开或列出许多不同的文件速度很慢——这样做可能会减慢整个系统的速度,而不仅仅是违规用户。语音数据集通常由许多小录音组成。反复读取每个文件正是那种会减慢NFS速度的数据IO。

一种解决方案是将数据集复制到计算节点的本地SSD中。这可以通过将数据集压缩成单个文件(例如dataset.tar.gz),将其复制到本地节点,最后解压缩(解压)文件来相对高效地完成。从本地SSD读取文件非常高效,并且不会影响共享文件系统的性能。 标准的SpeechBrain数据IO在这种情况下效果很好,请参阅本教程。 然而,可能会有一些巨大的数据集超过本地SSD的大小。

一个可能的解决方案是将数据保存在共享文件系统中,并将小记录捆绑成更大的存档,这些存档通常被称为分片。从分片加载数据可以避免打开太多文件,因此速度较快。

从分片读取数据时,无法再对数据集进行随机访问。数据是从一个中顺序读取的。这需要在准备实验时多加注意。

上述分片IO的情况在学术计算集群设置中是典型的。流数据IO也可以在更大规模上使用,配备专用的数据服务器。

在本教程中,我们将使用WebDataset库。WebDataset的替代方案及其使用案例由WebDataset开发者在这个PyTorch提案中进行了阐述。

什么是WebDataset?

WebDataset 是一个分片(流式)数据输入输出库,与 PyTorch 配合良好。WebDataset 使用标准的 TAR 归档作为分片格式,遵循一个简单的约定:所有具有相同基本名称的连续文件属于同一个示例。因此,列出 data-archive/shard-0000.tar 的内容可能如下所示:

> tar -t data-archives/shard-0000.tar
spk1-utt1.wav
spk1-utt1.txt
spk1-utt1.json
spk1-utt2.wav
spk1-utt2.txt
spk1-utt2.json
spk2-utt1wav
spk2-utt1.txt
spk2-utt1.json
...

在Python方面,数据集接口是一个IterableDataset,它有一组可以链接起来构建数据管道的方法,例如:

import webdataset as wds  # Note the typical import shorthand
dataset = (
      wds.WebDataset("data-archives/shard-00{00...24}.tar")  # 25 shards
      .decode()  # Automagically decode files
      .shuffle(size=1000)  # Shuffle on-the-fly in a buffer
      .batch(batchsize=10)  # Create batches
)

请注意,WebDataset(至少在撰写本文时)是一个快速发展的库。它也被考虑纳入PyTorch核心。再次,阅读更多这里这里

安装依赖项

%%capture
# Installing SpeechBrain via pip
BRANCH = 'develop'
!python -m pip install git+https://github.com/speechbrain/speechbrain.git@$BRANCH
%%capture
!pip install "webdataset<0.2"
import speechbrain as sb
import webdataset as wds
import torch
import glob
import pathlib
import random

创建TAR分片

WebDataset中的数据准备过程是遍历数据集中的每个示例并将它们分割成TAR分片。TAR文件是一种标准格式,因此您可以使用任何标准工具创建它们。WebDataset提供了一些辅助工具,可以使这个过程变得稍微容易一些。

  • Tarp,一个基于Go的工具,可以将TAR流分割成碎片,并执行一些其他流处理任务。参见GitHub页面。这是一个独立的工具,需要单独安装,但从理论上讲,Go可能比Python更快。

  • wds.ShardWriter,一个可以将WebDataset风格的dict写入TAR存档的Python类,将其分割成给定大小的多个分片。这是我们在这里将采用的方法。

下载一些数据

在本教程中,我们将使用Mini Librispeech的开发集(但我们会像处理任何普通训练数据一样处理它)。

%%capture
!wget https://www.openslr.org/resources/31/dev-clean-2.tar.gz
!tar -xvzf dev-clean-2.tar.gz
!rm dev-clean-2.tar.gz

遍历数据

这一步当然会因数据集而异。在Mini Librispeech中,数据是按说话者和文档组织的。我们将首先读取所有转录内容,然后打乱它们,以便连续的示例不来自同一个说话者和文档。

DATAROOT = pathlib.Path("LibriSpeech/dev-clean-2")
SHARDSDIR = pathlib.Path("DATA-SHARDS")
SHARDSDIR.mkdir(exist_ok=True, parents=True)

# 1. Gather texts
# Note that here uttid encodes speaker and document IDs, so we don't need to
# keep track of them separately
texts = {}
for textf in DATAROOT.glob("*/*/*.trans.txt"):
    with open(textf) as fi:
        for line in fi:
            uttid, text = line.split(" ", maxsplit=1)
            texts[uttid] = text
            print(uttid, text)

# 2. Shuffle uttids
uttids = list(texts.keys())
random.shuffle(uttids)
print(uttids)
# 3. Create TARs
# In this example, we are only storing 100 examples / shard, because the full
# development set could probably fit in a normal shard. In practical setups
# use bigger values.
# maxcount sets the max number of examples, and maxsize
# sets the maximum size in bytes.

# 3A. Iterate over the shuffled uttids
# 3B. For each uttid, create an example dict
#   The example dict is written into a TAR stream. The special __key__
#   entry becomes the basename for this example's files, and the other
#   entries in the dict become files with different extensions.
#   E.G. with uttid "3536-23268-0007" this will write the files:
#     3536-23268-0007.audio.pth, 3536-23268-0007.text
#   There are default handlers for many extensions
#     See https://github.com/webdataset/webdataset/blob/6ee2279795b3f667bb7a5868af596990cc6efee3/webdataset/writer.py#L97

with wds.ShardWriter(f"{SHARDSDIR}/shard-%06d.tar", maxcount = 100) as writer:
    for uttid in uttids:
        spk, doc, _ = uttid.split("-")
        audio_fpath = (DATAROOT / spk / doc / uttid).with_suffix(".flac")
        audio_tensor = sb.dataio.dataio.read_audio(str(audio_fpath))
        example = {
            "__key__": uttid,
            "audio.pth": audio_tensor,
            "text": texts[uttid]
        }
        writer.write(example)
! cd DATA-SHARDS/
# Now we can load these shards.
# This uses the SpeechBrain batch class, but batching itself is done by
# WebDataset
dataset = (
      wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar")
      .decode()
      .shuffle(100)
      .batched(batchsize=10,
               collation_fn=sb.dataio.batch.PaddedBatch)
)
batch = next(iter(dataset))
print(batch.text)
print(batch["audio.pth"])  # Because of the audio.pth name, attribute access doesn't work
print("How much of batch is padding [%]:",
      sb.dataio.iterators.padding_ratio(batch["audio.pth"].lengths).item()*100)

使用SpeechBrain的WebDataset

SpeechBrain 兼容任何 PyTorch 数据加载,因此 WebDataset 可以在没有任何扩展的情况下使用(正如我们迄今为止所做的那样)。然而,仍然存在三个问题:

  1. 分片中的数据通常没有排序(甚至可能被故意打乱)。连续的语句长度会非常不同,并且需要大量的填充。

  2. SaveableDataLoader中的epoch内检查点不适用于IterableDatasets。

  3. 使用分布式数据并行(Distributed Data Parallel)很难实现精确的epoch。(这个问题不仅限于WebDataset或SpeechBrain。)

这些问题通过以下策略和扩展得到解决:

  1. SpeechBrain 实现了一个即时动态批处理和分桶迭代器。这与 webdataset.WebDataset 一起工作。

  • 分桶将相似长度的语句放在同一批次中,减少了填充的数量。

  • 动态批处理在实现分桶的同时自然实现,旨在产生具有相似元素总数的批次。包含短话语的批次具有较大的批次大小,而包含长话语的批次具有较小的批次大小。

  • 流数据加载需要实时操作。

  1. 不要关心确切的周期数。相反,测量更新的次数并设置一个名义上的周期长度(例如,一个周期=2500次更新)。

  2. 不关心确切的重新启动:当实验重新启动时,数据加载不会从上次停止的示例继续,而是从随机分配的分片重新开始。

训练数据加载管道中的一些更改

  • 首先,在加载管道中使用.rename来获得更合理的命名批处理元素。这也将解决(上述)audio.pth无法通过典型的属性风格访问的问题。

  • 然后添加一个.repeat,以便使用无限的数据流。

  • 最后,主要的变化是使用 sb.dataio.iterators.dynamic_bucketed_batch 作为批处理方法

    • 通用迭代器可以与.then方法一起使用

    • 参见文档了解参数。

    • 由于这也涉及到一个洗牌操作,因此不再使用WebDataset的洗牌功能。

dataset = (
      wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar")
      .decode()
      .rename(id="__key__", signal="audio.pth", text="text")  # Mention all, even text.
      .repeat()
      .then(sb.dataio.iterators.dynamic_bucketed_batch,
            len_key = "signal",  # Which batch element's length to consider
            sampler_kwargs={
                "target_batch_numel":16000*45.,  # Add examples till they total 45 seconds
                "max_batch_numel":   16000*60.   # ... but so that they don't go over 60 seconds
            }
      )
)

batch = next(iter(dataset))
print("Batch size:", len(batch))
print("How much of batch is padding [%]:",
      sb.dataio.iterators.padding_ratio(batch.signal.lengths).item()*100)

更复杂的数据加载管道

  • 你可以使用.map()来实现任意处理。

text_mapping = {"<PADDING>": 0}
index = 1
for example in wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar").decode():
    for word in example["text"].split():
        if word not in text_mapping:
            text_mapping[word] = index
            index += 1

def text_to_index(sample):
    """Adds text_vec entry, a LongTensor for text"""
    sample["text_vec"] = torch.LongTensor(
        [text_mapping[word] for word in sample["text"].split()]
    )
    return sample
dataset = (
      wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar")
      .decode()
      .rename(id="__key__", signal="audio.pth", text="text")
      .map(text_to_index)
      .repeat()
      .then(sb.dataio.iterators.dynamic_bucketed_batch,
            len_key = "signal",  # Which batch element's length to consider
            sampler_kwargs={
                "target_batch_numel":16000*45.,  # Add examples till they total 45 seconds
                "max_batch_numel":   16000*60.   # ... but so that they don't go over 60 seconds
            }
      )
)
batch = next(iter(dataset))
print(batch.text[0])
print(batch.text_vec.data[0])

如何处理DataLoader

  • 由于我们有一个返回批次(而不是单个示例)的数据集,DataLoader 应该设置 batch_size=None

    • Brain 类(以及底层的 sb.dataio.dataloader.make_dataloader)如果您的数据集来自 WebDataset,将自动设置此选项。

  • 为了实现名义上的epochs,SpeechBrain有sb.dataio.dataloader.LoopedLoader

    • 如果你在train_loader_kwargs中指定了looped_nominal_epoch(在调用.fit()时),Brain类(以及底层的sb.dataio.dataloader.make_dataloader)将会使用这个参数。

    • Brain 类也会自动将其添加到检查点器中,以便它保存在检查点中(并且它也适用于epoch内的检查点)。

dataloader = sb.dataio.dataloader.make_dataloader(dataset, looped_nominal_epoch=5)
for epoch in range(1,6):
    print("Epoch", epoch)
    for ind, batch in enumerate(dataloader, start=1):
        print("\tBatch", ind, ": batch size", len(batch))

引用SpeechBrain

如果您在研究中或业务中使用SpeechBrain,请使用以下BibTeX条目引用它:

@misc{speechbrainV1,
  title={Open-Source Conversational AI with {SpeechBrain} 1.0},
  author={Mirco Ravanelli and Titouan Parcollet and Adel Moumen and Sylvain de Langen and Cem Subakan and Peter Plantinga and Yingzhi Wang and Pooneh Mousavi and Luca Della Libera and Artem Ploujnikov and Francesco Paissan and Davide Borra and Salah Zaiem and Zeyu Zhao and Shucong Zhang and Georgios Karakasidis and Sung-Lin Yeh and Pierre Champion and Aku Rouhe and Rudolf Braun and Florian Mai and Juan Zuluaga-Gomez and Seyed Mahed Mousavi and Andreas Nautsch and Xuechen Liu and Sangeet Sagar and Jarod Duret and Salima Mdhaffar and Gaelle Laperriere and Mickael Rouvier and Renato De Mori and Yannick Esteve},
  year={2024},
  eprint={2407.00463},
  archivePrefix={arXiv},
  primaryClass={cs.LG},
  url={https://arxiv.org/abs/2407.00463},
}
@misc{speechbrain,
  title={{SpeechBrain}: A General-Purpose Speech Toolkit},
  author={Mirco Ravanelli and Titouan Parcollet and Peter Plantinga and Aku Rouhe and Samuele Cornell and Loren Lugosch and Cem Subakan and Nauman Dawalatabad and Abdelwahab Heba and Jianyuan Zhong and Ju-Chieh Chou and Sung-Lin Yeh and Szu-Wei Fu and Chien-Feng Liao and Elena Rastorgueva and François Grondin and William Aris and Hwidong Na and Yan Gao and Renato De Mori and Yoshua Bengio},
  year={2021},
  eprint={2106.04624},
  archivePrefix={arXiv},
  primaryClass={eess.AS},
  note={arXiv:2106.04624}
}