! [ -e /content ] && pip install -Uqq fastai # upgrade fastai on Colab
from __future__ import annotations
from fastai.basics import *
from fastai.text.core import *
from fastai.text.data import *
from fastai.text.models.core import *
from fastai.text.models.awdlstm import *
from fastai.callback.rnn import *
from fastai.callback.progress import *
from nbdev.showdoc import *
All the functions necessary to build a Learner suitable for transfer learning in NLP
The most important functions of this module are language_model_learner and text_classifier_learner. They will help you define a Learner using a pretrained model. See the text tutorial for examples of usage.
Loading a pretrained model
In text, to load a pretrained model, we need to adapt the embeddings of the vocabulary used for the pre-training to the vocabulary of our current corpus.
def match_embeds(
    old_wgts:dict, # Embedding weights
    old_vocab:list, # Vocabulary of corpus used for pre-training
    new_vocab:list # Current corpus vocabulary
) -> dict:
"Convert the embedding in `old_wgts` to go from `old_vocab` to `new_vocab`."
bias, wgts = old_wgts.get('1.decoder.bias', None), old_wgts['0.encoder.weight']
wgts_m = wgts.mean(0)
new_wgts = wgts.new_zeros((len(new_vocab),wgts.size(1)))
if bias is not None:
bias_m = bias.mean(0)
new_bias = bias.new_zeros((len(new_vocab),))
old_o2i = old_vocab.o2i if hasattr(old_vocab, 'o2i') else {w:i for i,w in enumerate(old_vocab)}
for i,w in enumerate(new_vocab):
idx = old_o2i.get(w, -1)
new_wgts[i] = wgts[idx] if idx>=0 else wgts_m
if bias is not None: new_bias[i] = bias[idx] if idx>=0 else bias_m
old_wgts['0.encoder.weight'] = new_wgts
if '0.encoder_dp.emb.weight' in old_wgts: old_wgts['0.encoder_dp.emb.weight'] = new_wgts.clone()
old_wgts['1.decoder.weight'] = new_wgts.clone()
if bias is not None: old_wgts['1.decoder.bias'] = new_bias
    return old_wgts
For words in new_vocab that don't have a corresponding match in old_vocab, we use the mean of all pretrained embeddings.
wgts = {'0.encoder.weight': torch.randn(5,3)}
new_wgts = match_embeds(wgts.copy(), ['a', 'b', 'c'], ['a', 'c', 'd', 'b'])
old,new = wgts['0.encoder.weight'],new_wgts['0.encoder.weight']
test_eq(new[0], old[0])
test_eq(new[1], old[2])
test_eq(new[2], old.mean(0))
test_eq(new[3], old[1])
# with bias
wgts = {'0.encoder.weight': torch.randn(5,3), '1.decoder.bias': torch.randn(5)}
new_wgts = match_embeds(wgts.copy(), ['a', 'b', 'c'], ['a', 'c', 'd', 'b'])
old_w,new_w = wgts['0.encoder.weight'],new_wgts['0.encoder.weight']
old_b,new_b = wgts['1.decoder.bias'], new_wgts['1.decoder.bias']
test_eq(new_w[0], old_w[0])
test_eq(new_w[1], old_w[2])
test_eq(new_w[2], old_w.mean(0))
test_eq(new_w[3], old_w[1])
test_eq(new_b[0], old_b[0])
test_eq(new_b[1], old_b[2])
test_eq(new_b[2], old_b.mean(0))
test_eq(new_b[3], old_b[1])
def _get_text_vocab(dls:DataLoaders) -> list:
"Get vocabulary from `DataLoaders`"
vocab = dls.vocab
if isinstance(vocab, L): vocab = vocab[0]
    return vocab
def load_ignore_keys(
    model, # Model architecture
    wgts:dict # Model weights
) -> tuple:
"Load `wgts` in `model` ignoring the names of the keys, just taking parameters in order"
sd = model.state_dict()
for k1,k2 in zip(sd.keys(), wgts.keys()): sd[k1].data = wgts[k2].data.clone()
    return model.load_state_dict(sd)
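As a quick illustration of the order-based loading, here is a minimal sketch; the two toy modules below are hypothetical and only exist for this example:
class _Src(Module):
    def __init__(self): self.lin = nn.Linear(3, 4)
class _Dst(Module):
    def __init__(self): self.fc = nn.Linear(3, 4)
src,dst = _Src(),_Dst()
load_ignore_keys(dst, src.state_dict())  # 'fc.*' receives the values of 'lin.*' purely by position
test_eq(dst.fc.weight, src.lin.weight)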
def _rm_module(n:str):
    t = n.split('.')
for i in range(len(t)-1, -1, -1):
if t[i] == 'module':
t.pop(i)
break
    return '.'.join(t)
# For compatibility with previous versions, remove at release
def clean_raw_keys(wgts:dict):
keys = list(wgts.keys())
for k in keys:
t = k.split('.module')
if f'{_rm_module(k)}_raw' in keys: del wgts[k]
    return wgts
# For compatibility with previous versions, remove at release
def load_model_text(
    file:str, # Filename of the saved text model
    model, # Model architecture
    opt:Optimizer, # `Optimizer` used to fit the model
    with_opt:bool=None, # Enable to load `Optimizer` state
    device:int|str|torch.device=None, # Sets the device, uses 'cpu' if unspecified
    strict:bool=True # Whether to strictly enforce the keys of `file`'s state dict to match those of the model's `Module.state_dict`
):
"Load `model` from `file` along with `opt` (if available, and if `with_opt`)"
distrib_barrier()
if isinstance(device, int): device = torch.device('cuda', device)
elif device is None: device = 'cpu'
state = torch.load(file, map_location=device)
hasopt = set(state)=={'model', 'opt'}
model_state = state['model'] if hasopt else state
get_model(model).load_state_dict(clean_raw_keys(model_state), strict=strict)
if hasopt and ifnone(with_opt,True):
try: opt.load_state_dict(state['opt'])
except:
if with_opt: warn("Could not load the optimizer state.")
    elif with_opt: warn("Saved file doesn't contain an optimizer state.")
@delegates(Learner.__init__)
class TextLearner(Learner):
"Basic class for a `Learner` in NLP."
def __init__(self,
        dls:DataLoaders, # Text `DataLoaders`
        model, # A standard PyTorch model
        alpha:float=2., # Param for `RNNRegularizer`
        beta:float=1., # Param for `RNNRegularizer`
        moms:tuple=(0.8,0.7,0.8), # Momentum for `Cosine Annealing Scheduler`
**kwargs
):
super().__init__(dls, model, moms=moms, **kwargs)
self.add_cbs(rnn_cbs())
def save_encoder(self,
        file:str # Filename for `Encoder`
):
"Save the encoder to `file` in the model directory"
if rank_distrib(): return # don't save if child proc
encoder = get_model(self.model)[0]
if hasattr(encoder, 'module'): encoder = encoder.module
torch.save(encoder.state_dict(), join_path_file(file, self.path/self.model_dir, ext='.pth'))
def load_encoder(self,
file:str, # Filename of the saved encoder
        device:int|str|torch.device=None # Device used to load, defaults to `dls` device
):
"Load the encoder `file` from the model directory, optionally ensuring it's on `device`"
encoder = get_model(self.model)[0]
if device is None: device = self.dls.device
if hasattr(encoder, 'module'): encoder = encoder.module
distrib_barrier()
wgts = torch.load(join_path_file(file,self.path/self.model_dir, ext='.pth'), map_location=device)
encoder.load_state_dict(clean_raw_keys(wgts))
self.freeze()
return self
def load_pretrained(self,
wgts_fname:str, # Filename of saved weights
vocab_fname:str, # Saved vocabulary filename in pickle format
model=None # Model to load parameters from, defaults to `Learner.model`
):
"Load a pretrained model and adapt it to the data vocabulary."
old_vocab = load_pickle(vocab_fname)
new_vocab = _get_text_vocab(self.dls)
distrib_barrier()
wgts = torch.load(wgts_fname, map_location = lambda storage,loc: storage)
        if 'model' in wgts: wgts = wgts['model'] # In case the pretrained model was saved with an optimizer
wgts = match_embeds(wgts, old_vocab, new_vocab)
load_ignore_keys(self.model if model is None else model, clean_raw_keys(wgts))
self.freeze()
return self
    # For previous versions compatibility. Remove at release
@delegates(load_model_text)
def load(self,
        file:str, # Filename of saved model
        with_opt:bool=None, # Enable to load `Optimizer` state
        device:int|str|torch.device=None, # Device used to load, defaults to `dls` device
**kwargs
):
if device is None: device = self.dls.device
if self.opt is None: self.create_opt()
file = join_path_file(file, self.path/self.model_dir, ext='.pth')
load_model_text(file, self.model, self.opt, device=device, **kwargs)
        return self
Adds a ModelResetter and an RNNRegularizer with alpha and beta to the callbacks, the rest is the same as the Learner init.
This Learner adds functionality to the base class:
show_doc(TextLearner.load_pretrained)
TextLearner.load_pretrained
TextLearner.load_pretrained (wgts_fname:str, vocab_fname:str, model=None)
Load a pretrained model and adapt it to the data vocabulary.
|  | Type | Default | Details |
|---|---|---|---|
| wgts_fname | str |  | Filename of saved weights |
| vocab_fname | str |  | Saved vocabulary filename in pickle format |
| model | NoneType | None | Model to load parameters from, defaults to Learner.model |
wgts_fname should point to the weights of the pretrained model and vocab_fname to the vocabulary used to pretrain it.
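As a minimal sketch (the file names here are hypothetical and learn stands for any TextLearner):
learn.load_pretrained('custom_lm_wgts.pth', 'custom_lm_vocab.pkl')  # .pth weights and .pkl vocabulary saved during pre-training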
show_doc(TextLearner.save_encoder)
TextLearner.save_encoder
TextLearner.save_encoder (file:str)
Save the encoder to file in the model directory
|  | Type | Details |
|---|---|---|
| file | str | Filename for Encoder |
The model directory is Learner.path/Learner.model_dir.
show_doc(TextLearner.load_encoder)
TextLearner.load_encoder
TextLearner.load_encoder (file:str, device:int|str|torch.device=None)
Load the encoder file from the model directory, optionally ensuring it’s on device
|  | Type | Default | Details |
|---|---|---|---|
| file | str |  | Filename of the saved encoder |
| device | int \| str \| torch.device | None | Device used to load, defaults to dls device |
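A typical ULMFiT-style workflow saves the encoder of a fine-tuned language model and loads it into a classifier. A minimal sketch, assuming learn_lm and learn_clas share the same path and model_dir, and where the file name 'finetuned' is arbitrary:
learn_lm.save_encoder('finetuned')    # writes models/finetuned.pth
learn_clas.load_encoder('finetuned')  # the classifier is frozen after loading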
Language model predictions
For language models, the predict method is quite different from the other applications, which is why it needs its own subclass.
def decode_spec_tokens(tokens):
"Decode the special tokens in `tokens`"
new_toks,rule,arg = [],None,None
for t in tokens:
if t in [TK_MAJ, TK_UP, TK_REP, TK_WREP]: rule = t
elif rule is None: new_toks.append(t)
elif rule == TK_MAJ:
new_toks.append(t[:1].upper() + t[1:].lower())
rule = None
elif rule == TK_UP:
new_toks.append(t.upper())
rule = None
elif arg is None:
try: arg = int(t)
except: rule = None
else:
if rule == TK_REP: new_toks.append(t * arg)
else: new_toks += [t] * arg
    return new_toks
test_eq(decode_spec_tokens(['xxmaj', 'text']), ['Text'])
test_eq(decode_spec_tokens(['xxup', 'text']), ['TEXT'])
test_eq(decode_spec_tokens(['xxrep', '3', 'a']), ['aaa'])
test_eq(decode_spec_tokens(['xxwrep', '3', 'word']), ['word', 'word', 'word'])
class LMLearner(TextLearner):
"Add functionality to `TextLearner` when dealing with a language model"
def predict(self, text, n_words=1, no_unk=True, temperature=1., min_p=None, no_bar=False,
decoder=decode_spec_tokens, only_last_word=False):
"Return `text` and the `n_words` that come after"
self.model.reset()
idxs = idxs_all = self.dls.test_dl([text]).items[0].to(self.dls.device)
if no_unk: unk_idx = self.dls.vocab.index(UNK)
for _ in (range(n_words) if no_bar else progress_bar(range(n_words), leave=False)):
with self.no_bar(): preds,_ = self.get_preds(dl=[(idxs[None],)])
res = preds[0][-1]
if no_unk: res[unk_idx] = 0.
if min_p is not None:
if (res >= min_p).float().sum() == 0:
warn(f"There is no item with probability >= {min_p}, try a lower value.")
else: res[res < min_p] = 0.
if temperature != 1.: res.pow_(1 / temperature)
idx = torch.multinomial(res, 1).item()
idxs = idxs_all = torch.cat([idxs_all, idxs.new([idx])])
if only_last_word: idxs = idxs[-1][None]
num = self.dls.train_ds.numericalize
tokens = [num.vocab[i] for i in idxs_all if num.vocab[i] not in [BOS, PAD]]
sep = self.dls.train_ds.tokenizer.sep
return sep.join(decoder(tokens))
@delegates(Learner.get_preds)
    def get_preds(self, concat_dim=1, **kwargs): return super().get_preds(concat_dim=1, **kwargs)
show_doc(LMLearner, title_level=3)
LMLearner
LMLearner (dls:DataLoaders, model, alpha:float=2.0, beta:float=1.0, moms:tuple=(0.8, 0.7, 0.8), loss_func:callable|None=None, opt_func=<function Adam>, lr=0.001, splitter:callable=<function trainable_params>, cbs=None, metrics=None, path=None, model_dir='models', wd=None, wd_bn_bias=False, train_bn=True, default_cbs:bool=True)
Add functionality to TextLearner when dealing with a language model
|  | Type | Default | Details |
|---|---|---|---|
| dls | DataLoaders |  | Text DataLoaders |
| model |  |  | A standard PyTorch model |
| alpha | float | 2.0 | Param for RNNRegularizer |
| beta | float | 1.0 | Param for RNNRegularizer |
| moms | tuple | (0.8, 0.7, 0.8) | Momentum for Cosine Annealing Scheduler |
| loss_func | callable \| None | None | Loss function for training |
| opt_func | function | Adam | Optimisation function for training |
| lr | float | 0.001 | Learning rate |
| splitter | callable | trainable_params | Used to split parameters into layer groups |
| cbs | NoneType | None | Callbacks |
| metrics | NoneType | None | Printed after each epoch |
| path | NoneType | None | Parent directory to save, load, and export models |
| model_dir | str | models | Subdirectory to save and load models |
| wd | NoneType | None | Weight decay |
| wd_bn_bias | bool | False | Apply weight decay to batchnorm bias params? |
| train_bn | bool | True | Always train batchnorm layers? |
| default_cbs | bool | True | Include default callbacks? |
show_doc(LMLearner.predict)
LMLearner.predict
LMLearner.predict (text, n_words=1, no_unk=True, temperature=1.0, min_p=None, no_bar=False, decoder=<function decode_spec_tokens>, only_last_word=False)
Return text and the n_words that come after
The words are picked randomly among the predictions, depending on the probability of each index. no_unk means we never pick the UNK token, temperature is applied to the predictions, and if min_p is passed, we don't consider the indices with a probability lower than it. Set no_bar to True if you don't want any progress bar, and you can pass along a custom decoder to process the predicted tokens.
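For instance, a minimal sketch (assuming learn is an LMLearner such as the one created with language_model_learner below); with this implementation a temperature below 1 makes the sampling more conservative, and min_p drops very unlikely tokens:
learn.predict('This movie is about', n_words=15, temperature=0.7, min_p=0.01, no_bar=True)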
Learner convenience functions
from fastai.text.models.core import _model_meta
def _get_text_vocab(dls):
vocab = dls.vocab
if isinstance(vocab, L): vocab = vocab[0]
    return vocab
@delegates(Learner.__init__)
def language_model_learner(dls, arch, config=None, drop_mult=1., backwards=False, pretrained=True, pretrained_fnames=None, **kwargs):
"Create a `Learner` with a language model from `dls` and `arch`."
vocab = _get_text_vocab(dls)
model = get_language_model(arch, len(vocab), config=config, drop_mult=drop_mult)
meta = _model_meta[arch]
learn = LMLearner(dls, model, loss_func=CrossEntropyLossFlat(), splitter=meta['split_lm'], **kwargs)
url = 'url_bwd' if backwards else 'url'
if pretrained or pretrained_fnames:
if pretrained_fnames is not None:
fnames = [learn.path/learn.model_dir/f'{fn}.{ext}' for fn,ext in zip(pretrained_fnames, ['pth', 'pkl'])]
else:
if url not in meta:
warn("There are no pretrained weights for that architecture yet!")
return learn
model_path = untar_data(meta[url] , c_key='model')
try: fnames = [list(model_path.glob(f'*.{ext}'))[0] for ext in ['pth', 'pkl']]
except IndexError: print(f'The model in {model_path} is incomplete, download again'); raise
learn = learn.load_pretrained(*fnames)
    return learn
You can use the config to customize the architecture used (change the values from awd_lstm_lm_config for this), pretrained will use fastai's pretrained model for this arch (if available), or you can pass specific pretrained_fnames containing your own pretrained model and the corresponding vocabulary. All other arguments are passed to Learner.
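For instance, a minimal sketch of customizing the architecture through config, using awd_lstm_lm_config from fastai.text.models.awdlstm (the values below are illustrative, dls is a language-model DataLoaders like the one built in the example below, and pretrained weights would no longer match a modified architecture):
config = awd_lstm_lm_config.copy()
config.update({'n_hid': 1024, 'n_layers': 2})  # illustrative values, not the defaults
learn = language_model_learner(dls, AWD_LSTM, config=config, pretrained=False)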
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
dls = TextDataLoaders.from_df(df, path=path, text_col='text', is_lm=True, valid_col='is_valid')
learn = language_model_learner(dls, AWD_LSTM)
You can use the .predict method to generate new text.
learn.predict('This movie is about', n_words=20)
'This movie is about plans by Tom Cruise to win a loyalty sharing award at the Battle of Christmas'
By default, the entire sentence is fed again to the model after each predicted word; this little trick improves the quality of the generated text. If you want to feed only the last word, specify the argument only_last_word.
learn.predict('This movie is about', n_words=20, only_last_word=True)
'This movie is about the J. Intelligent , ha - agency . Griffith , and Games on the early after'
@delegates(Learner.__init__)
def text_classifier_learner(dls, arch, seq_len=72, config=None, backwards=False, pretrained=True, drop_mult=0.5, n_out=None,
lin_ftrs=None, ps=None, max_len=72*20, y_range=None, **kwargs):
"Create a `Learner` with a text classifier from `dls` and `arch`."
vocab = _get_text_vocab(dls)
if n_out is None: n_out = get_c(dls)
assert n_out, "`n_out` is not defined, and could not be inferred from data, set `dls.c` or pass `n_out`"
model = get_text_classifier(arch, len(vocab), n_out, seq_len=seq_len, config=config, y_range=y_range,
drop_mult=drop_mult, lin_ftrs=lin_ftrs, ps=ps, max_len=max_len)
meta = _model_meta[arch]
learn = TextLearner(dls, model, splitter=meta['split_clas'], **kwargs)
url = 'url_bwd' if backwards else 'url'
if pretrained:
if url not in meta:
warn("There are no pretrained weights for that architecture yet!")
return learn
model_path = untar_data(meta[url], c_key='model')
try: fnames = [list(model_path.glob(f'*.{ext}'))[0] for ext in ['pth', 'pkl']]
except IndexError: print(f'The model in {model_path} is incomplete, download again'); raise
learn = learn.load_pretrained(*fnames, model=learn.model[0])
learn.freeze()
    return learn
You can use the config to customize the architecture used (change the values from awd_lstm_clas_config for this), pretrained will use fastai's pretrained model for this arch (if available). drop_mult is a global multiplier applied to all dropout probabilities. n_out is usually inferred from the dls, but you can pass it directly.
The model uses a SentenceEncoder, which means the texts are passed seq_len tokens at a time, and gradients are only computed on the last max_len steps. lin_ftrs and ps are passed to get_text_classifier.
All other arguments are passed to Learner.
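For instance, a minimal sketch of tweaking the main knobs (the values are illustrative; dls is a classification DataLoaders like the one built below):
learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.7, seq_len=72, max_len=72*10)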
path = untar_data(URLs.IMDB_SAMPLE)
df = pd.read_csv(path/'texts.csv')
dls = TextDataLoaders.from_df(df, path=path, text_col='text', label_col='label', valid_col='is_valid')
learn = text_classifier_learner(dls, AWD_LSTM)
Show methods -
@typedispatch
def show_results(x: LMTensorText, y, samples, outs, ctxs=None, max_n=10, **kwargs):
if ctxs is None: ctxs = get_empty_df(min(len(samples), max_n))
for i,l in enumerate(['input', 'target']):
ctxs = [b.show(ctx=c, label=l, **kwargs) for b,c,_ in zip(samples.itemgot(i),ctxs,range(max_n))]
ctxs = [b.show(ctx=c, label='pred', **kwargs) for b,c,_ in zip(outs.itemgot(0),ctxs,range(max_n))]
display_df(pd.DataFrame(ctxs))
    return ctxs
@typedispatch
def show_results(x: TensorText, y, samples, outs, ctxs=None, max_n=10, trunc_at=150, **kwargs):
if ctxs is None: ctxs = get_empty_df(min(len(samples), max_n))
samples = L((s[0].truncate(trunc_at),*s[1:]) for s in samples)
ctxs = show_results[object](x, y, samples, outs, ctxs=ctxs, max_n=max_n, **kwargs)
display_df(pd.DataFrame(ctxs))
    return ctxs
@typedispatch
def plot_top_losses(x: TensorText, y:TensorCategory, samples, outs, raws, losses, trunc_at=150, **kwargs):
rows = get_empty_df(len(samples))
samples = L((s[0].truncate(trunc_at),*s[1:]) for s in samples)
for i,l in enumerate(['input', 'target']):
rows = [b.show(ctx=c, label=l, **kwargs) for b,c in zip(samples.itemgot(i),rows)]
outs = L(o + (TitledFloat(r.max().item()), TitledFloat(l.item())) for o,r,l in zip(outs, raws, losses))
for i,l in enumerate(['predicted', 'probability', 'loss']):
rows = [b.show(ctx=c, label=l, **kwargs) for b,c in zip(outs.itemgot(i),rows)]
    display_df(pd.DataFrame(rows))
Export -
from nbdev import nbdev_export
nbdev_export()