多步骤查询引擎
MultiStepQueryEngine 将一个复杂查询分解为顺序子问题。
为了回答查询“作者在哪个城市创立了他的第一家公司 Viaweb?”,我们需要按顺序回答以下子问题:
- 创建了他的第一家公司 Viaweb 的作者是谁?
- 保罗·格雷厄姆在哪个城市创立了他的第一家公司 Viaweb?
可以看到,每一步的答案(子查询 1)都会被用来生成下一步的问题(子查询 2);这些步骤是按顺序逐步创建的,而不是一次性全部生成。
在本笔记本中,我们将使用工作流(Workflow)来实现与 MultiStepQueryEngine 相同的功能。
!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
由于工作流以异步为先(async first),这些代码在 notebook 中都可以正常运行。如果是在你自己的代码中运行,并且当前还没有正在运行的事件循环,则需要使用 asyncio.run() 来启动一个异步事件循环。
async def main():
    <async code>

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
MultiStepQueryEngine 包含一些明确定义的步骤:
- 索引数据,创建索引。
- 创建多个子查询来回答该查询。
- 综合最终响应
考虑到这一点,我们可以创建事件和工作流步骤来遵循这个流程!
为了处理这些步骤,我们需要定义 QueryMultiStepEvent
其他步骤将使用内置的 StartEvent 和 StopEvent 事件。
from llama_index.core.workflow import Event
from typing import Dict, List, Any
from llama_index.core.schema import NodeWithScore
class QueryMultiStepEvent(Event):
    """Workflow event that carries the outcome of the multi-step query loop.

    Attributes:
        nodes: Scored nodes produced by the multi-step query process.
        source_nodes: Scored source nodes gathered alongside ``nodes``.
        final_response_metadata: Metadata to attach to the final response.
    """

    nodes: List[NodeWithScore]
    source_nodes: List[NodeWithScore]
    final_response_metadata: Dict[str, Any]


# Cell output:
# [nltk_data] Downloading package punkt_tab to
# [nltk_data] /Users/ravithejad/Desktop/llamaindex/lib/python3.9/sit
# [nltk_data] e-packages/llama_index/core/_static/nltk_cache...
# [nltk_data] Package punkt_tab is already up-to-date!

