代码示例 / 生成式深度学习 / 使用FNet进行文本生成

使用FNet进行文本生成

作者: Darshan Deshpande
创建日期: 2021/10/05
最后修改日期: 2021/10/05

在Colab中查看 GitHub源代码

描述: 用于文本生成的FNet变换器。


介绍

原始的变换器实现(Vaswani等,2017)是自然语言处理领域的重要突破,催生了BERT和GPT等重要架构。然而,这些架构的缺点在于它们使用的自注意力机制在计算上是昂贵的。FNet架构提出用一种更精简的机制替换这种自注意力:基于傅里叶变换的线性混合器来处理输入标记。

FNet模型能够在训练时以比BERT快80%的速度和在TPU上几乎快70%的速度达到BERT 92-97%的准确率。这种设计提供了高效且小的模型大小,导致更快的推理时间。

在这个示例中,我们将实现并训练这个架构在康奈尔电影对话语料库上,以展示该模型在文本生成中的适用性。


导入库

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import os

# 定义超参数

VOCAB_SIZE = 8192
MAX_SAMPLES = 50000
BUFFER_SIZE = 20000
MAX_LENGTH = 40
EMBED_DIM = 256
LATENT_DIM = 512
NUM_HEADS = 8
BATCH_SIZE = 64

加载数据

我们将使用康奈尔对话语料库。我们将把电影对话解析成问题和答案集。

path_to_zip = keras.utils.get_file(
    "cornell_movie_dialogs.zip",
    origin="http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip",
    extract=True,
)

path_to_dataset = os.path.join(
    os.path.dirname(path_to_zip), "cornell movie-dialogs corpus"
)
path_to_movie_lines = os.path.join(path_to_dataset, "movie_lines.txt")
path_to_movie_conversations = os.path.join(path_to_dataset, "movie_conversations.txt")


def load_conversations():
    # 加载对话拆分的辅助函数
    id2line = {}
    with open(path_to_movie_lines, errors="ignore") as file:
        lines = file.readlines()
    for line in lines:
        parts = line.replace("\n", "").split(" +++$+++ ")
        id2line[parts[0]] = parts[4]

    inputs, outputs = [], []
    with open(path_to_movie_conversations, "r") as file:
        lines = file.readlines()
    for line in lines:
        parts = line.replace("\n", "").split(" +++$+++ ")
        # 获取对话的行ID列表
        conversation = [line[1:-1] for line in parts[3][1:-1].split(", ")]
        for i in range(len(conversation) - 1):
            inputs.append(id2line[conversation[i]])
            outputs.append(id2line[conversation[i + 1]])
            if len(inputs) >= MAX_SAMPLES:
                return inputs, outputs
    return inputs, outputs


questions, answers = load_conversations()

# 划分训练集和验证集

train_dataset = tf.data.Dataset.from_tensor_slices((questions[:40000], answers[:40000]))
val_dataset = tf.data.Dataset.from_tensor_slices((questions[40000:], answers[40000:]))
从 http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip 下载数据
9920512/9916637 [==============================] - 0s 0us/step
9928704/9916637 [==============================] - 0s 0us/step

预处理和标记化

def preprocess_text(sentence):
    sentence = tf.strings.lower(sentence)
    # 在标点符号与最后一个单词之间添加空格以允许更好的标记化
    sentence = tf.strings.regex_replace(sentence, r"([?.!,])", r" \1 ")
    # 用一个空格替换多个连续的空格
    sentence = tf.strings.regex_replace(sentence, r"\s\s+", " ")
    # 用空格替换非英语单词
    sentence = tf.strings.regex_replace(sentence, r"[^a-z?.!,]+", " ")
    sentence = tf.strings.strip(sentence)
    sentence = tf.strings.join(["[start]", sentence, "[end]"], separator=" ")
    return sentence


vectorizer = layers.TextVectorization(
    VOCAB_SIZE,
    standardize=preprocess_text,
    output_mode="int",
    output_sequence_length=MAX_LENGTH,
)

# 我们将对问题和答案同时适配vectorizer
# 此数据集被批处理以并行化和加速处理过程
vectorizer.adapt(tf.data.Dataset.from_tensor_slices((questions + answers)).batch(128))

使用TextVectorization进行标记化和填充句子

def vectorize_text(inputs, outputs):
    inputs, outputs = vectorizer(inputs), vectorizer(outputs)
    # 右侧添加一个额外的填充标记以匹配输出形状
    outputs = tf.pad(outputs, [[0, 1]])
    return (
        {"encoder_inputs": inputs, "decoder_inputs": outputs[:-1]},
        {"outputs": outputs[1:]},
    )


