
Multi-Step Query Engine

The MultiStepQueryEngine breaks a complex query down into sequential sub-questions.

To answer the query "In which city did the author found his first company, Viaweb?", we need to answer the following sub-questions in order:

  1. Who is the author who founded his first company, Viaweb?
  2. In which city did Paul Graham found his first company, Viaweb?

The answer at each step (sub-question 1) is used to generate the question for the next step (sub-question 2); the steps are created sequentially, not all at once.

In this notebook, we will use a Workflow to implement the same functionality as the MultiStepQueryEngine.

!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-..."

Since workflows are async first, this all runs fine in a notebook. If you were running in your own code, you would want to use asyncio.run() to start an async event loop if one isn’t already running.

async def main():
    <async code>

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

The MultiStepQueryEngine involves a few clearly defined steps:

  1. Index the data and create an index.
  2. Create multiple sub-queries to answer the query.
  3. Synthesize the final response.

With that in mind, we can create events and workflow steps to follow this process!

To handle these steps, we need to define a QueryMultiStepEvent.

The other steps will use the built-in StartEvent and StopEvent events.

from llama_index.core.workflow import Event
from typing import Any, Dict, List
from llama_index.core.schema import NodeWithScore


class QueryMultiStepEvent(Event):
    """
    Event containing results of a multi-step query process.

    Attributes:
        nodes (List[NodeWithScore]): List of nodes with their associated scores.
        source_nodes (List[NodeWithScore]): List of source nodes with their scores.
        final_response_metadata (Dict[str, Any]): Metadata associated with the final response.
    """

    nodes: List[NodeWithScore]
    source_nodes: List[NodeWithScore]
    final_response_metadata: Dict[str, Any]
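
As a quick aside (a minimal sketch, not part of the original notebook), workflow events validate their fields like Pydantic models, so you can construct one directly and read its typed attributes:

from llama_index.core.schema import TextNode

# Hypothetical sanity check: construct the event directly and read its fields.
ev = QueryMultiStepEvent(
    nodes=[NodeWithScore(node=TextNode(text="Question: ...\nAnswer: ..."))],
    source_nodes=[],
    final_response_metadata={"sub_qa": []},
)
print(ev.nodes[0].node.text)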
from llama_index.core.indices.query.query_transform.base import (
    StepDecomposeQueryTransform,
)
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.schema import QueryBundle, TextNode
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.core import Settings
from llama_index.core.llms import LLM
from typing import cast
from IPython.display import Markdown, display


class MultiStepQueryEngineWorkflow(Workflow):
    def combine_queries(
        self,
        query_bundle: QueryBundle,
        prev_reasoning: str,
        index_summary: str,
        llm: LLM,
    ) -> QueryBundle:
        """Combine queries using StepDecomposeQueryTransform."""
        transform_metadata = {
            "prev_reasoning": prev_reasoning,
            "index_summary": index_summary,
        }
        return StepDecomposeQueryTransform(llm=llm)(
            query_bundle, metadata=transform_metadata
        )

    def default_stop_fn(self, stop_dict: Dict) -> bool:
        """Stop function for multi-step query combiner."""
        query_bundle = cast(QueryBundle, stop_dict.get("query_bundle"))
        if query_bundle is None:
            raise ValueError("Query bundle must be provided to stop function.")
        return "none" in query_bundle.query_str.lower()

    @step
    async def query_multistep(
        self, ctx: Context, ev: StartEvent
    ) -> QueryMultiStepEvent:
        """Execute multi-step query process."""
        prev_reasoning = ""
        cur_response = None
        should_stop = False
        cur_steps = 0

        # use response
        final_response_metadata: Dict[str, Any] = {"sub_qa": []}

        text_chunks = []
        source_nodes = []

        query = ev.get("query")
        await ctx.store.set("query", ev.get("query"))

        llm = Settings.llm
        stop_fn = self.default_stop_fn

        num_steps = ev.get("num_steps")
        query_engine = ev.get("query_engine")
        index_summary = ev.get("index_summary")

        while not should_stop:
            if num_steps is not None and cur_steps >= num_steps:
                should_stop = True
                break

            updated_query_bundle = self.combine_queries(
                QueryBundle(query_str=query),
                prev_reasoning,
                index_summary,
                llm,
            )

            print(
                f"Created query for the step - {cur_steps} is: {updated_query_bundle}"
            )

            stop_dict = {"query_bundle": updated_query_bundle}
            if stop_fn(stop_dict):
                should_stop = True
                break

            cur_response = query_engine.query(updated_query_bundle)

            # append to response builder
            cur_qa_text = (
                f"\nQuestion: {updated_query_bundle.query_str}\n"
                f"Answer: {cur_response!s}"
            )
            text_chunks.append(cur_qa_text)
            for source_node in cur_response.source_nodes:
                source_nodes.append(source_node)

            # update metadata
            final_response_metadata["sub_qa"].append(
                (updated_query_bundle.query_str, cur_response)
            )

            prev_reasoning += (
                f"- {updated_query_bundle.query_str}\n" f"- {cur_response!s}\n"
            )
            cur_steps += 1

        nodes = [
            NodeWithScore(node=TextNode(text=text_chunk))
            for text_chunk in text_chunks
        ]
        return QueryMultiStepEvent(
            nodes=nodes,
            source_nodes=source_nodes,
            final_response_metadata=final_response_metadata,
        )

    @step
    async def synthesize(
        self, ctx: Context, ev: QueryMultiStepEvent
    ) -> StopEvent:
        """Synthesize the response."""
        response_synthesizer = get_response_synthesizer()
        query = await ctx.store.get("query", default=None)
        final_response = await response_synthesizer.asynthesize(
            query=query,
            nodes=ev.nodes,
            additional_source_nodes=ev.source_nodes,
        )
        final_response.metadata = ev.final_response_metadata

        return StopEvent(result=final_response)
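
Optionally, if the llama-index-utils-workflow package is installed, you can render all possible event flows of this workflow to an HTML file and verify that the two steps are wired together as expected (this visualization step is an addition, not part of the original notebook):

# Optional visualization; requires `pip install llama-index-utils-workflow`.
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(
    MultiStepQueryEngineWorkflow, filename="multi_step_workflow.html"
)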
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
--2024-08-26 14:16:04-- https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8000::154, 2606:50c0:8002::154, 2606:50c0:8001::154, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8000::154|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 75042 (73K) [text/plain]
Saving to: ‘data/paul_graham/paul_graham_essay.txt’
data/paul_graham/pa 100%[===================>] 73.28K --.-KB/s in 0.01s
2024-08-26 14:16:04 (6.91 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("data/paul_graham").load_data()
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4")
Settings.llm = llm
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_documents(
    documents=documents,
)
query_engine = index.as_query_engine()
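
As a point of comparison (a hypothetical baseline, not part of the original notebook), you can send the question through the plain query engine in a single shot; a single retrieval pass may fail to first resolve who "the author" is, which is exactly what the multi-step workflow addresses:

# Hypothetical single-step baseline, for comparison with the workflow run below.
baseline_response = query_engine.query(
    "In which city did the author found his first company, Viaweb?"
)
print(baseline_response)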
w = MultiStepQueryEngineWorkflow(timeout=200)
# Sets maximum number of steps taken to answer the query.
num_steps = 3
# Set summary of the index, useful to create modified query at each step.
index_summary = "Used to answer questions about the author"
query = "In which city did the author found his first company, Viaweb?"
result = await w.run(
    query=query,
    query_engine=query_engine,
    index_summary=index_summary,
    num_steps=num_steps,
)

# If the query created at a step is None, the process stops.
display(
    Markdown("> Question: {}".format(query)),
    Markdown("Answer: {}".format(result)),
)
Created query for the step - 0 is: Who is the author who founded Viaweb?
Created query for the step - 1 is: In which city did Paul Graham found his first company, Viaweb?
Created query for the step - 2 is: None
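
The run stopped at step 2 because the generated query was the literal string "None", which default_stop_fn detects. As a standalone illustration (hypothetical, reusing the workflow instance from above):

# The stop check fires when "none" appears in the generated query string.
print(w.default_stop_fn({"query_bundle": QueryBundle(query_str="None")}))  # True
print(w.default_stop_fn({"query_bundle": QueryBundle(query_str="Who founded Viaweb?")}))  # False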

Question: In which city did the author found his first company, Viaweb?

Answer: The author founded his first company, Viaweb, in Cambridge.

sub_qa = result.metadata["sub_qa"]
tuples = [(t[0], t[1].response) for t in sub_qa]
display(Markdown(f"{tuples}"))

[('Who is the author who founded Viaweb?', 'The author who founded Viaweb is Paul Graham.'), ('In which city did Paul Graham found his first company, Viaweb?', 'Paul Graham founded his first company, Viaweb, in Cambridge.')]
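
Besides the sub-question/answer pairs, the synthesized result also carries the source nodes gathered across every step. A hypothetical inspection, assuming the run above:

# Hypothetical: inspect the source nodes aggregated across all steps.
print(len(result.source_nodes))
for node_with_score in result.source_nodes[:2]:
    print(node_with_score.score, node_with_score.node.get_content()[:80])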