路由查询引擎¶
RouterQueryEngine
从多个选项中选择最合适的查询引擎来处理给定的查询。
本笔记本通过工作流演示了Router Query Engine的实现。
具体来说,我们将实现RouterQueryEngine。
!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-.."
由于工作流默认采用异步优先设计,这一切在笔记本环境中都能顺畅运行。若您在自己的代码中执行,且当前不存在已激活的异步事件循环,则需要使用 asyncio.run() 来启动一个异步事件循环,例如:
async def main():
<async code>
if __name__ == "__main__":
import asyncio
asyncio.run(main())
定义事件¶
from llama_index.core.workflow import Event
from llama_index.core.base.base_selector import SelectorResult
from typing import Dict, List, Any
from llama_index.core.base.response.schema import RESPONSE_TYPE
class QueryEngineSelectionEvent(Event):
    """Result of selecting the query engine tools.

    Produced by the ``selector`` step and consumed by
    ``generate_responses``.
    """

    # Selector output: index/reason pairs for each chosen query engine tool.
    selected_query_engines: SelectorResult
class SynthesizeEvent(Event):
    """Event for synthesizing the response from different query engines.

    Produced by ``generate_responses`` and consumed by
    ``synthesize_responses``.
    """

    # Raw responses, one per queried engine.
    result: List[RESPONSE_TYPE]
    # Selector output, carried along so it can be attached to the
    # final response's metadata.
    selected_query_engines: SelectorResult
工作流程¶
selector:
- 它接收一个StartEvent作为输入,并返回一个QueryEngineSelectionEvent。
LLMSingleSelector
/PydanticSingleSelector
/PydanticMultiSelector
将选择一个/多个查询引擎工具。
generate_responses:
该函数使用选定的查询引擎生成响应并返回SynthesizeEvent。
synthesize_responses:
如果选择了多个查询引擎,此函数会合并生成的响应并综合最终响应,否则返回单个生成的响应。
这些步骤将使用内置的StartEvent
和StopEvent
事件。
定义好事件后,我们就可以构建工作流和步骤了。
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
from llama_index.core.selectors.utils import get_selector_from_llm
from llama_index.core.base.response.schema import (
PydanticResponse,
Response,
AsyncStreamingResponse,
)
from llama_index.core.bridge.pydantic import BaseModel
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core.schema import QueryBundle
from llama_index.core import Settings
from IPython.display import Markdown, display
import asyncio
class RouterQueryEngineWorkflow(Workflow):
    """Route a query to one or more query engines and synthesize an answer.

    Step pipeline:
        StartEvent -> selector -> QueryEngineSelectionEvent
                   -> generate_responses -> SynthesizeEvent
                   -> synthesize_responses -> StopEvent
    """

    @step
    async def selector(
        self, ctx: Context, ev: StartEvent
    ) -> QueryEngineSelectionEvent:
        """
        Selects a single/ multiple query engines based on the query.
        """
        # Stash the run inputs in the context for the downstream steps.
        for key in ("query", "llm", "query_engine_tools", "summarizer"):
            await ctx.set(key, ev.get(key))

        llm = Settings.llm
        query = ev.get("query")
        tools = ev.get("query_engine_tools")

        # Single- or multi-selector, depending on the ``select_multi`` flag.
        selector = get_selector_from_llm(
            llm, is_multi=ev.get("select_multi")
        )

        # The selector chooses among the tools by their metadata alone.
        selected_query_engines = await selector.aselect(
            [tool.metadata for tool in tools], query
        )
        return QueryEngineSelectionEvent(
            selected_query_engines=selected_query_engines
        )

    @step
    async def generate_responses(
        self, ctx: Context, ev: QueryEngineSelectionEvent
    ) -> SynthesizeEvent:
        """Generate the responses from the selected query engines."""
        query = await ctx.get("query", default=None)
        selected_query_engines = ev.selected_query_engines
        tools = await ctx.get("query_engine_tools")
        engines = [tool.query_engine for tool in tools]

        print(
            f"number of selected query engines: {len(selected_query_engines.selections)}"
        )

        if len(selected_query_engines.selections) > 1:
            # Several engines were chosen: fire all queries concurrently.
            tasks = []
            for selected_query_engine in selected_query_engines.selections:
                print(
                    f"Selected query engine: {selected_query_engine.index}: {selected_query_engine.reason}"
                )
                tasks.append(
                    engines[selected_query_engine.index].aquery(query)
                )
            response_generated = await asyncio.gather(*tasks)
        else:
            # Exactly one engine was chosen; ``ind``/``reason`` are the
            # single-selection accessors on SelectorResult.
            chosen = engines[
                selected_query_engines.selections[0].index
            ]
            print(
                f"Selected query engine: {selected_query_engines.ind}: {selected_query_engines.reason}"
            )
            response_generated = [await chosen.aquery(query)]

        return SynthesizeEvent(
            result=response_generated,
            selected_query_engines=selected_query_engines,
        )

    async def acombine_responses(
        self,
        summarizer: TreeSummarize,
        responses: List[RESPONSE_TYPE],
        query_bundle: QueryBundle,
    ) -> RESPONSE_TYPE:
        """Async combine multiple response from sub-engines."""
        print("Combining responses from multiple query engines.")

        response_strs = []
        source_nodes = []
        for resp in responses:
            # Streaming/pydantic responses must be resolved before their
            # source nodes can be read.
            resolved = (
                await resp.aget_response()
                if isinstance(resp, (AsyncStreamingResponse, PydanticResponse))
                else resp
            )
            source_nodes.extend(resolved.source_nodes)
            response_strs.append(str(resp))

        summary = await summarizer.aget_response(
            query_bundle.query_str, response_strs
        )

        # Wrap the summary in the response type matching what the
        # summarizer produced.
        if isinstance(summary, str):
            return Response(response=summary, source_nodes=source_nodes)
        if isinstance(summary, BaseModel):
            return PydanticResponse(
                response=summary, source_nodes=source_nodes
            )
        return AsyncStreamingResponse(
            response_gen=summary, source_nodes=source_nodes
        )

    @step
    async def synthesize_responses(
        self, ctx: Context, ev: SynthesizeEvent
    ) -> StopEvent:
        """Synthesizes the responses from the generated responses."""
        responses = ev.result
        query = await ctx.get("query", default=None)
        summarizer = await ctx.get("summarizer")
        selected_query_engines = ev.selected_query_engines

        if len(responses) > 1:
            # Multiple engines answered: tree-summarize into one response.
            response = await self.acombine_responses(
                summarizer, responses, QueryBundle(query_str=query)
            )
        else:
            response = responses[0]

        # Attach the selector's decision for downstream inspection.
        response.metadata = response.metadata or {}
        response.metadata["selector_result"] = selected_query_engines
        return StopEvent(result=response)
