RAG Optimization#

In this tutorial, we will cover auto-optimization of a standard RAG pipeline:

  • Introduce the HotPotQA dataset and the HotPotQAData class.

  • Convert DsPy's retriever to an AdalFlow Retriever for easy comparison.

  • Build a standard RAG with Retriever and Generator components.

  • Learn how to connect the output-input between components to enable auto text-grad optimization.

HotPotQA Dataset#

The datasets.hotpotqa.HotPotQA dataset is widely used in the research community to benchmark question answering and RAG tasks. Here is an example from the dataset:

from adalflow.datasets.hotpot_qa import HotPotQA, HotPotQAData

dataset = HotPotQA(split="train", size=20)
print(dataset[0], type(dataset[0]))

The output will be:

HotPotQAData(id='5a8b57f25542995d1e6f1371', question='Were Scott Derrickson and Ed Wood of the same nationality?', answer='yes', gold_titles="{'Scott Derrickson', 'Ed Wood'}")

The dataset comes with more text that you can retrieve from. In this tutorial, however, we will use the retriever from the DsPy library, which lets you retrieve relevant passages from a large Wikipedia corpus using a BERT-based embedding model.
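
Note that dspy.Retrieve relies on a retrieval model configured globally in DsPy. As a setup sketch, assuming the public ColBERTv2 wiki17_abstracts demo endpoint from DsPy's own examples (the URL may change or go offline):

import dspy

# Configure DsPy's global retrieval model; dspy.Retrieve(k=...) will use it.
# The URL is the public wiki17_abstracts demo server from DsPy's examples.
colbertv2_wiki17_abstracts = dspy.ColBERTv2(
    url="http://20.102.90.50:2017/wiki17_abstracts"
)
dspy.settings.configure(rm=colbertv2_wiki17_abstracts)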

Retriever#

With the following code, we can easily convert DsPy's retriever into an AdalFlow Retriever:

from typing import List, Optional

import adalflow as adal
import dspy

class DspyRetriever(adal.Retriever):
    def __init__(self, top_k: int = 3):
        super().__init__()
        self.top_k = top_k
        self.dspy_retriever = dspy.Retrieve(k=top_k)

    def call(self, input: str, top_k: Optional[int] = None) -> List[adal.RetrieverOutput]:

        k = top_k or self.top_k

        output = self.dspy_retriever(query_or_queries=input, k=k)
        final_output: List[adal.RetrieverOutput] = []
        documents = output.passages

        final_output.append(
            adal.RetrieverOutput(
                query=input,
                documents=documents,
                doc_indices=[],
            )
        )
        return final_output

Let's try an example:

def test_retriever():
    question = "How many storeys are in the castle that David Gregory inherited?"
    retriever = DspyRetriever(top_k=3)
    retriever_out = retriever(input=question)
    print(f"retriever_out: {retriever_out}")

The output will be:

[RetrieverOutput(doc_indices=[], doc_scores=None, query='How many storeys are in the castle that David Gregory inherited?', documents=['David Gregory (physician) | David Gregory (20 December 1625 – 1720) was a Scottish physician and inventor. His surname is sometimes spelt as Gregorie, the original Scottish spelling. He inherited Kinnairdy Castle in 1664. Three of his twenty-nine children became mathematics professors. He is credited with inventing a military cannon that Isaac Newton described as "being destructive to the human species". Copies and details of the model no longer exist. Gregory\'s use of a barometer to predict farming-related weather conditions led him to be accused of witchcraft by Presbyterian ministers from Aberdeen, although he was never convicted.', 'St. Gregory Hotel | The St. Gregory Hotel is a boutique hotel located in downtown Washington, D.C., in the United States. Established in 2000, the nine-floor hotel has 155 rooms, which includes 54 deluxe rooms, 85 suites with kitchens, and 16 top-floor suites with balconies. The hotel, which changed hands in June 2015, has a life-size statue of Marilyn Monroe in the lobby.', 'Karl D. Gregory Cooperative House | Karl D. Gregory Cooperative House is a member of the Inter-Cooperative Council at the University of Michigan. The structure that stands at 1617 Washtenaw was originally built in 1909 for the Tau Gamma Nu fraternity, but was purchased by the ICC in 1995. Gregory House is the only house in the organization that is expressly substance free. No tobacco, alcohol, or illicit drugs are allowed on the property. Gregory House has a maximum capacity of 29 members (by way of 13 single and 8 double capacity rooms) as of June 2008.'])]

Trainable RAG#

In other tutorials, we used only a single component, the Generator, so there was no need to connect output-input between components. That is why we wrote our task pipeline with a single call method that serves both training and inference modes.

The previous task pipeline's call method returns a GeneratorOutput in inference mode and a Parameter in training mode:

def call(
    self, question: str, id: Optional[str] = None
) -> Union[adal.GeneratorOutput, adal.Parameter]:
    prompt_kwargs = self._prepare_input(question)
    output = self.llm(prompt_kwargs=prompt_kwargs, id=id)
    return output

The code above works because the Generator component's __call__ method already supports both training and inference modes.
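
Conceptually, this dispatch works much like a PyTorch module. Here is a simplified sketch of how a gradient-aware component's __call__ might route between the two modes; it is an illustration, not AdalFlow's exact implementation:

# Simplified sketch of mode dispatch; not AdalFlow's exact implementation.
def __call__(self, *args, **kwargs):
    if self.training:
        # Training mode: build the computation graph and return a Parameter.
        return self.forward(*args, **kwargs)
    # Inference mode: plain execution, returning e.g. a GeneratorOutput.
    return self.call(*args, **kwargs)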

Inference mode

We can distinguish inference mode from training mode by using the call and forward methods, respectively. Implementing the call method is just free-form Python coding. Assume the class has already been configured with a retriever and an llm in its __init__ method.

Here is our call method:

def call(self, question: str, id: str = None) -> adal.GeneratorOutput:
    if self.training:
        raise ValueError(
            "This component is not supposed to be called in training mode"
        )

    retriever_out = self.retriever.call(input=question)

    successor_map_fn = lambda x: (  # noqa E731
        "\n\n".join(x[0].documents) if x and x[0] and x[0].documents else ""
    )
    retrieved_context = successor_map_fn(retriever_out)

    prompt_kwargs = {
        "context": retrieved_context,
        "question": question,
    }

    output = self.llm.call(
        prompt_kwargs=prompt_kwargs,
        id=id,
    )
    return output

Between the retriever and llm components, we convert the retriever's output into a string and pass it to the llm component via prompt_kwargs.

Training mode

In this case, we need to create a trainable RAG pipeline containing both the Retriever and Generator components. In particular, this shows how to write the forward (training) method of the RAG component. To make the pipeline trainable, we need to pass Parameter objects as inputs and outputs between components.

The Generator's forward method uses a Parameter to build the dynamic computation graph, so we need a way to convert the Retriever's output into the Generator's input. We achieve this with the successor_map_fn of the Parameter class. In this case, the Parameter's data field holds the Retriever's output. The successor_map_fn applies a mapping function that converts the Retriever's Parameter output into the string format used in the Generator's prompt.

Here is the forward method of our RAG component:

def forward(self, question: str, id: str = None) -> adal.Parameter:
    if not self.training:
        raise ValueError("This component is not supposed to be called in eval mode")
    retriever_out = self.retriever.forward(input=question)
    successor_map_fn = lambda x: (  # noqa E731
        "\n\n".join(x.data[0].documents)
        if x.data and x.data[0] and x.data[0].documents
        else ""
    )
    retriever_out.add_successor_map_fn(successor=self.llm, map_fn=successor_map_fn)
    generator_out = self.llm.forward(
        prompt_kwargs={"question": question, "context": retriever_out}, id=id
    )
    return generator_out

Both modes in one method

You can still choose to combine both methods into one. Here is an example:

def bicall(
    self, question: str, id: str = None
) -> Union[adal.GeneratorOutput, adal.Parameter]:
    """You can also combine both the forward and call in the same function.
    Supports both training and eval mode by using __call__ for GradComponents
    like Retriever and Generator
    """
    retriever_out = self.retriever(input=question)
    if isinstance(retriever_out, adal.Parameter):
        successor_map_fn = lambda x: (  # noqa E731
            "\n\n".join(x.data[0].documents)
            if x.data and x.data[0] and x.data[0].documents
            else ""
        )
        retriever_out.add_successor_map_fn(
            successor=self.llm, map_fn=successor_map_fn
        )
        # Pass the Parameter itself; successor_map_fn converts it for the llm.
        retrieved_context = retriever_out
    else:
        successor_map_fn = lambda x: (  # noqa E731
            "\n\n".join(x[0].documents) if x and x[0] and x[0].documents else ""
        )
        retrieved_context = successor_map_fn(retriever_out)
    prompt_kwargs = {
        "context": retrieved_context,
        "question": question,
    }
    output = self.llm(prompt_kwargs=prompt_kwargs, id=id)
    return output

Trainable parameters

For this task, we will define two trainable parameters: one to optimize the task description and one to perform few-shot learning.

Here is our task class:

task_desc_str = r"""Answer questions with short factoid answers.

You will receive context(may contain relevant facts) and a question.
Think step by step."""


class VanillaRAG(adal.GradComponent):
    def __init__(self, passages_per_hop=3, model_client=None, model_kwargs=None):
        super().__init__()

        self.passages_per_hop = passages_per_hop

        self.retriever = DspyRetriever(top_k=passages_per_hop)
        self.llm_parser = adal.DataClassParser(
            data_class=AnswerData, return_data_class=True, format_type="json"
        )
        self.llm = Generator(
            model_client=model_client,
            model_kwargs=model_kwargs,
            prompt_kwargs={
                "task_desc_str": adal.Parameter(
                    data=task_desc_str,
                    role_desc="Task description for the language model",
                    param_type=adal.ParameterType.PROMPT,
                ),
                "few_shot_demos": adal.Parameter(
                    data=None,
                    requires_opt=True,
                    role_desc="To provide few shot demos to the language model",
                    param_type=adal.ParameterType.DEMOS,
                ),
                "output_format_str": self.llm_parser.get_output_format_str(),
            },
            template=answer_template,
            output_processors=self.llm_parser,
            use_cache=True,
        )
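
The class above references AnswerData and answer_template, which are defined elsewhere in the source. A minimal sketch of what they might look like, consistent with the llm_parser and prompt_kwargs usage above (the field descriptions and template wording are illustrative assumptions):

from dataclasses import dataclass, field

# Illustrative sketch; the field names match the parser usage above, but the
# descriptions are assumptions, not the tutorial's exact definitions.
@dataclass
class AnswerData(adal.DataClass):
    reasoning: str = field(
        metadata={"desc": "The reasoning that leads to the answer"},
    )
    answer: str = field(
        metadata={"desc": "The short factoid answer"},
    )
    __output_fields__ = ["reasoning", "answer"]

# Illustrative Jinja2 template wired to the prompt_kwargs keys above.
answer_template = r"""<START_OF_SYSTEM_PROMPT>
{{task_desc_str}}
{{output_format_str}}
{% if few_shot_demos is not none %}
Here are some examples:
{{few_shot_demos}}
{% endif %}
<END_OF_SYSTEM_PROMPT>
<START_OF_USER>
Context: {{context}}
Question: {{question}}
<END_OF_USER>
"""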

Prepare for Training#

First, we need to create an AdalComponent to help configure the Trainer.

  • __init__: passes the eval_fn, loss_fn, and task, and configures the teacher generator for few-shot learning, as well as the backward_engine and optimizer for text-grad optimization.

  • At a minimum, we need to let the Trainer know: (1) how to call the task pipeline in both modes; (2) in inference/eval mode, how to parse the last output (a GeneratorOutput) into the input of eval_fn; and (3) in training mode, how to parse the last output (a Parameter) into the input of loss_fn.

Here is the AdalComponent class for the VanillaRAG task:

class VallinaRAGAdal(adal.AdalComponent):
    def __init__(
        self,
        model_client: adal.ModelClient,
        model_kwargs: Dict,
        backward_engine_model_config: Dict | None = None,
        teacher_model_config: Dict | None = None,
        text_optimizer_model_config: Dict | None = None,
    ):
        task = VanillaRAG(
            model_client=model_client,
            model_kwargs=model_kwargs,
            passages_per_hop=3,
        )
        eval_fn = AnswerMatchAcc(type="fuzzy_match").compute_single_item
        loss_fn = adal.EvalFnToTextLoss(
            eval_fn=eval_fn, eval_fn_desc="fuzzy_match: 1 if str(y) in str(y_gt) else 0"
        )
        super().__init__(
            task=task,
            eval_fn=eval_fn,
            loss_fn=loss_fn,
            backward_engine_model_config=backward_engine_model_config,
            teacher_model_config=teacher_model_config,
            text_optimizer_model_config=text_optimizer_model_config,
        )

    # tell the trainer how to call the task
    def prepare_task(self, sample: HotPotQAData) -> Tuple[Callable[..., Any], Dict]:
        if self.task.training:
            return self.task.forward, {"question": sample.question, "id": sample.id}
        else:
            return self.task.call, {"question": sample.question, "id": sample.id}


    # eval mode: get the generator output, directly engage with the eval_fn
    def prepare_eval(
        self, sample: HotPotQAData, y_pred: adal.GeneratorOutput
    ) -> Tuple[Callable[..., Any], Dict]:
        y_label = ""
        if y_pred and y_pred.data and y_pred.data.answer:
            y_label = y_pred.data.answer
        return self.eval_fn, {"y": y_label, "y_gt": sample.answer}


    # train mode: get the loss and get the data from the full_response
    def prepare_loss(self, sample: HotPotQAData, pred: adal.Parameter):
        # prepare gt parameter
        y_gt = adal.Parameter(
            name="y_gt",
            data=sample.answer,
            eval_input=sample.answer,
            requires_opt=False,
        )

        # pred's full_response is the output of the task pipeline which is GeneratorOutput
        pred.eval_input = (
            pred.full_response.data.answer
            if pred.full_response
            and pred.full_response.data
            and pred.full_response.data.answer
            else ""
        )
        return self.loss_fn, {"kwargs": {"y": pred, "y_gt": y_gt}}

Diagnose#

Before we start training, we diagnose the pipeline and analyze its performance prior to optimization. Here is the code:

def train_diagnose(
    model_client: adal.ModelClient,
    model_kwargs: Dict,
) -> Dict:

    trainset, valset, testset = load_datasets()

    adal_component = VallinaRAGAdal(
        model_client,
        model_kwargs,
        backward_engine_model_config=gpt_4o_model,
        teacher_model_config=gpt_3_model,
        text_optimizer_model_config=gpt_3_model,
    )
    trainer = adal.Trainer(adaltask=adal_component)
    trainer.diagnose(dataset=trainset, split="train")
    # trainer.diagnose(dataset=valset, split="val")
    # trainer.diagnose(dataset=testset, split="test")

From this, we found that the initial evaluator treats Yes and yes as different answers. We fixed the evaluator to compare the lowercased prediction against the lowercased ground truth. The unoptimized pipeline reaches an accuracy of about 0.6 on the test set.
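
A minimal sketch of the case-insensitive check described above, matching the eval_fn_desc given earlier; this is a hypothetical helper, not AnswerMatchAcc's actual implementation:

def fuzzy_match_lower(y: str, y_gt: str) -> float:
    # Case-insensitive containment check, mirroring the description
    # "1 if str(y) in str(y_gt) else 0" after lowercasing both sides.
    return 1.0 if str(y).strip().lower() in str(y_gt).strip().lower() else 0.0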

Training#

First, we train with supervision only on the final generated answer, without supervising the retriever.
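
As a sketch of how training could be launched, mirroring the diagnose setup above (the Trainer hyperparameters such as max_steps, raw_shots, and bootstrap_shots are illustrative values, not prescribed here):

def train(
    model_client: adal.ModelClient,
    model_kwargs: Dict,
):
    adal_component = VallinaRAGAdal(
        model_client,
        model_kwargs,
        backward_engine_model_config=gpt_4o_model,
        teacher_model_config=gpt_4o_model,
        text_optimizer_model_config=gpt_4o_model,
    )
    trainer = adal.Trainer(
        adaltask=adal_component,
        max_steps=12,  # illustrative optimization budget
        raw_shots=0,  # no raw few-shot examples
        bootstrap_shots=1,  # one bootstrapped demo for the DEMOS parameter
    )
    trainset, valset, testset = load_datasets()
    trainer.fit(
        train_dataset=trainset, val_dataset=valset, test_dataset=testset
    )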

API Reference

  • datasets.hotpotqa.HotPotQA