Skip to content

流式处理

LangGraph 提供了一流的流式处理支持。可以通过多种不同方式从图形运行中返回输出。

流式图形输出(.stream.astream

.stream.astream 是同步和异步方法,用于从图形运行中流式返回输出。当调用这些方法时,可以指定几种不同的模式(例如 graph.stream(..., mode="...")):

  • "values":在图形的每一步之后,流式传输状态的完整值。
  • "updates":在图形的每一步之后,流式传输对状态的更新。如果在同一步中进行了多次更新(例如,运行多个节点),则这些更新将分别流式传输。
  • "custom":从图形节点内部流式传输自定义数据。
  • "messages":对于调用 LLM 的图形节点,流式传输 LLM 令牌和元数据。
  • "debug":在图形执行过程中,尽可能多地流式传输信息。

您还可以通过将多个流式模式作为列表传递来同时指定多个流式模式。当这样做时,流式输出将是元组 (stream_mode, data)。例如:

graph.stream(..., stream_mode=["updates", "messages"])
...
('messages', (AIMessageChunk(content='Hi'), {'langgraph_step': 3, 'langgraph_node': 'agent', ...}))
...
('updates', {'agent': {'messages': [AIMessage(content="Hi, how can I help you?")]}})

下面的可视化展示了 valuesupdates 模式之间的区别:

values vs updates

流式 LLM 令牌和事件(.astream_events

此外,您可以使用 astream_events 方法流式返回发生在节点内部的事件。这对于 流式传输 LLM 调用的令牌 非常有用。

这是所有 LangChain 对象 的标准方法。这意味着在图形执行时,会沿途发出某些事件,如果您使用 .astream_events 运行图形,可以看到这些事件。

所有事件都有(除其他外)eventnamedata 字段。这些分别表示什么?

  • event:这是正在发出的事件类型。您可以在 这里 找到所有回调事件和触发器的详细表格。
  • name:这是事件的名称。
  • data:这是与事件相关的数据。

什么情况下会引发事件的发出?

  • 每个节点(可运行)在开始执行时发出 on_chain_start,在节点执行过程中发出 on_chain_stream,在节点完成时发出 on_chain_end。节点事件将在事件的 name 字段中具有节点名称。
  • 图形将在图形执行开始时发出 on_chain_start,在每个节点执行后发出 on_chain_stream,在图形完成时发出 on_chain_end。图形事件将在事件的 name 字段中具有 LangGraph
  • 对状态通道的任何写入(即每次更新状态键的值)将发出 on_chain_starton_chain_end 事件。

此外,在您的节点内部创建的任何事件(LLM 事件、工具事件、手动发出的事件等)也将在 .astream_events 的输出中可见。

为了更具体地说明这一点,并查看运行简单图形时返回的事件,让我们来看一下:

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END

model = ChatOpenAI(model="gpt-4o-mini")


def call_model(state: MessagesState):
    response = model.invoke(state['messages'])
    return {"messages": response}

workflow = StateGraph(MessagesState)
workflow.add_node(call_model)
workflow.add_edge(START, "call_model")
workflow.add_edge("call_model", END)
app = workflow.compile()

inputs = [{"role": "user", "content": "hi!"}]
async for event in app.astream_events({"messages": inputs}, version="v1"):
    kind = event["event"]
    print(f"{kind}: {event['name']}")
on_chain_start: LangGraph
on_chain_start: __start__
on_chain_end: __start__
on_chain_start: call_model
on_chat_model_start: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_stream: ChatOpenAI
on_chat_model_end: ChatOpenAI
on_chain_start: ChannelWrite<call_model,messages>
on_chain_end: ChannelWrite<call_model,messages>
on_chain_stream: call_model
on_chain_end: call_model
on_chain_stream: LangGraph
on_chain_end: LangGraph

我们以整体图形开始 (on_chain_start: LangGraph)。然后我们写入 __start__ 节点(这是特殊节点,用于处理输入)。 接着我们启动 call_model 节点(on_chain_start: call_model)。然后启动聊天模型调用(on_chat_model_start: ChatOpenAI), 逐个流式返回令牌(on_chat_model_stream: ChatOpenAI),然后完成聊天模型(on_chat_model_end: ChatOpenAI)。此后, 我们将结果写回通道(ChannelWrite<call_model,messages>),然后完成 call_model 节点,最后完成整个图形。

这应该能让您对简单图形中发出的事件有一个很好的了解。但这些事件包含什么数据? 每种类型的事件以不同格式包含数据。让我们看一下 on_chat_model_stream 事件的样子。这是一个重要的事件类型, 因为它是流式传输 LLM 响应令牌所必需的。

这些事件的样子如下:

{'event': 'on_chat_model_stream',
 'name': 'ChatOpenAI',
 'run_id': '3fdbf494-acce-402e-9b50-4eab46403859',
 'tags': ['seq:step:1'],
 'metadata': {'langgraph_step': 1,
  'langgraph_node': 'call_model',
  'langgraph_triggers': ['start:call_model'],
  'langgraph_task_idx': 0,
  'checkpoint_id': '1ef657a0-0f9d-61b8-bffe-0c39e4f9ad6c',
  'checkpoint_ns': 'call_model',
  'ls_provider': 'openai',
  'ls_model_name': 'gpt-4o-mini',
  'ls_model_type': 'chat',
  'ls_temperature': 0.7},
 'data': {'chunk': AIMessageChunk(content='Hello', id='run-3fdbf494-acce-402e-9b50-4eab46403859')},
 'parent_ids': []}
我们可以看到我们得到了事件类型和名称(我们之前知道的)。

我们还在元数据中有一堆信息。尤其值得注意的是,'langgraph_node': 'call_model', 提供了非常有用的信息, 告诉我们这个模型是在什么节点内部被调用的。

最后,data 是一个非常重要的字段。它包含了此事件的实际数据!在这种情况下, 它是一个 AIMessageChunk。它包含了消息的 content,以及一个 id。 这是整个 AIMessage 的 ID(不仅仅是这一块),这非常有帮助 - 它帮助 我们跟踪哪些块属于同一消息(以便在 UI 中一起显示)。

这些信息包含了创建流式 LLM 令牌的 UI 所需的所有内容。您可以查看 这里 的指南。

ASYNC IN PYTHON<=3.10

在 Python <= 3.10 中使用 .astream_events 时,您可能无法看到来自节点内部的事件被发出。如果您在节点内部异步使用 Langchain RunnableLambda、RunnableGenerator 或工具,您将需要手动将回调传播到这些对象。这是因为 LangChain 不能在这种情况下自动将回调传播到子对象。请查看 这里这里 的示例。

LangGraph 平台

流式处理对于使 LLM 应用程序对最终用户感觉响应迅速至关重要。在创建流式运行时,流式模式决定了返回给 API 客户端的数据。LangGraph 平台支持五种流式模式:

  • values:在每个 超步骤 执行后流式传输图形的完整状态。有关流式值的 如何指导
  • messages-tuple:流式传输在节点内部生成的任何消息的 LLM 令牌。此模式主要用于支持聊天应用程序。有关流式消息的 如何指导
  • updates:在每个节点执行后流式传输图形状态的更新。有关流式更新的 如何指导
  • events:流式传输在图形执行期间发生的所有事件(包括图形的状态)。有关流式事件的 如何指导。这可以用于对 LLM 的逐令牌流式传输。
  • debug:在图形执行过程中流式传输调试事件。有关流式调试事件的 如何指导

您还可以同时指定多种流式模式。有关同时配置多种流式模式的 如何指导

有关如何创建流式运行的 API 参考

流式模式 valuesupdatesmessages-tupledebug 非常类似于 LangGraph 库中可用的模式 - 有关这些的更深入的概念解释,您可以查看 前一节

流式模式 events 与在 LangGraph 库中使用 .astream_events 相同 - 有关此的更深入的概念解释,您可以查看 前一节。 所有发出的事件都有两个属性:

  • event: 这是事件的名称
  • data: 这是与事件相关的数据