定义LLM¶
# Use gpt-4o-mini for selection and generation; register it globally so
# components that are not handed an LLM fall back to it (e.g. the selector).
llm = OpenAI(model="gpt-4o-mini")
Settings.llm = llm
定义摘要生成器¶
from llama_index.core.prompts.default_prompt_selectors import (
DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,
)
# Tree summarizer used by the workflow to merge answers when more than
# one query engine responds.
summarizer = TreeSummarize(
    llm=llm,
    summary_template=DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,
)
下载数据¶
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
--2024-08-26 22:46:42-- https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8000::154, 2606:50c0:8003::154, 2606:50c0:8002::154, ... Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8000::154|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 75042 (73K) [text/plain] Saving to: ‘data/paul_graham/paul_graham_essay.txt’ data/paul_graham/pa 100%[===================>] 73.28K --.-KB/s in 0.02s 2024-08-26 22:46:42 (3.82 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]
加载数据¶
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("./data/paul_graham").load_data()
创建节点¶
nodes = Settings.node_parser.get_nodes_from_documents(documents)
创建索引¶
我们将创建三个索引:摘要索引(SummaryIndex)、向量存储索引(VectorStoreIndex)和简单关键词表索引(SimpleKeywordTableIndex)。
from llama_index.core import (
VectorStoreIndex,
SummaryIndex,
SimpleKeywordTableIndex,
)
# Build three indexes over the same nodes: summarization over all nodes,
# embedding-based retrieval, and keyword-table lookup.
summary_index = SummaryIndex(nodes)
vector_index = VectorStoreIndex(nodes)
keyword_index = SimpleKeywordTableIndex(nodes)
创建查询引擎工具¶
from llama_index.core.tools import QueryEngineTool
# Summary engine tree-summarizes over all nodes; use_async speeds up the
# intermediate summarization calls.
list_query_engine = summary_index.as_query_engine(
    response_mode="tree_summarize",
    use_async=True,
)
vector_query_engine = vector_index.as_query_engine()
keyword_query_engine = keyword_index.as_query_engine()
# Tool wrapping the summary engine. The description is what the selector
# LLM reads to route queries, so it should be clean, descriptive text.
# (Fixed typo: "eassy" -> "essay".)
list_tool = QueryEngineTool.from_defaults(
    query_engine=list_query_engine,
    description=(
        "Useful for summarization questions related to Paul Graham essay on"
        " What I Worked On."
    ),
)
# Tool wrapping the vector engine; its description steers the selector
# toward it for specific-context questions.
vector_tool = QueryEngineTool.from_defaults(
    query_engine=vector_query_engine,
    description=(
        "Useful for retrieving specific context from Paul Graham essay on What"
        " I Worked On."
    ),
)
# Tool wrapping the keyword-table engine for keyword-driven lookups.
keyword_tool = QueryEngineTool.from_defaults(
    query_engine=keyword_query_engine,
    description=(
        "Useful for retrieving specific context using keywords from Paul"
        " Graham essay on What I Worked On."
    ),
)
# The tools handed to the workflow; the selector chooses among them by
# their descriptions.
query_engine_tools = [list_tool, vector_tool, keyword_tool]
运行工作流程!¶
import nest_asyncio
# Allow nested event loops so top-level `await` works in the notebook.
nest_asyncio.apply()
# Generous timeout: a multi-engine run issues several LLM calls.
w = RouterQueryEngineWorkflow(timeout=200)
查询¶
摘要查询¶
# This should use summary query engine/ tool.
query = "Provide the summary of the document?"
# Top-level await is fine here because the notebook already runs a loop.
result = await w.run(
    query=query,
    llm=llm,
    query_engine_tools=query_engine_tools,
    summarizer=summarizer,
    select_multi=True, # You can change it to default it to select only one query engine.
)
display(
    Markdown("> Question: {}".format(query)),
    Markdown("Answer: {}".format(result)),
)
number of selected query engines: 1 Selected query engine: 0: This choice directly addresses the need for a summary of the document.
问题:请提供文档的摘要?
回答:该文档记述了一个人从年轻时写作和编程,到探索人工智能,最终成为成功企业家和散文家的历程。大学期间他最初被哲学吸引,但发现其无法满足内心,受文学和纪录片启发转而专注于AI。他的学术追求促使他逆向工程了一个自然语言程序,但很快意识到当时AI的局限性。
在完成博士学位后,他冒险进入艺术领域,参加课程并绘画,同时还在撰写一本关于Lisp编程的书籍。他在科技行业的经历,尤其在一家软件公司的工作经历,塑造了他对商业动态的理解以及在市场中作为入门级选择的重要性。
20世纪90年代中期,他联合创立了Viaweb——一个用于创建在线商店的早期网络应用程序,该平台后来被雅虎收购。此后,他开始涉足天使投资,并联合创办了Y Combinator,这家初创企业加速器通过同时支持多家初创公司的方式彻底改变了种子轮融资模式。
这段叙述突出了作者对工作本质的思考、追求非知名项目的重要性,以及他从编程转向撰写文章的兴趣演变。他强调独立思考的价值,以及互联网对出版和创业的影响。最终,这份文件展现了一种以探索、创造力和致力于帮助他人在事业上取得成功为特征的生活。
指向性上下文查询¶
# This should use vector query engine/ tool.
query = "What did the author do growing up?"
# select_multi=False forces the single-selector path in the workflow.
result = await w.run(
    query=query,
    llm=llm,
    query_engine_tools=query_engine_tools,
    summarizer=summarizer,
    select_multi=False, # You can change it to select multiple query engines.
)
display(
    Markdown("> Question: {}".format(query)),
    Markdown("Answer: {}".format(result)),
)
number of selected query engines: 1 Selected query engine: 1: The question asks for specific context about the author's experiences growing up, which aligns with retrieving specific context from the essay.
问题:作者在成长过程中做了什么?
回答:作者在成长过程中,课余时间专注于写作和编程。起初他创作短篇小说,后来他形容这些作品情节薄弱但人物情感丰富。年幼时他开始在IBM 1401计算机上接触编程,尝试使用早期Fortran语言和穿孔卡片。最终他说服父亲购买了TRS-80微型计算机,这让他能够编写简单游戏和文字处理软件。尽管热爱编程,他最初计划大学攻读哲学,认为这是追求终极真理的途径。但在发现哲学课程枯燥后,他将重心转向了人工智能领域。
# This query could use either a keyword or vector query engine
# so it will combine responses from both
query = "What were noteable events and people from the authors time at Interleaf and YC?"
# With select_multi=True the multi-selector may pick several tools;
# their answers are then tree-summarized into one response.
result = await w.run(
    query=query,
    llm=llm,
    query_engine_tools=query_engine_tools,
    summarizer=summarizer,
    select_multi=True, # Since query should use two query engine tools, we enabled it.
)
display(
    Markdown("> Question: {}".format(query)),
    Markdown("Answer: {}".format(result)),
)
number of selected query engines: 2 Selected query engine: 1: This choice is useful for retrieving specific context related to notable events and people from the author's time at Interleaf and YC. Selected query engine: 2: This choice allows for retrieving specific context using keywords, which can help in identifying notable events and people. Combining responses from multiple query engines.
问题:作者在Interleaf和YC期间有哪些值得注意的事件和人物?
Answer: Notable events during the author's time at Interleaf included the establishment of a large Release Engineering group, which underscored the complexities of software updates and version management. The company also made a significant decision to incorporate a scripting language inspired by Emacs, aimed at attracting Lisp hackers to enhance their software capabilities. The author reflected on this period as the closest they had to a normal job, despite acknowledging their shortcomings as an employee.
在Y Combinator(YC),关键事件包括首个夏季创始人计划的启动,该计划收到了225份申请并资助了八家初创公司,其中不乏知名人物,如Reddit的创始人贾斯汀·坎和埃米特·希尔(后来创立了Twitch),以及亚伦·斯沃茨。该计划在创始人之间培育了一个互助社区,并标志着YC从一个小型项目向更大组织的转型。这一时期的重要人物包括杰西卡·利文斯顿,作者与她保持着密切的职业和个人关系,还有罗伯特·莫里斯和特雷弗·布莱克威尔,他们分别因开发购物车软件和编程能力而受到认可。后来成为YC第二任主席的山姆·奥尔特曼也被提及为这一时期的重要人物。