Machine Translation using PyTorch Ignite
This tutorial is a brief introduction to how you can train a machine translation model (or any other seq2seq model) using PyTorch Ignite. This notebook uses models, datasets and tokenizers from Hugging Face, so they can easily be swapped for other models from the 🤗 Hub.
Required Dependencies
%%capture
!pip install pytorch-ignite
!pip install transformers
!pip install datasets
!pip install sentencepiece
For TPUs
# VERSION = !curl -s https://api.github.com/repos/pytorch/xla/releases/latest | grep -Po '"tag_name": "v\K.*?(?=")'
# VERSION = VERSION[0].rstrip('.0') # remove trailing zero
# !pip install cloud-tpu-client==0.10 https://storage.googleapis.com/tpu-pytorch/wheels/torch_xla-{VERSION}-cp37-cp37m-linux_x86_64.whl
Common Configuration
We maintain a config dictionary which can be extended or changed to store parameters required during training. We can refer back to this code when we use these parameters later.
In this example we use t5-small, which has 60M parameters. The way t5 models work is that they take an input with a task-specific prefix. This prefix (like "translate English to German") tells the model which task it needs to perform. For more details, refer to the original paper.
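As a quick, optional sanity check (a hypothetical snippet, not part of the training code below), the pretrained t5-small checkpoint can already translate when given this prefix:
# Hypothetical check: t5-small translates out of the box when the task prefix is used.
from transformers import AutoTokenizer, T5ForConditionalGeneration

demo_tokenizer = AutoTokenizer.from_pretrained("t5-small")
demo_model = T5ForConditionalGeneration.from_pretrained("t5-small")
demo_ids = demo_tokenizer(
    "translate English to German: The house is wonderful.", return_tensors="pt"
).input_ids
print(demo_tokenizer.decode(demo_model.generate(demo_ids, max_length=40)[0], skip_special_tokens=True))
# Expected to print something like "Das Haus ist wunderbar."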
Here we train for a limited number of iterations per epoch and on a subset of the dataset; this can be adjusted by modifying the train_dataset_length and epoch_length configs.
config = {
"seed": 216,
"with_amp": False,
"num_epochs": 1,
"batch_size": 32,
"output_path_": "/content",
"model_name": "t5-small",
"tokenizer_name": "t5-small",
"freeze_encoder": False,
"num_workers": 4,
"weight_decay": 0.01,
"learning_rate": 1e-4,
"accumulation_steps": 1,
"epoch_length": 500,
"print_output_every": 50,
}
dataset_configs = {
"source_language":"English",
"source_text_id":"en",
"target_language":"German",
"target_text_id":"de",
"max_length": 80,
"train_dataset_length": -1,
"validation_dataset_length": 100,
"train_test_split": 0.3,
}
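The two language names above are only used to build the t5 task prefix, while the text ids select the corresponding fields of each translation pair; a small illustration of how they are consumed by the dataset class defined later:
# Illustration only: how dataset_configs feeds into the task prefix (see TransformerDataset below).
task_prefix = "translate {} to {}: ".format(
    dataset_configs["source_language"], dataset_configs["target_language"]
)
print(task_prefix)  # translate English to German: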
Basic Setup
Imports
import warnings
from datetime import datetime
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import random_split
import ignite
import ignite.distributed as idist
from ignite.contrib.engines import common
from ignite.engine import Engine, Events
from ignite.handlers import Checkpoint, global_step_from_engine
from ignite.metrics import Bleu
from ignite.utils import manual_seed, setup_logger
from datasets import load_dataset
from transformers import T5ForConditionalGeneration, AutoTokenizer
warnings.filterwarnings("ignore")
Preparing the Data
We will use the news_commentary dataset (English - German) from the 🤗 Hub for this example.
from datasets import load_dataset
dataset = load_dataset("news_commentary", "de-en")
dataset = dataset.shuffle(seed=config["seed"])
Reusing dataset news_commentary (/root/.cache/huggingface/datasets/news_commentary/de-en/11.0.0/cfab724ce975dc2da51cdae45302389860badc88b74db8570d561ced6004f8b4)
0%| | 0/1 [00:00<?, ?it/s]
Loading cached shuffled indices for dataset at /root/.cache/huggingface/datasets/news_commentary/de-en/11.0.0/cfab724ce975dc2da51cdae45302389860badc88b74db8570d561ced6004f8b4/cache-199f0b60779b6122.arrow
dataset = dataset["train"]
dataset = dataset.train_test_split(test_size=dataset_configs["train_test_split"])
train_dataset, validation_dataset = dataset["train"], dataset["test"]
print("Lengths")
print("\t Train Set - {}".format(len(train_dataset)))
print("\t Val Set - {}".format(len(validation_dataset)))
Loading cached split indices for dataset at /root/.cache/huggingface/datasets/news_commentary/de-en/11.0.0/cfab724ce975dc2da51cdae45302389860badc88b74db8570d561ced6004f8b4/cache-23d286abe396b3d4.arrow and /root/.cache/huggingface/datasets/news_commentary/de-en/11.0.0/cfab724ce975dc2da51cdae45302389860badc88b74db8570d561ced6004f8b4/cache-387687cf22f2e607.arrow
Lengths
Train Set - 156207
Val Set - 66946
Let's have a look at a sample of the dataset.
print("Example of a Datapoint \n")
print(train_dataset[0], "\n")
Example of a Datapoint
{'id': '123598', 'translation': {'de': 'Nachrichtenberichte und „Analysen“ der staatlich kontrollierten Sender in Russland und Georgien, die ein negatives Image „des Feindes“ zeichnen, dienen lediglich dazu, die Kluft zwischen den ethnischen Gruppen noch zu vertiefen.', 'en': 'News reports and “analysis” by state-controlled channels in both Russia and Georgia that promote negative images of “the enemy” serve only to widen the gap between ethnic groups.'}}
Tokenizer
A tokenizer needs to be defined to convert the input from strings to token ids. Machine translation tokenizers may need additional parameters about the source and target languages; refer to the documentation for more information.
tokenizer = AutoTokenizer.from_pretrained(config["tokenizer_name"])
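For reference, a quick look (a hypothetical check, not required for training) at what the tokenizer returns for a single prefixed sentence:
# Hypothetical check: the tokenizer returns input_ids and an attention_mask,
# padded/truncated to max_length when those arguments are passed.
sample_enc = tokenizer(
    "translate English to German: Hello world!",
    max_length=dataset_configs["max_length"],
    padding="max_length",
    truncation=True,
)
print(list(sample_enc.keys()))       # ['input_ids', 'attention_mask']
print(len(sample_enc["input_ids"]))  # 80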
Dataset Class
Tokenizes the data and returns a dictionary with inputs and targets.
If you want to train on a subset of the data, modify train_dataset_length and validation_dataset_length in the dataset configs. Keep them at -1 to use the full length.
class TransformerDataset(torch.utils.data.Dataset):
def __init__(
self, data, src_text_id, tgt_text_id, tokenizer, max_length, length_dataset
):
self.data = data
self.src_text_id = src_text_id
self.tgt_text_id = tgt_text_id
self.tokenizer = tokenizer
self.max_length = max_length
self.length_dataset = length_dataset if length_dataset != -1 else len(self.data)
def __getitem__(self, idx):
# t5 models require a prefix describing the task
task_prefix = "translate {} to {}: ".format(dataset_configs["source_language"], dataset_configs["target_language"])
src_text = [task_prefix + str(self.data[idx]["translation"][self.src_text_id])]
tgt_text = [str(self.data[idx]["translation"][self.tgt_text_id])]
input_txt_tokenized = self.tokenizer(
src_text, max_length=self.max_length, padding="max_length", truncation=True
)
with self.tokenizer.as_target_tokenizer():
tgt_text_tokenized = self.tokenizer(
tgt_text,
max_length=self.max_length,
padding="max_length",
truncation=True,
)
# The pad token in target is replaced with -100 so that it doesn't get added to loss.
tgt_text_tokenized = [
[(l if l != self.tokenizer.pad_token_id else -100) for l in label]
for label in tgt_text_tokenized.input_ids
]
input_txt_tokenized.update({"tgt": tgt_text_tokenized[0]})
batch = {
k: torch.tensor(v).squeeze(0) for (k, v) in input_txt_tokenized.items()
}
return batch
def __len__(self):
return self.length_dataset
train_data = TransformerDataset(
train_dataset,
dataset_configs["source_text_id"],
dataset_configs["target_text_id"],
tokenizer,
dataset_configs["max_length"],
dataset_configs["train_dataset_length"],
)
val_data = TransformerDataset(
validation_dataset,
dataset_configs["source_text_id"],
dataset_configs["target_text_id"],
tokenizer,
dataset_configs["max_length"],
dataset_configs["validation_dataset_length"],
)
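A quick check (hypothetical, not needed for training) of what a single item from the dataset looks like:
# Hypothetical check: each item is a dict of tensors of shape (max_length,),
# ready to be batched by the dataloader.
item = train_data[0]
print({k: tuple(v.shape) for k, v in item.items()})
# {'input_ids': (80,), 'attention_mask': (80,), 'tgt': (80,)}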
Trainer
The trainer takes a batch of input and passes it through the model (along with the targets in this case) to get the loss.
Mixed Precision
The forward pass is wrapped in the autocast context manager for automatic mixed precision (AMP) training. It is turned off in this example because there would be no memory advantage with a batch_size of 1 or 2. Change the with_amp flag in the config to turn it on.
Gradient Accumulation
Gradient accumulation is implemented because a batch size of 1 would lead to noisy updates. Check the accumulation_steps variable in the config to define the number of steps over which gradients are accumulated; a rough illustration of its effect follows.
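As an illustrative calculation (not part of the tutorial code), the effective batch size the optimizer sees is the dataloader batch size multiplied by the number of accumulation steps:
# Illustrative arithmetic: this many samples contribute to each optimizer.step() call.
effective_batch_size = config["batch_size"] * config["accumulation_steps"]
print(effective_batch_size)  # 32 * 1 = 32 with the defaults above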
Trainer Handlers
Handlers can be defined and attached directly to the trainer engine. Here we also use a special function, setup_common_training_handlers, which ships with many commonly used, useful handlers (like save_every_iters, clear_cuda_cache, etc.) already defined. To know more about this function, refer to the documentation.
# Create Trainer
def create_trainer(model, optimizer, with_amp, train_sampler, logger):
device = idist.device()
scaler = GradScaler(enabled=with_amp)
def train_step(engine, batch):
model.train()
if batch["tgt"].device != device:
batch = {
k: v.to(device, non_blocking=True, dtype=torch.long)
for (k, v) in batch.items()
}
src_ids = batch["input_ids"]
src_attention_mask = batch["attention_mask"]
tgt = batch["tgt"]
with autocast(enabled=with_amp):
y = model(input_ids=src_ids, attention_mask=src_attention_mask, labels=tgt)
loss = y["loss"]
loss /= config["accumulation_steps"]
scaler.scale(loss).backward()
if engine.state.iteration % config["accumulation_steps"] == 0:
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad()
return {"batch loss": loss.item()}
trainer = Engine(train_step)
trainer.logger = logger
metric_names = ["batch loss"]
common.setup_common_training_handlers(
trainer=trainer,
train_sampler=train_sampler,
output_names=metric_names,
clear_cuda_cache=False,
with_pbars=True,
)
return trainer
Evaluator
Similar to the trainer, we create an evaluator for the validation step. Here we compute metrics (like the Bleu score). For this, the Bleu score requires sentences rather than logits; the ids_to_clean_text function is used to do that.
Change the print_output_every flag if you want to change the frequency at which output sentences are printed.
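To make the expected input format concrete, here is a minimal standalone sketch (not part of the evaluator below) of how Ignite's Bleu metric consumes data: candidates are lists of tokens, references are lists of lists of tokens.
# Minimal sketch of the (y_pred, y) format the Bleu metric expects.
from ignite.metrics import Bleu

demo_bleu = Bleu(ngram=4, smooth="smooth1")
demo_bleu.update((
    [["the", "cat", "sat", "on", "the", "mat"]],    # candidates: list of token lists
    [[["the", "cat", "sat", "on", "the", "mat"]]],  # references: list of lists of token lists
))
print(float(demo_bleu.compute()))  # 1.0 for an exact match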
# Let's now setup evaluator engine to perform model's validation and compute metrics
def create_evaluator(model, tokenizer, metrics, logger, tag="val"):
device = idist.device()
def ids_to_clean_text(generated_ids):
gen_text = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)
return list(map(str.strip, gen_text))
@torch.no_grad()
def evaluate_step(engine, batch):
model.eval()
if batch["tgt"].device != device:
batch = {
k: v.to(device, non_blocking=True, dtype=torch.long)
for (k, v) in batch.items()
}
src_ids = batch["input_ids"]
src_attention_mask = batch["attention_mask"]
tgt = batch["tgt"]
if idist.get_world_size() > 1:
y_pred = model.module.generate(input_ids=src_ids, attention_mask=src_attention_mask)
else:
y_pred = model.generate(input_ids=src_ids, attention_mask=src_attention_mask)
tgt = torch.where(tgt != -100, tgt, tokenizer.pad_token_id)
preds = ids_to_clean_text(y_pred)
tgt = ids_to_clean_text(tgt)
preds = [_preds.split() for _preds in preds]
tgt = [[_tgt.split()] for _tgt in tgt]
if engine.state.iteration % config["print_output_every"] == 0:
logger.info(f'\n Preds : {" ".join(preds[0])} \n')
logger.info(f'\n Target : {" ".join(tgt[0][0])} \n')
return preds, tgt
evaluator = Engine(evaluate_step)
for name, metric in metrics.items():
metric.attach(evaluator, name)
return evaluator
Initializing Functions
Here we initialize the model and the optimizer.
get_dataloaders returns the dataloaders for training and validation.
def freeze_params(model):
for par in model.parameters():
par.requires_grad = False
def initialize():
model = T5ForConditionalGeneration.from_pretrained(config["model_name"])
lr = config["learning_rate"] * idist.get_world_size()
no_decay = ["bias", "LayerNorm.weight"]
optimizer_grouped_parameters = [
{
"params": [
p
for n, p in model.named_parameters()
if not any(nd in n for nd in no_decay)
],
"weight_decay": config["weight_decay"],
},
{
"params": [
p
for n, p in model.named_parameters()
if any(nd in n for nd in no_decay)
],
"weight_decay": 0.0,
},
]
if config["freeze_encoder"]:
freeze_params(model.get_encoder())
model = idist.auto_model(model)
optimizer = optim.AdamW(optimizer_grouped_parameters, lr=lr)
optimizer = idist.auto_optim(optimizer)
return model, optimizer
def get_dataloaders(train_dataset, val_dataset):
# Setup data loader also adapted to distributed config: nccl, gloo, xla-tpu
train_loader = idist.auto_dataloader(
train_dataset,
batch_size=config["batch_size"],
num_workers=config["num_workers"],
shuffle=True,
drop_last=True,
)
val_loader = idist.auto_dataloader(
val_dataset,
batch_size=2 * config["batch_size"],
num_workers=config["num_workers"],
shuffle=False,
)
return train_loader, val_loader
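As a side note, setting freeze_encoder reduces the number of trainable parameters; a hypothetical quick check on a fresh model instance (not part of the training flow):
# Hypothetical check: compare total vs. trainable parameters after freezing
# the encoder of a fresh t5-small instance.
check_model = T5ForConditionalGeneration.from_pretrained(config["model_name"])
total_params = sum(p.numel() for p in check_model.parameters())
freeze_params(check_model.get_encoder())
trainable_params = sum(p.numel() for p in check_model.parameters() if p.requires_grad)
print(f"total: {total_params:,}  trainable with frozen encoder: {trainable_params:,}")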
Logging Handlers
This step is optional; however, we can pass a setup_logger() object to log_basic_info() and log all basic information, such as the library versions, the current configuration, the device and backend used by the current process (identified by its local rank), and the number of processes (world size). idist (ignite.distributed) provides several utility functions, such as get_local_rank(), backend(), get_world_size(), etc., to make this possible.
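For illustration, a few of the idist helpers mentioned above can be called directly (the commented values are what a single-process run would typically report):
# Illustrative calls to ignite.distributed helpers (example values for a non-distributed run).
print(idist.backend())         # None when no distributed backend is initialized
print(idist.get_world_size())  # 1
print(idist.get_local_rank())  # 0
print(idist.device())          # cuda:0 if a GPU is available, otherwise cpu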
log_metrics_eval is used to log the metrics and the elapsed time when running the evaluator.
get_save_handler is used to save the model to the output path whenever it is called.
def log_metrics_eval(logger, epoch, elapsed, tag, metrics):
metrics_output = "\n".join([f"\t{k}: {v}" for k, v in metrics.items()])
logger.info(
f"\nEpoch {epoch} - Evaluation time (seconds): {elapsed:.2f} - {tag} metrics:\n {metrics_output}"
)
def log_basic_info(logger, config):
    logger.info("Train a Machine Translation model")
logger.info(f"- PyTorch version: {torch.__version__}")
logger.info(f"- Ignite version: {ignite.__version__}")
if torch.cuda.is_available():
# explicitly import cudnn as torch.backends.cudnn can not be pickled with hvd spawning procs
from torch.backends import cudnn
logger.info(
f"- GPU Device: {torch.cuda.get_device_name(idist.get_local_rank())}"
)
logger.info(f"- CUDA version: {torch.version.cuda}")
logger.info(f"- CUDNN version: {cudnn.version()}")
logger.info("\n")
logger.info("Configuration:")
for key, value in config.items():
logger.info(f"\t{key}: {value}")
logger.info("\n")
if idist.get_world_size() > 1:
logger.info("\nDistributed setting:")
logger.info(f"\tbackend: {idist.backend()}")
logger.info(f"\tworld size: {idist.get_world_size()}")
logger.info("\n")
def get_save_handler(config):
return config["output_path_"]
Begin Training
This is where the main logic resides, i.e. we will call all the above functions from here:
- Basic Setup
  - We set up manual_seed() and setup_logger(), then log all basic information.
  - Initialize dataloaders, the model and the optimizer.
  - We use the above objects to create a trainer.
- Evaluator
  - Define some relevant Ignite metrics, like Bleu().
  - Create the evaluator: evaluator to compute metrics on the val_dataloader.
  - Define run_validation() to compute the metrics on the dataloaders and log them. We then attach this function to the trainer so that it runs after each training epoch.
- Set up TensorBoard logging for the evaluator on the master process using setup_tb_logging() so that the validation metrics and the learning rate can be logged.
- Define a Checkpoint() object to store the two best models (n_saved) ranked by the validation score (defined in metrics as Bleu()), and attach it to the evaluator so that it can be executed every time the evaluator runs.
- Try training on train_loader for num_epochs.
- Close the TensorBoard logger once training is completed.
def training(local_rank):
rank = idist.get_rank()
manual_seed(config["seed"] + rank)
device = idist.device()
logger = setup_logger(name="NMT", distributed_rank=local_rank)
log_basic_info(logger, config)
train_loader, val_loader = get_dataloaders(train_data, val_data)
model, optimizer = initialize()
trainer = create_trainer(
model, optimizer, config["with_amp"], train_loader.sampler, logger
)
metrics = {
"bleu": Bleu(ngram=4, smooth="smooth1", average="micro"),
"bleu_smooth_2": Bleu(ngram=4, smooth="smooth2", average="micro"),
}
evaluator = create_evaluator(
model, tokenizer, metrics, logger, tag="val"
)
@trainer.on(Events.EPOCH_COMPLETED(every=1) | Events.COMPLETED | Events.STARTED)
def run_validation(engine):
epoch = trainer.state.epoch
state = evaluator.run(val_loader)
log_metrics_eval(
logger, epoch, state.times["COMPLETED"], "Validation", state.metrics
)
if rank == 0:
now = datetime.now().strftime("%Y%m%d-%H%M%S")
folder_name = f"Translation_Model_backend-{idist.backend()}-{idist.get_world_size()}_{now}"
output_path = Path(config["output_path_"]) / folder_name
if not output_path.exists():
output_path.mkdir(parents=True)
logger.info(f"Output path: {output_path}")
evaluators = {"val": evaluator}
tb_logger = common.setup_tb_logging(
config["output_path_"], trainer, optimizer, evaluators=evaluators
)
best_model_handler = Checkpoint(
{"model": model},
get_save_handler(config),
filename_prefix="best",
n_saved=2,
global_step_transform=global_step_from_engine(trainer),
score_name="val_bleu",
score_function=Checkpoint.get_default_score_fn("bleu"),
)
evaluator.add_event_handler(Events.COMPLETED, best_model_handler)
try:
state = trainer.run(
train_loader,
max_epochs=config["num_epochs"],
epoch_length=config["epoch_length"],
)
except Exception as e:
logger.exception("")
raise e
if rank == 0:
tb_logger.close()
Running
To run with TPUs, change the backend to "xla-tpu" and nproc_per_node to 1 or 8.
def run():
with idist.Parallel(backend=None, nproc_per_node=None) as parallel:
parallel.run(training)
if __name__ == '__main__':
run()
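For reference, a multi-GPU variant of run() might look like the following (a hedged sketch, assuming an NCCL backend and 2 visible GPUs; it is not executed in this notebook):
# Hypothetical multi-GPU variant (not called here): spawn 2 processes with the NCCL backend.
def run_multi_gpu():
    with idist.Parallel(backend="nccl", nproc_per_node=2) as parallel:
        parallel.run(training)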
2021-10-21 13:46:21,877 ignite.distributed.launcher.Parallel INFO: - Run '<function training at 0x7f1fbee15710>' in 1 processes
2021-10-21 13:46:21,918 NMT INFO: Train a Machine Translation model
2021-10-21 13:46:21,920 NMT INFO: - PyTorch version: 1.9.0+cu111
2021-10-21 13:46:21,922 NMT INFO: - Ignite version: 0.5.0
2021-10-21 13:46:21,925 NMT INFO: - GPU Device: Tesla K80
2021-10-21 13:46:21,926 NMT INFO: - CUDA version: 11.1
2021-10-21 13:46:21,931 NMT INFO: - CUDNN version: 8005
2021-10-21 13:46:21,933 NMT INFO:
2021-10-21 13:46:21,936 NMT INFO: Configuration:
2021-10-21 13:46:21,938 NMT INFO: seed: 216
2021-10-21 13:46:21,940 NMT INFO: with_amp: False
2021-10-21 13:46:21,943 NMT INFO: num_epochs: 1
2021-10-21 13:46:21,946 NMT INFO: batch_size: 32
2021-10-21 13:46:21,949 NMT INFO: output_path_: /content
2021-10-21 13:46:21,952 NMT INFO: model_name: t5-small
2021-10-21 13:46:21,956 NMT INFO: tokenizer_name: t5-small
2021-10-21 13:46:21,959 NMT INFO: freeze_encoder: False
2021-10-21 13:46:21,961 NMT INFO: num_workers: 4
2021-10-21 13:46:21,964 NMT INFO: weight_decay: 0.01
2021-10-21 13:46:21,968 NMT INFO: learning_rate: 0.0001
2021-10-21 13:46:21,972 NMT INFO: accumulation_steps: 1
2021-10-21 13:46:21,974 NMT INFO: epoch_length: 500
2021-10-21 13:46:21,976 NMT INFO: print_output_every: 50
2021-10-21 13:46:21,980 NMT INFO:
2021-10-21 13:46:21,983 ignite.distributed.auto.auto_dataloader INFO: Use data loader kwargs for dataset '<__main__.Transforme':
{'batch_size': 32, 'num_workers': 4, 'shuffle': True, 'drop_last': True, 'pin_memory': True}
2021-10-21 13:46:21,986 ignite.distributed.auto.auto_dataloader INFO: Use data loader kwargs for dataset '<__main__.Transforme':
{'batch_size': 64, 'num_workers': 4, 'shuffle': False, 'pin_memory': True}
2021-10-21 13:46:26,245 NMT INFO: Output path: /content/Translation_Model_backend-None-1_20211021-134626
2021-10-21 13:46:26,327 NMT INFO: Engine run starting with max_epochs=1.
2021-10-21 13:46:28,533 NMT INFO:
Epoch 0 - Evaluation time (seconds): 2.10 - Validation metrics:
bleu: 0.10135051023993102
bleu_smooth_2: 0.10169442246586281
100%|##########| 1/1 [00:00<?, ?it/s]
[1/500] 0%| [00:00<?]
2021-10-21 13:52:00,975 NMT INFO:
Epoch 1 - Evaluation time (seconds): 2.03 - Validation metrics:
bleu: 0.10242125441879026
bleu_smooth_2: 0.10276058920188186
2021-10-21 13:52:00,978 NMT INFO: Epoch[1] Complete. Time taken: 00:05:32
2021-10-21 13:52:03,141 NMT INFO:
Epoch 1 - Evaluation time (seconds): 2.04 - Validation metrics:
bleu: 0.10242125441879026
bleu_smooth_2: 0.10276058920188186
2021-10-21 13:52:03,143 NMT INFO: Engine run complete. Time taken: 00:05:37
2021-10-21 13:52:03,267 ignite.distributed.launcher.Parallel INFO: End of run