使用模式#
使用模式指南更深入地介绍了QueryPipeline
的设置和使用。
设置一个处理流程#
这里我们将介绍几种不同的查询管道设置方法。
定义顺序链#
一些简单的流程本质上是纯线性的——前一模块的输出直接作为下一模块的输入。
一些示例:
- 提示 -> 大语言模型 -> 输出解析
- 提示词 -> 大语言模型 -> 提示词 -> 大语言模型
- 检索器 -> 响应合成器
这些工作流可以通过简化的chain
语法轻松地在QueryPipeline
中表达。
from llama_index.core.query_pipeline import QueryPipeline
# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)
定义DAG#
许多流程需要您设置一个DAG(例如,如果您想实现标准RAG流程中的所有步骤)。
这里我们提供了一个底层API,用于添加模块及其键,并定义前一个模块输出与下一个模块输入之间的链接。
from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.response_synthesizers import TreeSummarize
# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
summarizer = TreeSummarize(llm=llm)
# define query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
{
"llm": llm,
"prompt_tmpl": prompt_tmpl,
"retriever": retriever,
"summarizer": summarizer,
"reranker": reranker,
}
)
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")
运行流程#
单输入/单输出#
输入是第一个组件的kwargs。
如果最后一个组件的输出是单个对象(而不是对象字典),那么我们直接返回该对象。
以上一个示例中的流程为例,由于最后一步是TreeSummarize
响应合成模块,输出将是一个Response
对象。
output = p.run(topic="YC")
# output type is Response
type(output)
多输入/多输出#
如果你的DAG有多个根节点或输出节点,可以尝试run_multi
。传入一个包含模块键->输入字典的输入字典。输出是模块键->输出字典的字典。
如果我们运行前面的示例,
output_dict = p.run_multi({"llm": {"topic": "YC"}})
print(output_dict)
# output dict is {"summarizer": {"output": response}}
定义部分内容#
如果你想为某个模块预填充某些输入,可以使用partial
实现!这样DAG只需接入未填充的输入即可。
你可能需要通过as_query_component
转换一个模块。
这是一个示例:
summarizer = TreeSummarize(llm=llm)
summarizer_c = summarizer.as_query_component(partial={"nodes": nodes})
# can define a chain because llm output goes into query_str, nodes is pre-filled
p = QueryPipeline(chain=[prompt_tmpl, llm, summarizer_c])
# run pipeline
p.run(topic="YC")
批量输入#
如果您希望为多轮单/多输入运行管道,请在函数调用中设置batch=True
- 该功能由run
、arun
、run_multi
和arun_multi
支持。传入您想要运行的单个/多个输入列表。batch
模式将按照输入顺序返回响应列表。
单输入/单输出示例:p.run(field=[in1: Any, in2: Any], batch=True)
--> [out1: Any, out2: Any]
output = p.run(topic=["YC", "RAG", "LlamaIndex"], batch=True)
# output is [ResponseYC, ResponseRAG, ResponseLlamaIndex]
print(output)
多输入/多输出示例:p.run_multi("root_node": {"field": [in1: Any, in2, Any]}, batch=True)
--> {"output_node": {"field": [out1: Any, out2: Any]}}
output_dict = p.run_multi({"llm": {"topic": ["YC", "RAG", "LlamaIndex"]}})
print(output_dict)
# output dict is {"summarizer": {"output": [ResponseYC, ResponseRAG, ResponseLlamaIndex]}}
中间输出#
如果你想获取QueryPipeline中模块的中间输出,可以使用run_with_intermediates
或run_multi_with_intermediates
,分别对应单输入和多输入的情况。
输出将是一个元组,包含正常输出和一个字典,字典中模块键对应ComponentIntermediates
。ComponentIntermediates有2个字段:inputs
字典和outputs
字典。
output, intermediates = p.run_with_intermediates(topic="YC")
print(output)
print(intermediates)
# output is (Response, {"module_key": ComponentIntermediates("inputs": {}, "outputs": {})})
定义自定义查询组件#
您可以轻松定义一个自定义组件:既可以通过向FnComponent
传递函数,也可以继承CustomQueryComponent
类来实现。
向FnComponent
传递函数#
定义任意函数并将其传递给FnComponent
。位置参数名称(args
)将被转换为必需的输入键,而关键字参数名称(kwargs
)将被转换为可选的输入键。
注意:我们假设只有一个输出。
from llama_index.core.query_pipeline import FnComponent
def add(a: int, b: int) -> int:
"""Adds two numbers."""
return a + b
add_component = FnComponent(fn=add, output_key="output")
# input keys to add_component are "a" and "b", output key is 'output'
继承CustomQueryComponent
类#
只需继承CustomQueryComponent
类,实现验证/运行函数及一些辅助方法,然后将其接入系统即可。
from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any
class MyComponent(CustomQueryComponent):
"""My component."""
# Pydantic class, put any attributes here
...
def _validate_component_inputs(
self, input: Dict[str, Any]
) -> Dict[str, Any]:
"""Validate component inputs during run_component."""
# NOTE: this is OPTIONAL but we show you here how to do validation as an example
return input
@property
def _input_keys(self) -> set:
"""Input keys dict."""
return {"input_key1", ...}
@property
def _output_keys(self) -> set:
# can do multi-outputs too
return {"output_key"}
def _run_component(self, **kwargs) -> Dict[str, Any]:
"""Run the component."""
# run logic
...
return {"output_key": result}
更多详情请查看我们的深度查询转换指南。
确保输出兼容#
通过在QueryPipeline
中连接模块,一个模块的输出会成为下一个模块的输入。
通常你必须确保链接能够正常工作,预期的输出和输入类型大致匹配。
我们说"大致"是因为我们对现有模块做了一些特殊处理,确保"可字符串化"的输出能够传入可被查询为"字符串"的输入。某些输出类型被视为可字符串化 - CompletionResponse
、ChatResponse
、Response
、QueryBundle
等。检索器/查询引擎会自动将string
输入转换为QueryBundle
对象。
这使您能够执行某些工作流程,如果自行编写,通常需要样板字符串转换,例如,
- LLM -> 提示词, LLM -> 检索器, LLM -> 查询引擎
- 查询引擎 -> 提示, 查询引擎 -> 检索器
如果您正在定义一个自定义组件,您应该使用_validate_component_inputs
来确保输入是正确的类型,如果不是则抛出错误。