跳至内容

使用模式#

使用模式指南更深入地介绍了QueryPipeline的设置和使用。

设置一个处理流程#

这里我们将介绍几种不同的查询管道设置方法。

定义顺序链#

一些简单的流程本质上是纯线性的——前一模块的输出直接作为下一模块的输入。

一些示例:

  • 提示 -> 大语言模型 -> 输出解析
  • 提示词 -> 大语言模型 -> 提示词 -> 大语言模型
  • 检索器 -> 响应合成器

这些工作流可以通过简化的chain语法轻松地在QueryPipeline中表达。

from llama_index.core.query_pipeline import QueryPipeline

# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")

p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)

定义DAG#

许多流程需要您设置一个DAG(例如,如果您想实现标准RAG流程中的所有步骤)。

这里我们提供了一个底层API,用于添加模块及其键,并定义前一个模块输出与下一个模块输入之间的链接。

from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.response_synthesizers import TreeSummarize

# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
summarizer = TreeSummarize(llm=llm)

# define query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "llm": llm,
        "prompt_tmpl": prompt_tmpl,
        "retriever": retriever,
        "summarizer": summarizer,
        "reranker": reranker,
    }
)
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")

运行流程#

单输入/单输出#

输入是第一个组件的kwargs。

如果最后一个组件的输出是单个对象(而不是对象字典),那么我们直接返回该对象。

以上一个示例中的流程为例,由于最后一步是TreeSummarize响应合成模块,输出将是一个Response对象。

output = p.run(topic="YC")
# output type is Response
type(output)

多输入/多输出#

如果你的DAG有多个根节点或输出节点,可以尝试run_multi。传入一个包含模块键->输入字典的输入字典。输出是模块键->输出字典的字典。

如果我们运行前面的示例,

output_dict = p.run_multi({"llm": {"topic": "YC"}})
print(output_dict)

# output dict is {"summarizer": {"output": response}}

定义部分内容#

如果你想为某个模块预填充某些输入,可以使用partial实现!这样DAG只需接入未填充的输入即可。

你可能需要通过as_query_component转换一个模块。

这是一个示例:

summarizer = TreeSummarize(llm=llm)
summarizer_c = summarizer.as_query_component(partial={"nodes": nodes})
# can define a chain because llm output goes into query_str, nodes is pre-filled
p = QueryPipeline(chain=[prompt_tmpl, llm, summarizer_c])
# run pipeline
p.run(topic="YC")

批量输入#

如果您希望为多轮单/多输入运行管道,请在函数调用中设置batch=True - 该功能由runarunrun_multiarun_multi支持。传入您想要运行的单个/多个输入列表。batch模式将按照输入顺序返回响应列表。

单输入/单输出示例:p.run(field=[in1: Any, in2: Any], batch=True) --> [out1: Any, out2: Any]

output = p.run(topic=["YC", "RAG", "LlamaIndex"], batch=True)
# output is [ResponseYC, ResponseRAG, ResponseLlamaIndex]
print(output)

多输入/多输出示例:p.run_multi("root_node": {"field": [in1: Any, in2, Any]}, batch=True) --> {"output_node": {"field": [out1: Any, out2: Any]}}

output_dict = p.run_multi({"llm": {"topic": ["YC", "RAG", "LlamaIndex"]}})
print(output_dict)

# output dict is {"summarizer": {"output": [ResponseYC, ResponseRAG, ResponseLlamaIndex]}}

中间输出#

如果你想获取QueryPipeline中模块的中间输出,可以使用run_with_intermediatesrun_multi_with_intermediates,分别对应单输入和多输入的情况。

输出将是一个元组,包含正常输出和一个字典,字典中模块键对应ComponentIntermediates。ComponentIntermediates有2个字段:inputs字典和outputs字典。

output, intermediates = p.run_with_intermediates(topic="YC")
print(output)
print(intermediates)

# output is (Response, {"module_key": ComponentIntermediates("inputs": {}, "outputs": {})})

定义自定义查询组件#

您可以轻松定义一个自定义组件:既可以通过向FnComponent传递函数,也可以继承CustomQueryComponent类来实现。

FnComponent传递函数#

定义任意函数并将其传递给FnComponent。位置参数名称(args)将被转换为必需的输入键,而关键字参数名称(kwargs)将被转换为可选的输入键。

注意:我们假设只有一个输出。

from llama_index.core.query_pipeline import FnComponent


def add(a: int, b: int) -> int:
    """Adds two numbers."""
    return a + b


add_component = FnComponent(fn=add, output_key="output")

# input keys to add_component are "a" and "b", output key is 'output'

继承CustomQueryComponent#

只需继承CustomQueryComponent类,实现验证/运行函数及一些辅助方法,然后将其接入系统即可。

from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any


class MyComponent(CustomQueryComponent):
    """My component."""

    # Pydantic class, put any attributes here
    ...

    def _validate_component_inputs(
        self, input: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        # NOTE: this is OPTIONAL but we show you here how to do validation as an example
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys dict."""
        return {"input_key1", ...}

    @property
    def _output_keys(self) -> set:
        # can do multi-outputs too
        return {"output_key"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        # run logic
        ...
        return {"output_key": result}

更多详情请查看我们的深度查询转换指南

确保输出兼容#

通过在QueryPipeline中连接模块,一个模块的输出会成为下一个模块的输入。

通常你必须确保链接能够正常工作,预期的输出和输入类型大致匹配。

我们说"大致"是因为我们对现有模块做了一些特殊处理,确保"可字符串化"的输出能够传入可被查询为"字符串"的输入。某些输出类型被视为可字符串化 - CompletionResponseChatResponseResponseQueryBundle等。检索器/查询引擎会自动将string输入转换为QueryBundle对象。

这使您能够执行某些工作流程,如果自行编写,通常需要样板字符串转换,例如,

  • LLM -> 提示词, LLM -> 检索器, LLM -> 查询引擎
  • 查询引擎 -> 提示, 查询引擎 -> 检索器

如果您正在定义一个自定义组件,您应该使用_validate_component_inputs来确保输入是正确的类型,如果不是则抛出错误。

优云智算