Multi-strategy workflow with reflection
In this notebook we'll demonstrate a workflow that tries 3 different query strategies in parallel and picks the best one.
As shown in the diagram below:
- First, the quality of the query is judged. If the query is poor, a BadQueryEvent fires and the improve_query step tries to improve the query before retrying. This is the reflection.
- Once an acceptable query is found, three events fire simultaneously: a NaiveRAGEvent, a HighTopKEvent, and a RerankEvent.
- Each event is caught by a dedicated step that tries a different RAG strategy against the same index. All 3 steps emit a ResponseEvent.
- The judge step waits until it has collected all three ResponseEvents, then compares them. It emits the best response as a StopEvent.
Install dependencies
We need LlamaIndex, the file readers (to read PDFs), the workflow visualizer (to draw the diagram above), and OpenAI to embed the data and query the LLM.
In [ ]:
!pip install llama-index-core llama-index-llms-openai llama-index-utils-workflow llama-index-readers-file llama-index-embeddings-openai
Get the data
We're using three lengthy PDFs: San Francisco's annual budgets from 2016 through 2018.
In [ ]:
!mkdir data
!wget "https://www.dropbox.com/scl/fi/xt3squt47djba0j7emmjb/2016-CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf?rlkey=xs064cjs8cb4wma6t5pw2u2bl&dl=0" -O "data/2016-CSF_Budget_Book_2016_FINAL_WEB_with-cover-page.pdf"
!wget "https://www.dropbox.com/scl/fi/jvw59g5nscu1m7f96tjre/2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf?rlkey=v988oigs2whtcy87ti9wti6od&dl=0" -O "data/2017-Proposed-Budget-FY2017-18-FY2018-19_1.pdf"
!wget "https://www.dropbox.com/scl/fi/izknlwmbs7ia0lbn7zzyx/2018-o0181-18.pdf?rlkey=p5nv2ehtp7272ege3m9diqhei&dl=0" -O "data/2018-o0181-18.pdf"
Bring in dependencies
Now we import all of our dependencies.
In [ ]:
import os
from llama_index.core import (
SimpleDirectoryReader,
VectorStoreIndex,
StorageContext,
load_index_from_storage,
)
from llama_index.core.workflow import (
step,
Context,
Workflow,
Event,
StartEvent,
StopEvent,
)
from llama_index.llms.openai import OpenAI
from llama_index.core.postprocessor.rankGPT_rerank import RankGPTRerank
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.utils.workflow import draw_all_possible_flows
We also need to set our OpenAI key.
In [ ]:
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = userdata.get("openai-key")
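If you're not running in Colab, the google.colab userdata helper isn't available; a stdlib fallback (a sketch, using a placeholder value rather than a real key) looks like this:

```python
import os

# Fallback for non-Colab environments: leave any key already exported in
# the shell untouched, otherwise set a placeholder. The literal below is
# a placeholder, not a real key.
os.environ.setdefault("OPENAI_API_KEY", "sk-placeholder")
print("OPENAI_API_KEY" in os.environ)  # True
```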
Define event classes
Our flow generates quite a few different event types.
In [ ]:
class JudgeEvent(Event):
query: str
class BadQueryEvent(Event):
query: str
class NaiveRAGEvent(Event):
query: str
class HighTopKEvent(Event):
query: str
class RerankEvent(Event):
query: str
class ResponseEvent(Event):
query: str
response: str
class SummarizeEvent(Event):
query: str
response: str
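Each event is just a small, typed payload; steps receive an event and read its fields. Conceptually (using a stdlib dataclass as a stand-in, since the real Event base class comes from llama_index):

```python
from dataclasses import dataclass

# Stand-in for llama_index's Event base class: an event is a small,
# typed payload passed between workflow steps.
@dataclass
class ResponseEvent:
    query: str
    response: str
    source: str = "unknown"  # the workflow also labels each response with its strategy

ev = ResponseEvent(
    query="How has spending changed?",
    response="It went up.",
    source="Naive",
)
print(ev.source)  # Naive
```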
Define the workflow
This is the meat of our workflow, so let's break it down:
- load_or_create_index is a regular RAG function that reads our PDFs from disk and indexes them if it hasn't done so already; if the index already exists, it restores it from disk.
- judge_query does several things:
  - It initializes the LLM and calls load_or_create_index to get set up, storing both in the context for later use.
  - It judges the quality of the query.
  - If the query is bad, it fires a BadQueryEvent.
  - If the query is good, it fires a NaiveRAGEvent, a HighTopKEvent, and a RerankEvent.
- improve_query takes the BadQueryEvent and uses an LLM to try to expand and disambiguate the query where possible, then loops back to judge_query.
- naive_rag, high_top_k, and rerank take their respective events and try 3 different RAG strategies against the same index. Each emits a ResponseEvent carrying the result, plus a source parameter naming the strategy used.
- judge runs every time a ResponseEvent fires, but uses collect_events to buffer them until it has received all 3. It then sends the responses to an LLM and asks it to pick the "best" one, which it emits as a StopEvent.
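The buffering behavior that collect_events gives the judge step can be sketched in plain Python (a stdlib stand-in, not the llama_index API, assuming a single in-flight query):

```python
class EventBuffer:
    """Buffer events until an expected number have arrived."""

    def __init__(self, expected: int):
        self.expected = expected
        self.buffer = []

    def collect(self, ev):
        # Return the full batch once `expected` events have arrived;
        # until then, return None so the caller produces no event.
        self.buffer.append(ev)
        if len(self.buffer) < self.expected:
            return None
        ready, self.buffer = self.buffer, []
        return ready

buf = EventBuffer(expected=3)
print(buf.collect("naive"))       # None
print(buf.collect("high_top_k"))  # None
print(buf.collect("rerank"))      # ['naive', 'high_top_k', 'rerank']
```

This is why, in the run below, the judge step is invoked three times but only produces an event on the third call.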
In [ ]:
class ComplicatedWorkflow(Workflow):
def load_or_create_index(self, directory_path, persist_dir):
# Check if the index already exists
if os.path.exists(persist_dir):
print("Loading existing index...")
# Load the index from disk
storage_context = StorageContext.from_defaults(
persist_dir=persist_dir
)
index = load_index_from_storage(storage_context)
else:
print("Creating new index...")
# Load documents from the specified directory
documents = SimpleDirectoryReader(directory_path).load_data()
# Create a new index from the documents
index = VectorStoreIndex.from_documents(documents)
# Persist the index to disk
index.storage_context.persist(persist_dir=persist_dir)
return index
@step
async def judge_query(
self, ctx: Context, ev: StartEvent | JudgeEvent
) -> BadQueryEvent | NaiveRAGEvent | HighTopKEvent | RerankEvent:
# initialize
llm = await ctx.get("llm", default=None)
if llm is None:
await ctx.set("llm", OpenAI(model="gpt-4o", temperature=0.1))
await ctx.set(
"index", self.load_or_create_index("data", "storage")
)
# we use a chat engine so it remembers previous interactions
await ctx.set("judge", SimpleChatEngine.from_defaults())
        judge = await ctx.get("judge")
        response = judge.chat(
f"""
Given a user query, determine if this is likely to yield good results from a RAG system as-is. If it's good, return 'good', if it's bad, return 'bad'.
Good queries use a lot of relevant keywords and are detailed. Bad queries are vague or ambiguous.
Here is the query: {ev.query}
"""
)
        if str(response) == "bad":
# try again
return BadQueryEvent(query=ev.query)
else:
# send query to all 3 strategies
            ctx.send_event(NaiveRAGEvent(query=ev.query))
            ctx.send_event(HighTopKEvent(query=ev.query))
            ctx.send_event(RerankEvent(query=ev.query))
@step
async def improve_query(
self, ctx: Context, ev: BadQueryEvent
) -> JudgeEvent:
        llm = await ctx.get("llm")
        response = llm.complete(
f"""
This is a query to a RAG system: {ev.query}
The query is bad because it is too vague. Please provide a more detailed query that includes specific keywords and removes any ambiguity.
"""
)
return JudgeEvent(query=str(response))
@step
async def naive_rag(
self, ctx: Context, ev: NaiveRAGEvent
) -> ResponseEvent:
index = await ctx.get("index")
engine = index.as_query_engine(similarity_top_k=5)
response = engine.query(ev.query)
print("Naive response:", response)
return ResponseEvent(
query=ev.query, source="Naive", response=str(response)
)
@step
async def high_top_k(
self, ctx: Context, ev: HighTopKEvent
) -> ResponseEvent:
index = await ctx.get("index")
engine = index.as_query_engine(similarity_top_k=20)
response = engine.query(ev.query)
print("High top k response:", response)
return ResponseEvent(
query=ev.query, source="High top k", response=str(response)
)
@step
async def rerank(self, ctx: Context, ev: RerankEvent) -> ResponseEvent:
index = await ctx.get("index")
reranker = RankGPTRerank(top_n=5, llm=await ctx.get("llm"))
retriever = index.as_retriever(similarity_top_k=20)
engine = RetrieverQueryEngine.from_args(
retriever=retriever,
node_postprocessors=[reranker],
)
response = engine.query(ev.query)
print("Reranker response:", response)
return ResponseEvent(
query=ev.query, source="Reranker", response=str(response)
)
@step
async def judge(self, ctx: Context, ev: ResponseEvent) -> StopEvent:
ready = ctx.collect_events(ev, [ResponseEvent] * 3)
if ready is None:
return None
        judge = await ctx.get("judge")
        response = judge.chat(
f"""
A user has provided a query and 3 different strategies have been used
to try to answer the query. Your job is to decide which strategy best
answered the query. The query was: {ev.query}
Response 1 ({ready[0].source}): {ready[0].response}
Response 2 ({ready[1].source}): {ready[1].response}
Response 3 ({ready[2].source}): {ready[2].response}
Please provide the number of the best response (1, 2, or 3).
Just provide the number, with no other text or preamble.
"""
)
best_response = int(str(response))
print(
f"Best response was number {best_response}, which was from {ready[best_response-1].source}"
)
return StopEvent(result=str(ready[best_response - 1].response))
Draw the flow
This is how we get the diagram shown at the start.
In [ ]:
draw_all_possible_flows(
ComplicatedWorkflow, filename="complicated_workflow.html"
)
Run the workflow
Let's see the workflow in action:
- The judge_query step returned no event, because it used send_event instead; the query was judged "good".
- All 3 RAG steps run and generate different answers to the query.
- The judge step runs 3 times. The first 2 times it produces no event, because it hasn't yet collected the 3 ResponseEvents it needs.
- The third time, it picks the best response and returns it as a StopEvent.
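Note that int(str(response)) in the judge step will raise a ValueError if the LLM adds any preamble despite the prompt. A more defensive parse (a sketch, not part of the notebook code) could look like:

```python
import re

def pick_best(raw: str, n_choices: int = 3) -> int:
    """Extract the first digit the judge LLM returned; fall back to 1."""
    m = re.search(r"[1-9]", raw)
    if not m:
        return 1  # no digit at all: default to the first response
    choice = int(m.group())
    return choice if 1 <= choice <= n_choices else 1

print(pick_best("3"))                                # 3
print(pick_best("The best response is number 2."))   # 2
print(pick_best("none of them"))                     # 1
```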
In [ ]:
c = ComplicatedWorkflow(timeout=120, verbose=True)
result = await c.run(
# query="How has spending on police changed in San Francisco's budgets from 2016 to 2018?"
# query="How has spending on healthcare changed in San Francisco?"
query="How has spending changed?"
)
print(result)
Running step judge_query
Creating new index...
Step judge_query produced no event
Running step naive_rag
Naive response: Spending has increased over the years due to various factors such as new voter-approved minimum spending requirements, the creation of new voter-approved baselines, and growth in baseline funded requirements. Additionally, there have been notable changes in spending across different service areas and departments, with increases in funding for areas like public protection, transportation, and public works.
Step naive_rag produced event ResponseEvent
Running step rerank
Reranker response: Spending has increased over the years, with notable changes in the allocation of funds to various service areas and departments. The budget reflects adjustments in spending to address evolving needs and priorities, resulting in a rise in overall expenditures across different categories.
Step rerank produced event ResponseEvent
Running step high_top_k
High top k response: Spending has increased over the years, with the total budget showing growth in various areas such as aid assistance/grants, materials & supplies, equipment, debt service, services of other departments, and professional & contractual services. Additionally, there have been new investments in programs like workforce development, economic development, film services, and finance and administration. The budget allocations have been adjusted to accommodate changing needs and priorities, reflecting an overall increase in spending across different departments and programs.
Step high_top_k produced event ResponseEvent
Running step judge
Step judge produced no event
Running step judge
Step judge produced no event
Running step judge
Best response was number 3, which was from High top k
Step judge produced event StopEvent
Spending has increased over the years, with the total budget showing growth in various areas such as aid assistance/grants, materials & supplies, equipment, debt service, services of other departments, and professional & contractual services. Additionally, there have been new investments in programs like workforce development, economic development, film services, and finance and administration. The budget allocations have been adjusted to accommodate changing needs and priorities, reflecting an overall increase in spending across different departments and programs.