Workflow for a ReAct Agent
This notebook walks through setting up a Workflow to construct a ReAct agent from (mostly) scratch.
React calling agents work by prompting an LLM to either invoke tools/functions or return a final response.
Our workflow will be stateful with memory, and will be able to call the LLM to select tools and process incoming user messages.
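Concretely, on each turn the LLM is prompted to emit text in a parseable format: either a tool call or a final answer. The exact template comes from the ReAct formatter used later in this notebook; the wording below is illustrative, not verbatim:

```
Thought: I need to use a tool to help me answer the question.
Action: multiply
Action Input: {"x": 2123, "y": 2321}
```

or, when no tool is needed:

```
Thought: I can answer without using any more tools.
Answer: ...
```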
```python
!pip install -U llama-index
```

```python
import os

os.environ["OPENAI_API_KEY"] = "sk-proj-..."
```

[Optional] Set up observability with Llamatrace
Set up tracing to visualize each step in the workflow.

```python
!pip install "llama-index-core>=0.10.43" "openinference-instrumentation-llama-index>=2" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk
```

```python
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HTTPSpanExporter,
)
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor

# Add Phoenix API Key for tracing
PHOENIX_API_KEY = "<YOUR-PHOENIX-API-KEY>"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"

# Add Phoenix
span_phoenix_processor = SimpleSpanProcessor(
    HTTPSpanExporter(endpoint="https://app.phoenix.arize.com/v1/traces")
)

# Add them to the tracer
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(span_processor=span_phoenix_processor)

# Instrument the application
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
```

Since workflows are async-first, this all runs fine in a notebook. If you were running in your own code, you would want to use asyncio.run() to start an async event loop if one isn't already running.

```python
async def main():
    ...  # <async code>


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```

An agent consists of several steps:
- Handling the latest incoming user message, including adding it to memory and preparing the chat history
- Using the chat history and tools to construct a ReAct prompt
- Calling the LLM with the ReAct prompt, and parsing out function/tool calls
- If there are no tool calls, we can return
- If there are tool calls, we need to execute them, then loop back with the latest tool call results to get a fresh ReAct prompt
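Conceptually, these steps form the loop below. This is a plain-Python sketch to make the control flow explicit; the helper names (format_react_prompt, parse_step, run_tool) are hypothetical stand-ins, not real APIs:

```python
# Plain-Python sketch of the ReAct loop; helper names are hypothetical.
def react_loop(user_msg, llm, tools, memory):
    memory.append(("user", user_msg))
    reasoning = []  # fresh reasoning chain per user message
    while True:
        prompt = format_react_prompt(tools, memory, reasoning)  # hypothetical
        step = parse_step(llm.chat(prompt))  # hypothetical output parser
        reasoning.append(step)
        if step.is_done:  # final answer -> return it
            return step.response
        if step.is_action:  # tool call -> execute, record observation, loop
            reasoning.append(run_tool(tools, step))  # hypothetical
```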
To handle these steps, we need to define a few events:
- An event to handle new messages and prepare the chat history
- An event to stream the LLM response
- An event to prompt the LLM with the ReAct prompt
- An event to trigger tool calls, if any
- An event to handle the results of tool calls, if any
The other steps will use the built-in StartEvent and StopEvent events.
In addition to events, we will also use the global context to store the current ReAct reasoning!
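Reads and writes on that global context are async key-value operations. The pattern used throughout the steps below looks like this minimal sketch (the `default=` keyword avoids a missing-key error on first access):

```python
from llama_index.core.workflow import Context


async def reset_reasoning(ctx: Context) -> list:
    # The workflow Context exposes an async key-value store; this mirrors
    # how the agent steps below track the in-progress reasoning chain.
    await ctx.store.set("current_reasoning", [])
    return await ctx.store.get("current_reasoning", default=[])
```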
```python
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event


class PrepEvent(Event):
    pass


class InputEvent(Event):
    input: list[ChatMessage]


class StreamEvent(Event):
    delta: str


class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    output: ToolOutput
```

With our events defined, we can construct our workflow and steps.
Note that the workflow automatically validates itself using type annotations, so the type annotations on our steps are very helpful!
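To see what those annotations buy us, here is a minimal standalone sketch (a toy example, not part of the agent): the `ev: StartEvent` parameter annotation marks the entry step, and the `-> StopEvent` return annotation tells the workflow this step may end the run.

```python
from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step


class EchoFlow(Workflow):
    @step
    async def echo(self, ev: StartEvent) -> StopEvent:
        # StartEvent carries whatever keyword arguments were passed to .run()
        return StopEvent(result=ev.input)


# result = await EchoFlow().run(input="hi")  # -> "hi"
```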
```python
from typing import Any, List

from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
    ActionReasoningStep,
    ObservationReasoningStep,
)
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.openai import OpenAI


class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: LLM | None = None,
        tools: list[BaseTool] | None = None,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.tools = tools or []
        self.llm = llm or OpenAI()
        self.formatter = ReActChatFormatter.from_defaults(
            context=extra_context or ""
        )
        self.output_parser = ReActOutputParser()

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        # clear sources
        await ctx.store.set("sources", [])

        # init memory if needed
        memory = await ctx.store.get("memory", default=None)
        if not memory:
            memory = ChatMemoryBuffer.from_defaults(llm=self.llm)

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        memory.put(user_msg)

        # clear current reasoning
        await ctx.store.set("current_reasoning", [])

        # set memory
        await ctx.store.set("memory", memory)

        return PrepEvent()

    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: PrepEvent
    ) -> InputEvent:
        # get chat history
        memory = await ctx.store.get("memory")
        chat_history = memory.get()
        current_reasoning = await ctx.store.get(
            "current_reasoning", default=[]
        )

        # format the prompt with react instructions
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        return InputEvent(input=llm_input)

    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> ToolCallEvent | StopEvent:
        chat_history = ev.input
        current_reasoning = await ctx.store.get(
            "current_reasoning", default=[]
        )
        memory = await ctx.store.get("memory")

        response_gen = await self.llm.astream_chat(chat_history)
        async for response in response_gen:
            ctx.write_event_to_stream(StreamEvent(delta=response.delta or ""))

        try:
            reasoning_step = self.output_parser.parse(response.message.content)
            current_reasoning.append(reasoning_step)

            if reasoning_step.is_done:
                memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )
                await ctx.store.set("memory", memory)
                await ctx.store.set("current_reasoning", current_reasoning)

                sources = await ctx.store.get("sources", default=[])

                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": [sources],
                        "reasoning": current_reasoning,
                    }
                )
            elif isinstance(reasoning_step, ActionReasoningStep):
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="fake",
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            current_reasoning.append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )
            await ctx.store.set("current_reasoning", current_reasoning)

        # if no tool calls or final response, iterate again
        return PrepEvent()

    @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    ) -> PrepEvent:
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}
        current_reasoning = await ctx.store.get(
            "current_reasoning", default=[]
        )
        sources = await ctx.store.get("sources", default=[])

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                current_reasoning.append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                sources.append(tool_output)
                current_reasoning.append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                current_reasoning.append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        # save new state in context
        await ctx.store.set("sources", sources)
        await ctx.store.set("current_reasoning", current_reasoning)

        # prep the next iteration
        return PrepEvent()
```

And that's it! Let's explore the workflow we wrote a bit.
new_user_msg():
Adds the user message to memory, and clears the global context to keep track of a fresh chain of reasoning.
prepare_chat_history():
Prepares the ReAct prompt using the chat history, tools, and current reasoning (if any).
handle_llm_input():
Prompts the LLM with our ReAct prompt, and uses some utility functions to parse the output. If there are no tool calls, we can stop and emit a StopEvent. Otherwise, we emit a ToolCallEvent to handle tool calls. Lastly, if there are no tool calls and no final response, we simply loop again.
handle_tool_calls():
Safely calls tools with error handling, adding the tool outputs to the current reasoning. Then, by emitting a PrepEvent, we loop around for another round of ReAct prompting and parsing.
Note: since we are using loops, we need to be mindful of runtime. Here, we set a timeout of 120 seconds.
```python
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI


def add(x: int, y: int) -> int:
    """Useful function to add two numbers."""
    return x + y


def multiply(x: int, y: int) -> int:
    """Useful function to multiply two numbers."""
    return x * y


tools = [
    FunctionTool.from_defaults(add),
    FunctionTool.from_defaults(multiply),
]

agent = ReActAgent(
    llm=OpenAI(model="gpt-4o"), tools=tools, timeout=120, verbose=True
)
```
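If a run exceeds that timeout, it fails with a timeout error rather than hanging. A minimal sketch of guarding against this is below; the exact exception class and import path are an assumption and may differ across llama-index versions:

```python
# Assumption: WorkflowTimeoutError and this import path may vary by version.
from llama_index.core.workflow.errors import WorkflowTimeoutError

try:
    ret = await agent.run(input="What is (2123 + 2321) * 312?")
except WorkflowTimeoutError:
    print("The agent exceeded the 120-second timeout.")
```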
```python
ret = await agent.run(input="Hello!")
```

```
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
```

```python
print(ret["response"])
```

```
Hello! How can I assist you today?
```

```python
ret = await agent.run(input="What is (2123 + 2321) * 312?")
```

```
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
```

```python
print(ret["response"])
```

```
The result of (2123 + 2321) * 312 is 1,386,528.
```

By default, the workflow creates a fresh Context for each run. This means the chat history is not preserved between runs. However, we can pass our own Context to the workflow to preserve chat history.
```python
from llama_index.core.workflow import Context

ctx = Context(agent)

ret = await agent.run(input="Hello! My name is Logan", ctx=ctx)
print(ret["response"])

ret = await agent.run(input="What is my name?", ctx=ctx)
print(ret["response"])
```

```
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
Hello, Logan! How can I assist you today?
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
Your name is Logan.
```

We can also access the streaming response from the LLM, using the handler object returned from the .run() method.
```python
agent = ReActAgent(
    llm=OpenAI(model="gpt-4o"), tools=tools, timeout=120, verbose=False
)

handler = agent.run(input="Hello! Tell me a joke.")

async for event in handler.stream_events():
    if isinstance(event, StreamEvent):
        print(event.delta, end="", flush=True)

response = await handler
# print(response)
```

```
Thought: The current language of the user is: English. I cannot use a tool to help me answer the question.
Answer: Why don't scientists trust atoms? Because they make up everything!
```
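Since the Context holds the memory, you can also persist a conversation across processes by serializing it. A hedged sketch, assuming the Context.to_dict()/Context.from_dict() helpers and JsonSerializer available in recent llama-index versions:

```python
import json

from llama_index.core.workflow import Context, JsonSerializer

# Assumption: to_dict/from_dict and JsonSerializer exist in your version.
ctx_dict = ctx.to_dict(serializer=JsonSerializer())
with open("ctx.json", "w") as f:
    json.dump(ctx_dict, f)

# Later, restore the context and keep chatting with history intact.
with open("ctx.json") as f:
    restored_ctx = Context.from_dict(
        agent, json.load(f), serializer=JsonSerializer()
    )
```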