train_dataset = train_dataset.map(vectorize_text, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.map(vectorize_text, num_parallel_calls=tf.data.AUTOTUNE)

train_dataset = (
    train_dataset.cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)
val_dataset = val_dataset.cache().batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

创建 FNet 编码器

FNet 论文提出了一种替代 Transformer 架构中使用的标准注意力机制(Vaswani 等,2017)。

架构

FFT 层的输出是复数。为避免处理复数层,只提取实部(幅度)。

傅里叶变换后面的密集层充当对频域进行卷积。

class FNetEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs):
        # 将输入转换为 complex64
        inp_complex = tf.cast(inputs, tf.complex64)
        # 使用 FFT2D 将输入投影到频域并
        # 提取输出的实部
        fft = tf.math.real(tf.signal.fft2d(inp_complex))
        proj_input = self.layernorm_1(inputs + fft)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

创建解码器

解码器架构与(Vaswani 等,2017)在原始 Transformer 架构中提出的结构相同,由嵌入、位置编码、两个掩蔽多头注意力层以及最后的密集输出层组成。 以下架构取自于 深度学习与 Python 第二版,第 11 章

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)


class FNetDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(latent_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)


def create_model():
    encoder_inputs = keras.Input(shape=(None,), dtype="int32", name="encoder_inputs")
    x = PositionalEmbedding(MAX_LENGTH, VOCAB_SIZE, EMBED_DIM)(encoder_inputs)
    encoder_outputs = FNetEncoder(EMBED_DIM, LATENT_DIM)(x)
    encoder = keras.Model(encoder_inputs, encoder_outputs)
    decoder_inputs = keras.Input(shape=(None,), dtype="int32", name="decoder_inputs")
    encoded_seq_inputs = keras.Input(
        shape=(None, EMBED_DIM), name="decoder_state_inputs"
    )
    x = PositionalEmbedding(MAX_LENGTH, VOCAB_SIZE, EMBED_DIM)(decoder_inputs)
    x = FNetDecoder(EMBED_DIM, LATENT_DIM, NUM_HEADS)(x, encoded_seq_inputs)
    x = layers.Dropout(0.5)(x)
    decoder_outputs = layers.Dense(VOCAB_SIZE, activation="softmax")(x)
    decoder = keras.Model(
        [decoder_inputs, encoded_seq_inputs], decoder_outputs, name="outputs"
    )
    decoder_outputs = decoder([decoder_inputs, encoder_outputs])
    fnet = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs, name="fnet")
    return fnet

创建和训练模型

fnet = create_model()
fnet.compile("adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

这里,epochs参数设置为一个epoch,但实际上模型大约需要 20-30个epochs的训练才能开始输出可理解的句子。虽然准确率 不是这个任务的一个良好衡量标准,但我们将使用它来了解 网络的改进。

fnet.fit(train_dataset, epochs=1, validation_data=val_dataset)
625/625 [==============================] - 96s 133ms/step - loss: 1.3036 - accuracy: 0.4354 - val_loss: 0.7964 - val_accuracy: 0.6374

<keras.callbacks.History at 0x7f0d8d214c90>

执行推理

VOCAB = vectorizer.get_vocabulary()


def decode_sentence(input_sentence):
    # 将输入句子映射到标记并添加起始和结束标记
    tokenized_input_sentence = vectorizer(
        tf.constant("[start] " + preprocess_text(input_sentence) + " [end]")
    )
    # 初始化仅包含起始标记的初始句子。
    tokenized_target_sentence = tf.expand_dims(VOCAB.index("[start]"), 0)
    decoded_sentence = ""

    for i in range(MAX_LENGTH):
        # 获取预测结果
        predictions = fnet.predict(
            {
                "encoder_inputs": tf.expand_dims(tokenized_input_sentence, 0),
                "decoder_inputs": tf.expand_dims(
                    tf.pad(
                        tokenized_target_sentence,
                        [[0, MAX_LENGTH - tf.shape(tokenized_target_sentence)[0]]],
                    ),
                    0,
                ),
            }
        )
        # 计算具有最高概率的标记并获取对应的单词
        sampled_token_index = tf.argmax(predictions[0, i, :])
        sampled_token = VOCAB[sampled_token_index.numpy()]
        # 如果采样的标记是结束标记,则停止生成并返回句子
        if tf.equal(sampled_token_index, VOCAB.index("[end]")):
            break
        decoded_sentence += sampled_token + " "
        tokenized_target_sentence = tf.concat(
            [tokenized_target_sentence, [sampled_token_index]], 0
        )

    return decoded_sentence


decode_sentence("Where have you been all this time?")
'i m sorry .'

结论

这个例子展示了如何使用FNet模型进行训练和推理。 要深入了解架构或进行进一步阅读,您可以参考:

  1. FNet:使用傅里叶变换混合标记 (Lee-Thorp et al., 2021)
  2. Attention Is All You Need (Vaswani et al., 2017)

感谢François Chollet提供的Keras示例, 用序列到序列的Transformer进行英西翻译, 从中提取了解码器的实现。