Workflow for a ReAct Agent¶
This notebook walks you through setting up a Workflow to construct a ReAct agent from (almost) scratch.
ReAct agents work by prompting an LLM to either call tools/functions or return a final response.
Our workflow will be stateful with memory, and will be able to call the LLM to select tools and process incoming user messages.
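Under the hood, ReAct relies on a structured text format that the agent parses on every turn. A single round of tool use looks roughly like the exchange below (an illustration of the format, not verbatim model output; the add tool is defined later in this notebook):
Thought: I need to add two numbers, so I should use a tool.
Action: add
Action Input: {"x": 2123, "y": 2321}
Observation: 4444
Thought: I can answer without using any more tools.
Answer: The sum of 2123 and 2321 is 4444.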
!pip install -U llama-index
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
[Optional] Set up observability with Llamatrace¶
Set up tracing to visualize each step in the workflow.
!pip install "llama-index-core>=0.10.43" "openinference-instrumentation-llama-index>=2" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as HTTPSpanExporter,
)
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
# Add Phoenix API Key for tracing
PHOENIX_API_KEY = "<YOUR-PHOENIX-API-KEY>"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
# Add Phoenix
span_phoenix_processor = SimpleSpanProcessor(
HTTPSpanExporter(endpoint="https://app.phoenix.arize.com/v1/traces")
)
# Add them to the tracer
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(span_processor=span_phoenix_processor)
# Instrument the application
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
Since workflows are async-first, this all runs fine in a notebook. If you were running in your own code, you would want to use asyncio.run() to start an async event loop if one isn't already running.
async def main():
    <async code>

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
Designing the Workflow¶
An agent consists of several steps:
- Handling the latest incoming user message, including adding it to memory and preparing the chat history
- Using the chat history and tools to construct a ReAct prompt
- Calling the LLM with the ReAct prompt, and parsing out function/tool calls
- If there are no tool calls, we can return the final response
- If there are tool calls, we need to execute them, and then loop back to construct a fresh ReAct prompt using the latest tool calls
The Workflow Events¶
To handle these steps, we need to define a few events:
- An event to handle new messages and prepare the chat history
- An event to stream the LLM response
- An event to prompt the LLM with the ReAct prompt
- An event to trigger tool calls, if any
- An event to handle the results of tool calls, if any
The other steps will use the built-in StartEvent and StopEvent events.
In addition to events, we will also use the global context to store the current ReAct reasoning!
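As a quick primer on that context: within any step, the Context behaves like a shared async key-value store. A minimal sketch of the pattern (this CounterExample workflow is a hypothetical illustration, not part of the agent below):
from llama_index.core.workflow import Context, StartEvent, StopEvent, Workflow, step


class CounterExample(Workflow):
    @step
    async def count(self, ctx: Context, ev: StartEvent) -> StopEvent:
        # read a value stored by an earlier step (or a default),
        # update it, and write it back for later steps to see
        count = await ctx.get("count", default=0)
        await ctx.set("count", count + 1)
        return StopEvent(result=count + 1)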
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event
class PrepEvent(Event):
    pass


class InputEvent(Event):
    input: list[ChatMessage]


class StreamEvent(Event):
    delta: str


class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    output: ToolOutput
from typing import Any, List
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
ActionReasoningStep,
ObservationReasoningStep,
)
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
)
from llama_index.llms.openai import OpenAI
class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: LLM | None = None,
        tools: list[BaseTool] | None = None,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.tools = tools or []
        self.llm = llm or OpenAI()
        self.formatter = ReActChatFormatter.from_defaults(
            context=extra_context or ""
        )
        self.output_parser = ReActOutputParser()

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        # clear sources
        await ctx.set("sources", [])

        # init memory if needed
        memory = await ctx.get("memory", default=None)
        if not memory:
            memory = ChatMemoryBuffer.from_defaults(llm=self.llm)

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        memory.put(user_msg)

        # clear current reasoning
        await ctx.set("current_reasoning", [])

        # set memory
        await ctx.set("memory", memory)

        return PrepEvent()

    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: PrepEvent
    ) -> InputEvent:
        # get chat history
        memory = await ctx.get("memory")
        chat_history = memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])

        # format the prompt with react instructions
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        return InputEvent(input=llm_input)

    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> ToolCallEvent | StopEvent:
        chat_history = ev.input
        current_reasoning = await ctx.get("current_reasoning", default=[])
        memory = await ctx.get("memory")

        response_gen = await self.llm.astream_chat(chat_history)
        async for response in response_gen:
            ctx.write_event_to_stream(StreamEvent(delta=response.delta or ""))

        try:
            reasoning_step = self.output_parser.parse(response.message.content)
            current_reasoning.append(reasoning_step)

            if reasoning_step.is_done:
                memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )
                await ctx.set("memory", memory)
                await ctx.set("current_reasoning", current_reasoning)

                sources = await ctx.get("sources", default=[])

                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": sources,
                        "reasoning": current_reasoning,
                    }
                )
            elif isinstance(reasoning_step, ActionReasoningStep):
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="fake",
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            current_reasoning.append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )
            await ctx.set("current_reasoning", current_reasoning)

        # if no tool calls or final response, iterate again
        return PrepEvent()

    @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    ) -> PrepEvent:
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        current_reasoning = await ctx.get("current_reasoning", default=[])
        sources = await ctx.get("sources", default=[])

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                current_reasoning.append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                sources.append(tool_output)
                current_reasoning.append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                current_reasoning.append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        # save new state in context
        await ctx.set("sources", sources)
        await ctx.set("current_reasoning", current_reasoning)

        # prep the next iteration
        return PrepEvent()
And that's it! Let's explore the workflow we wrote a bit.
new_user_msg():
Adds the user message to memory, and clears the global context to keep track of a fresh chain of reasoning.
prepare_chat_history():
Prepares the ReAct prompt, using the chat history, tools, and current reasoning (if any).
handle_llm_input():
Prompts the LLM with our ReAct prompt, and uses some utility functions to parse the output. If there are no tool calls, we can stop and emit a StopEvent
. Otherwise, we emit a ToolCallEvent
 to handle tool calls. Lastly, if there are no tool calls and no final response, we simply loop again.
handle_tool_calls():
Safely calls tools with error handling, adding the tool outputs to the current reasoning. Then, by emitting a PrepEvent
, we loop around for another round of ReAct prompting and parsing.
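Optionally, you can draw the graph of possible steps and events to see the loop at a glance. At the time of writing this relies on the separate llama-index-utils-workflow package; the output filename here is arbitrary:
# pip install llama-index-utils-workflow
from llama_index.utils.workflow import draw_all_possible_flows

# writes an interactive HTML rendering of the workflow graph
draw_all_possible_flows(ReActAgent, filename="react_agent_flow.html")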
Run the Workflow!¶
NOTE: With loops, we need to be mindful of runtime. Here, we set a timeout of 120 seconds.
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
def add(x: int, y: int) -> int:
    """Useful function to add two numbers."""
    return x + y


def multiply(x: int, y: int) -> int:
    """Useful function to multiply two numbers."""
    return x * y


tools = [
    FunctionTool.from_defaults(add),
    FunctionTool.from_defaults(multiply),
]
agent = ReActAgent(
llm=OpenAI(model="gpt-4o"), tools=tools, timeout=120, verbose=True
)
ret = await agent.run(input="Hello!")
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
print(ret["response"])
Hello! How can I assist you today?
ret = await agent.run(input="What is (2123 + 2321) * 312?")
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
print(ret["response"])
The result of (2123 + 2321) * 312 is 1,386,528.
Chat History¶
By default, the workflow creates a fresh Context
 for each run. This means that the chat history is not preserved between runs. However, we can pass our own Context
 to the workflow to preserve the chat history.
from llama_index.core.workflow import Context
ctx = Context(agent)
ret = await agent.run(input="Hello! My name is Logan", ctx=ctx)
print(ret["response"])
ret = await agent.run(input="What is my name?", ctx=ctx)
print(ret["response"])
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
Hello, Logan! How can I assist you today?
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
Your name is Logan.
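If the history needs to outlive the current process, the Context can also be serialized and restored later. A sketch, assuming the JsonSerializer that ships with llama-index-core:
from llama_index.core.workflow import Context, JsonSerializer

# dump the context (including the stored chat memory) to a plain dict ...
ctx_dict = ctx.to_dict(serializer=JsonSerializer())

# ... and rebuild it later, attached to the same workflow
restored_ctx = Context.from_dict(agent, ctx_dict, serializer=JsonSerializer())
ret = await agent.run(input="What is my name?", ctx=restored_ctx)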
Streaming¶
We can also access the streaming response from the LLM, using the handler
 object returned from the .run()
 method.
agent = ReActAgent(
llm=OpenAI(model="gpt-4o"), tools=tools, timeout=120, verbose=False
)
handler = agent.run(input="Hello! Tell me a joke.")
async for event in handler.stream_events():
    if isinstance(event, StreamEvent):
        print(event.delta, end="", flush=True)
response = await handler
# print(response)
Thought: The current language of the user is: English. I cannot use a tool to help me answer the question.
Answer: Why don't scientists trust atoms? Because they make up everything!