from llama_index.core.indices.query.query_transform.base import (
    StepDecomposeQueryTransform,
)
from llama_index.core.response_synthesizers import (
    get_response_synthesizer,
)
from llama_index.core.schema import QueryBundle, TextNode
from llama_index.core.workflow import ( Context, Workflow, StartEvent, StopEvent, step,)
from llama_index.core import Settings
from llama_index.core.llms import LLM
from typing import cast
from IPython.display import Markdown, display
class MultiStepQueryEngineWorkflow(Workflow):
    """Answer a complex query through a sequence of generated sub-questions.

    ``query_multistep`` repeatedly derives the next sub-question from the
    original query plus the reasoning collected so far, answers it with the
    supplied query engine, and records each question/answer pair.
    ``synthesize`` then turns the collected pairs into the final response.
    """

    def combine_queries(
        self,
        query_bundle: QueryBundle,
        prev_reasoning: str,
        index_summary: str,
        llm: LLM,
    ) -> QueryBundle:
        """Combine queries using StepDecomposeQueryTransform.

        Args:
            query_bundle: Bundle wrapping the original user query.
            prev_reasoning: Text of the sub-questions and answers so far.
            index_summary: Summary of the index, forwarded to the transform.
            llm: Language model handed to the transform.

        Returns:
            The query bundle produced by the transform for the next step.
        """
        transform_metadata = {
            "prev_reasoning": prev_reasoning,
            "index_summary": index_summary,
        }
        return StepDecomposeQueryTransform(llm=llm)(
            query_bundle, metadata=transform_metadata
        )

    def default_stop_fn(self, stop_dict: Dict) -> bool:
        """Stop function for multi-step query combiner.

        Args:
            stop_dict: Mapping expected to hold the generated bundle under
                the ``"query_bundle"`` key.

        Returns:
            True when the generated query string contains ``"none"``
            (case-insensitive), which ends the multi-step loop.

        Raises:
            ValueError: If ``stop_dict`` has no ``"query_bundle"`` entry.
        """
        query_bundle = cast(QueryBundle, stop_dict.get("query_bundle"))
        if query_bundle is None:
            raise ValueError(
                "Query bundle must be provided to stop function."
            )

        # NOTE: this is a substring match, so any generated query that merely
        # contains "none" also stops the loop.
        return "none" in query_bundle.query_str.lower()

    @step
    async def query_multistep(
        self, ctx: Context, ev: StartEvent
    ) -> QueryMultiStepEvent:
        """Execute multi-step query process.

        Reads ``query``, ``query_engine``, ``index_summary`` and the optional
        ``num_steps`` from the start event.  When ``num_steps`` is ``None``
        the loop is bounded only by the stop function.

        Args:
            ctx: Workflow context; the original query is stored in it under
                the ``"query"`` key for the synthesis step.
            ev: Start event carrying the run arguments.

        Returns:
            Event holding one text node per answered sub-question, the
            source nodes of every sub-answer, and the ``"sub_qa"`` metadata.

        Raises:
            ValueError: If ``query`` or ``query_engine`` was not supplied.
        """
        query = ev.get("query")
        query_engine = ev.get("query_engine")
        if query is None or query_engine is None:
            raise ValueError(
                "Both `query` and `query_engine` must be passed to run()."
            )
        num_steps = ev.get("num_steps")
        index_summary = ev.get("index_summary")

        await ctx.store.set("query", query)

        llm = Settings.llm
        stop_fn = self.default_stop_fn

        prev_reasoning = ""
        cur_steps = 0
        final_response_metadata: Dict[str, Any] = {"sub_qa": []}
        text_chunks: List[str] = []
        source_nodes: List[NodeWithScore] = []

        while num_steps is None or cur_steps < num_steps:
            updated_query_bundle = self.combine_queries(
                QueryBundle(query_str=query),
                prev_reasoning,
                index_summary,
                llm,
            )

            print(
                f"Created query for the step - {cur_steps} is: {updated_query_bundle}"
            )

            if stop_fn({"query_bundle": updated_query_bundle}):
                break

            # Await the async query API so the workflow's event loop is not
            # blocked while the sub-question is being answered.
            cur_response = await query_engine.aquery(updated_query_bundle)

            # Collect the Q/A text that is later turned into synthesis nodes.
            text_chunks.append(
                f"\nQuestion: {updated_query_bundle.query_str}\n"
                f"Answer: {cur_response!s}"
            )
            source_nodes.extend(cur_response.source_nodes)
            final_response_metadata["sub_qa"].append(
                (updated_query_bundle.query_str, cur_response)
            )

            # Feed this step back into the next decomposition.
            prev_reasoning += (
                f"- {updated_query_bundle.query_str}\n"
                f"- {cur_response!s}\n"
            )
            cur_steps += 1

        nodes = [
            NodeWithScore(node=TextNode(text=text_chunk))
            for text_chunk in text_chunks
        ]
        return QueryMultiStepEvent(
            nodes=nodes,
            source_nodes=source_nodes,
            final_response_metadata=final_response_metadata,
        )

    @step
    async def synthesize(
        self, ctx: Context, ev: QueryMultiStepEvent
    ) -> StopEvent:
        """Synthesize the response.

        Args:
            ctx: Workflow context holding the original query under
                ``"query"``.
            ev: Event with the sub-question nodes, source nodes and metadata.

        Returns:
            Stop event whose result is the synthesized response, with its
            metadata replaced by ``ev.final_response_metadata``.
        """
        response_synthesizer = get_response_synthesizer()
        query = await ctx.store.get("query", default=None)
        final_response = await response_synthesizer.asynthesize(
            query=query,
            nodes=ev.nodes,
            additional_source_nodes=ev.source_nodes,
        )
        final_response.metadata = ev.final_response_metadata

        return StopEvent(result=final_response)


# Notebook shell commands that download the sample essay (run in a cell):
# !mkdir -p 'data/paul_graham/'
# !wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
#
# Cell output:
# --2024-08-26 14:16:04-- https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt
# Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8000::154, 2606:50c0:8002::154, 2606:50c0:8001::154, ...
# Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8000::154|:443... connected.
# HTTP request sent, awaiting response... 200 OK
# Length: 75042 (73K) [text/plain]
# Saving to: ‘data/paul_graham/paul_graham_essay.txt’
data/paul_graham/pa 100%[===================>] 73.28K --.-KB/s in 0.01s
2024-08-26 14:16:04 (6.91 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("data/paul_graham").load_data()from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4")
Settings.llm = llmfrom llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_documents( documents=documents,)
query_engine = index.as_query_engine()w = MultiStepQueryEngineWorkflow(timeout=200)# Sets maximum number of steps taken to answer the query.num_steps = 3
# Set summary of the index, useful to create modified query at each step.index_summary = "Used to answer questions about the author"query = "In which city did the author found his first company, Viaweb?"result = await w.run( query=query, query_engine=query_engine, index_summary=index_summary, num_steps=num_steps,)
# If created query in a step is None, the process will be stopped.
display( Markdown("> Question: {}".format(query)), Markdown("Answer: {}".format(result)),)Created query for the step - 0 is: Who is the author who founded Viaweb?Created query for the step - 1 is: In which city did Paul Graham found his first company, Viaweb?Created query for the step - 2 is: None问题:作者在哪个城市创立了他的第一家公司 Viaweb?
答案:作者在剑桥创立了他的第一家公司 Viaweb。
sub_qa = result.metadata["sub_qa"]tuples = [(t[0], t[1].response) for t in sub_qa]display(Markdown(f"{tuples}"))[('创建Viaweb的作者是谁?', '创建Viaweb的作者是Paul Graham。'), ('Paul Graham在哪个城市创办了他的第一家公司Viaweb?', 'Paul Graham在剑桥创办了他的第一家公司Viaweb。')]