多步查询引擎¶
MultiStepQueryEngine
将复杂查询分解为一系列顺序子问题。
要回答查询:作者在哪个城市创办了他的第一家公司Viaweb?我们需要依次回答以下子问题:
- 哪位作者创立了他的第一家公司Viaweb?
- 保罗·格雷厄姆在哪个城市创办了他的第一家公司Viaweb?
例如,每个步骤的答案(子查询1)用于生成下一步的问题(子查询2),这些步骤是按顺序创建而非一次性生成的。
在本笔记本中,我们将使用工作流通过MultiStepQueryEngine实现相同的功能。
In [ ]:
Copied!
!pip install -U llama-index
!pip install -U llama-index
In [ ]:
Copied!
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
由于工作流默认采用异步优先设计,这一切在笔记本环境中都能顺畅运行。若您在自己的代码中执行,当不存在已激活的异步事件循环时,您需要使用asyncio.run()
来启动一个异步事件循环。
async def main():
<async code>
if __name__ == "__main__":
import asyncio
asyncio.run(main())
定义事件¶
In [ ]:
Copied!
from llama_index.core.workflow import Event
from typing import Dict, List, Any
from llama_index.core.schema import NodeWithScore
class QueryMultiStepEvent(Event):
"""
Event containing results of a multi-step query process.
Attributes:
nodes (List[NodeWithScore]): List of nodes with their associated scores.
source_nodes (List[NodeWithScore]): List of source nodes with their scores.
final_response_metadata (Dict[str, Any]): Metadata associated with the final response.
"""
nodes: List[NodeWithScore]
source_nodes: List[NodeWithScore]
final_response_metadata: Dict[str, Any]
from llama_index.core.workflow import Event
from typing import Dict, List, Any
from llama_index.core.schema import NodeWithScore
class QueryMultiStepEvent(Event):
"""
包含多步查询过程结果的事件。
属性:
nodes (List[NodeWithScore]): 带有关联分数的节点列表。
source_nodes (List[NodeWithScore]): 带有分数的源节点列表。
final_response_metadata (Dict[str, Any]): 与最终响应关联的元数据。
"""
nodes: List[NodeWithScore]
source_nodes: List[NodeWithScore]
final_response_metadata: Dict[str, Any]
[nltk_data] Downloading package punkt_tab to [nltk_data] /Users/ravithejad/Desktop/llamaindex/lib/python3.9/sit [nltk_data] e-packages/llama_index/core/_static/nltk_cache... [nltk_data] Package punkt_tab is already up-to-date!
定义工作流程¶
In [ ]:
Copied!
from llama_index.core.indices.query.query_transform.base import (
StepDecomposeQueryTransform,
)
from llama_index.core.response_synthesizers import (
get_response_synthesizer,
)
from llama_index.core.schema import QueryBundle, TextNode
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.core import Settings
from llama_index.core.llms import LLM
from typing import cast
from IPython.display import Markdown, display
class MultiStepQueryEngineWorkflow(Workflow):
def combine_queries(
self,
query_bundle: QueryBundle,
prev_reasoning: str,
index_summary: str,
llm: LLM,
) -> QueryBundle:
"""Combine queries using StepDecomposeQueryTransform."""
transform_metadata = {
"prev_reasoning": prev_reasoning,
"index_summary": index_summary,
}
return StepDecomposeQueryTransform(llm=llm)(
query_bundle, metadata=transform_metadata
)
def default_stop_fn(self, stop_dict: Dict) -> bool:
"""Stop function for multi-step query combiner."""
query_bundle = cast(QueryBundle, stop_dict.get("query_bundle"))
if query_bundle is None:
raise ValueError("Response must be provided to stop function.")
return "none" in query_bundle.query_str.lower()
@step
async def query_multistep(
self, ctx: Context, ev: StartEvent
) -> QueryMultiStepEvent:
"""Execute multi-step query process."""
prev_reasoning = ""
cur_response = None
should_stop = False
cur_steps = 0
# use response
final_response_metadata: Dict[str, Any] = {"sub_qa": []}
text_chunks = []
source_nodes = []
query = ev.get("query")
await ctx.set("query", ev.get("query"))
llm = Settings.llm
stop_fn = self.default_stop_fn
num_steps = ev.get("num_steps")
query_engine = ev.get("query_engine")
index_summary = ev.get("index_summary")
while not should_stop:
if num_steps is not None and cur_steps >= num_steps:
should_stop = True
break
elif should_stop:
break
updated_query_bundle = self.combine_queries(
QueryBundle(query_str=query),
prev_reasoning,
index_summary,
llm,
)
print(
f"Created query for the step - {cur_steps} is: {updated_query_bundle}"
)
stop_dict = {"query_bundle": updated_query_bundle}
if stop_fn(stop_dict):
should_stop = True
break
cur_response = query_engine.query(updated_query_bundle)
# append to response builder
cur_qa_text = (
f"\nQuestion: {updated_query_bundle.query_str}\n"
f"Answer: {cur_response!s}"
)
text_chunks.append(cur_qa_text)
for source_node in cur_response.source_nodes:
source_nodes.append(source_node)
# update metadata
final_response_metadata["sub_qa"].append(
(updated_query_bundle.query_str, cur_response)
)
prev_reasoning += (
f"- {updated_query_bundle.query_str}\n" f"- {cur_response!s}\n"
)
cur_steps += 1
nodes = [
NodeWithScore(node=TextNode(text=text_chunk))
for text_chunk in text_chunks
]
return QueryMultiStepEvent(
nodes=nodes,
source_nodes=source_nodes,
final_response_metadata=final_response_metadata,
)
@step
async def synthesize(
self, ctx: Context, ev: QueryMultiStepEvent
) -> StopEvent:
"""Synthesize the response."""
response_synthesizer = get_response_synthesizer()
query = await ctx.get("query", default=None)
final_response = await response_synthesizer.asynthesize(
query=query,
nodes=ev.nodes,
additional_source_nodes=ev.source_nodes,
)
final_response.metadata = ev.final_response_metadata
return StopEvent(result=final_response)
from llama_index.core.indices.query.query_transform.base import (
StepDecomposeQueryTransform,
)
from llama_index.core.response_synthesizers import (
get_response_synthesizer,
)
from llama_index.core.schema import QueryBundle, TextNode
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.core import Settings
from llama_index.core.llms import LLM
from typing import cast
from IPython.display import Markdown, display
class MultiStepQueryEngineWorkflow(Workflow):
def combine_queries(
self,
query_bundle: QueryBundle,
prev_reasoning: str,
index_summary: str,
llm: LLM,
) -> QueryBundle:
"""使用StepDecomposeQueryTransform组合查询"""
transform_metadata = {
"prev_reasoning": prev_reasoning,
"index_summary": index_summary,
}
return StepDecomposeQueryTransform(llm=llm)(
query_bundle, metadata=transform_metadata
)
def default_stop_fn(self, stop_dict: Dict) -> bool:
"""多步查询组合器的停止函数"""
query_bundle = cast(QueryBundle, stop_dict.get("query_bundle"))
if query_bundle is None:
raise ValueError("必须向停止函数提供响应")
return "none" in query_bundle.query_str.lower()
@step
async def query_multistep(
self, ctx: Context, ev: StartEvent
) -> QueryMultiStepEvent:
"""执行多步查询流程"""
prev_reasoning = ""
cur_response = None
should_stop = False
cur_steps = 0
# 使用响应
final_response_metadata: Dict[str, Any] = {"sub_qa": []}
text_chunks = []
source_nodes = []
query = ev.get("query")
await ctx.set("query", ev.get("query"))
llm = Settings.llm
stop_fn = self.default_stop_fn
num_steps = ev.get("num_steps")
query_engine = ev.get("query_engine")
index_summary = ev.get("index_summary")
while not should_stop:
if num_steps is not None and cur_steps >= num_steps:
should_stop = True
break
elif should_stop:
break
updated_query_bundle = self.combine_queries(
QueryBundle(query_str=query),
prev_reasoning,
index_summary,
llm,
)
print(
f"为步骤 {cur_steps} 创建的查询是: {updated_query_bundle}"
)
stop_dict = {"query_bundle": updated_query_bundle}
if stop_fn(stop_dict):
should_stop = True
break
cur_response = query_engine.query(updated_query_bundle)
# 添加到响应构建器
cur_qa_text = (
f"\n问题: {updated_query_bundle.query_str}\n"
f"答案: {cur_response!s}"
)
text_chunks.append(cur_qa_text)
for source_node in cur_response.source_nodes:
source_nodes.append(source_node)
# 更新元数据
final_response_metadata["sub_qa"].append(
(updated_query_bundle.query_str, cur_response)
)
prev_reasoning += (
f"- {updated_query_bundle.query_str}\n" f"- {cur_response!s}\n"
)
cur_steps += 1
nodes = [
NodeWithScore(node=TextNode(text=text_chunk))
for text_chunk in text_chunks
]
return QueryMultiStepEvent(
nodes=nodes,
source_nodes=source_nodes,
final_response_metadata=final_response_metadata,
)
@step
async def synthesize(
self, ctx: Context, ev: QueryMultiStepEvent
) -> StopEvent:
"""合成响应"""
response_synthesizer = get_response_synthesizer()
query = await ctx.get("query", default=None)
final_response = await response_synthesizer.asynthesize(
query=query,
nodes=ev.nodes,
additional_source_nodes=ev.source_nodes,
)
final_response.metadata = ev.final_response_metadata
return StopEvent(result=final_response)
下载数据¶
In [ ]:
Copied!
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
--2024-08-26 14:16:04-- https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8000::154, 2606:50c0:8002::154, 2606:50c0:8001::154, ... Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8000::154|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 75042 (73K) [text/plain] Saving to: ‘data/paul_graham/paul_graham_essay.txt’ data/paul_graham/pa 100%[===================>] 73.28K --.-KB/s in 0.01s 2024-08-26 14:16:04 (6.91 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]
加载数据¶
In [ ]:
Copied!
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("data/paul_graham").load_data()
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("data/paul_graham").load_data()
设置LLM¶
In [ ]:
Copied!
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4")
Settings.llm = llm
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4")
Settings.llm = llm
创建索引和查询引擎¶
In [ ]:
Copied!
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_documents(
documents=documents,
)
query_engine = index.as_query_engine()
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_documents(
documents=documents,
)
query_engine = index.as_query_engine()
运行工作流程!¶
In [ ]:
Copied!
w = MultiStepQueryEngineWorkflow(timeout=200)
w = MultiStepQueryEngineWorkflow(timeout=200)
设置参数¶
In [ ]:
Copied!
# Sets maximum number of steps taken to answer the query.
num_steps = 3
# Set summary of the index, useful to create modified query at each step.
index_summary = "Used to answer questions about the author"
# 设置回答查询的最大步骤数。
num_steps = 3
# 设置索引摘要,有助于在每个步骤创建修改后的查询。
index_summary = "用于回答关于作者的问题"
使用查询进行测试¶
In [ ]:
Copied!
query = "In which city did the author found his first company, Viaweb?"
query = "作者在哪个城市创办了他的第一家公司Viaweb?"
结果¶
In [ ]:
Copied!
result = await w.run(
query=query,
query_engine=query_engine,
index_summary=index_summary,
num_steps=num_steps,
)
# If created query in a step is None, the process will be stopped.
display(
Markdown("> Question: {}".format(query)),
Markdown("Answer: {}".format(result)),
)
result = await w.run(
query=query,
query_engine=query_engine,
index_summary=index_summary,
num_steps=num_steps,
)
# 如果在步骤中创建的查询为None,该过程将被停止。
display(
Markdown("> 问题: {}".format(query)),
Markdown("答案: {}".format(result)),
)
Created query for the step - 0 is: Who is the author who founded Viaweb? Created query for the step - 1 is: In which city did Paul Graham found his first company, Viaweb? Created query for the step - 2 is: None
问题:作者在哪个城市创立了他的第一家公司Viaweb?
回答:作者在剑桥创立了他的第一家公司Viaweb。
显示创建的步骤查询¶
In [ ]:
Copied!
sub_qa = result.metadata["sub_qa"]
tuples = [(t[0], t[1].response) for t in sub_qa]
display(Markdown(f"{tuples}"))
sub_qa = result.metadata["sub_qa"]
tuples = [(t[0], t[1].response) for t in sub_qa]
display(Markdown(f"{tuples}"))
[('创建Viaweb的作者是谁?', '创建Viaweb的作者是Paul Graham。'), ('Paul Graham在哪个城市创办了他的第一家公司Viaweb?', 'Paul Graham在剑桥创办了他的第一家公司Viaweb。')]