使用Lance文本数据集训练LLM

使用Lance文本数据集进行大型语言模型的预训练/微调既简单又高效。 本示例延续了使用Lance创建用于LLM训练的文本数据集的示例。 如果尚未查看,请先参阅该示例。

在本示例中,我们将使用🤗 transformers在我们之前示例中创建的"wikitext_500K" lance数据集上训练一个LLM。

导入与设置

让我们通过导入所有必要的库并定义一些基础内容来设置环境。

import numpy as np
import lance

import torch
from torch.utils.data import Dataset, DataLoader, Sampler

from transformers import AutoTokenizer, AutoModelForCausalLM
from tqdm.auto import tqdm

# We'll be training the pre-trained GPT2 model in this example
model_name = 'gpt2'
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# Also define some hyperparameters
lr = 3e-4
nb_epochs = 10
block_size = 1024
batch_size = 8
device = 'cuda:0'
dataset_path = 'wikitext_500K.lance'

既然基础设置已经完成,接下来让我们定义自定义数据集(Dataset)和一个采样器(Sampler),用于从我们的Lance数据集中流式传输token。

数据加载设置

我们首先定义一个实用函数,它将帮助我们以"块"的形式从lance数据集中加载任意数量的令牌。

def from_indices(dataset, indices):
    """Load the elements on given indices from the dataset"""
    chunk = dataset.take(indices).to_pylist()
    chunk = list(map(lambda x: x['input_ids'], chunk))
    return chunk

现在让我们定义用于加载令牌的自定义数据集和采样器。

class LanceDataset(Dataset):
    def __init__(
        self,
        dataset_path,
        block_size,
    ):
        # Load the lance dataset from the saved path
        self.ds = lance.dataset(dataset_path)
        self.block_size = block_size

        # Doing this so the sampler never asks for an index at the end of text
        self.length = self.ds.count_rows() - block_size

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        """
        Generate a window of indices starting from the current idx to idx+block_size
        and return the tokens at those indices
        """
        window = np.arange(idx, idx + self.block_size)
        sample = from_indices(self.ds, window)

        return {"input_ids": torch.tensor(sample), "labels": torch.tensor(sample)}

当采样器给出随机索引时,数据集将从当前索引开始加载接下来的block_size()个令牌。这本质上会形成一个样本,因为加载的令牌具有因果性。

不过我们还需要确保从数据集中获取的标记(token)不会重叠。让我们通过一个例子来理解这一点:

假设在训练循环中,对于某个任意的块大小,数据集返回以下标记:

“维也纳是奥地利的首都” 在样本 #1 的索引 = 12 处,以及,

“是奥地利的首都” 在样本 #2 的索引 = 13 处,以此类推

这里的问题是,如果我们允许数据加载器为任意数量的索引获取‘样本’,它们可能会重叠(如上所示)。 这对模型不利,因为它在看到足够多的重叠标记后可能会开始过拟合。

为了解决这个问题,我们定义了一个自定义采样器,它只返回彼此间隔'block_size'的索引,确保我们不会看到任何重叠的样本。

class LanceSampler(Sampler):
    r"""Samples tokens randomly but `block_size` indices apart.

    Args:
        data_source (Dataset): dataset to sample from
        block_size (int): minimum index distance between each random sample
    """

    def __init__(self, data_source, block_size=512):
        self.data_source = data_source
        self.num_samples = len(self.data_source)
        self.available_indices = list(range(0, self.num_samples, block_size))
        np.random.shuffle(self.available_indices)

    def __iter__(self):
        yield from self.available_indices

    def __len__(self) -> int:
        return len(self.available_indices)

现在当我们从数据集中获取tokens时,使用LanceSampler()作为采样器,可以确保模型在训练过程中看到的所有批次中的所有样本都不会重叠。

这是通过生成一个从0开始到数据集末尾的索引列表来实现的(如果您还记得,数据集长度等于lance数据集长度减去块大小),其中每个索引之间相隔'block_size'个位置。 然后我们打乱这个列表并从中生成索引。

以上就是数据加载的全部内容了!现在我们只剩下训练模型这一步了!

模型训练

现在你可以像处理其他数据集一样训练模型了!

# Define the dataset, sampler and dataloader
dataset = LanceDataset(dataset_path, block_size)
sampler = LanceSampler(dataset, block_size)
dataloader = DataLoader(
    dataset,
    shuffle=False,
    batch_size=batch_size,
    sampler=sampler,
    pin_memory=True
)

# Define the optimizer, training loop and train the model!
model = model.to(device)
model.train()
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

for epoch in range(nb_epochs):
    print(f"========= Epoch: {epoch+1} / {nb_epochs} =========")
    epoch_loss = []
    prog_bar = tqdm(dataloader, total=len(dataloader))
    for batch in prog_bar:
        optimizer.zero_grad(set_to_none=True)

        # Put both input_ids and labels to the device
        for k, v in batch.items():
            batch[k] = v.to(device)

        # Perform one forward pass and get the loss
        outputs = model(**batch)
        loss = outputs.loss

        # Perform backward pass
        loss.backward()
        optimizer.step()

        prog_bar.set_description(f"loss: {loss.item():.4f}")

        epoch_loss.append(loss.item())

    # Calculate training perplexity for this epoch
    try:
        perplexity = np.exp(np.mean(epoch_loss))
    except OverflowError:
        perplexity = float("-inf")

    print(f"train_perplexity: {perplexity}")

一个小提示:如果您的lance数据集非常大(比如wikitext_500K),并且您想调试模型以查找错误,您可能希望将数据加载器包装在iter()函数中,并且只运行几个批次。

基本上就是这样!

使用Lance、自定义数据集和采样器的最佳之处在于,得益于Lance提供的闪电般快速随机访问🚀,您将获得高达95%的平均GPU利用率,以及最小的CPU开销。