Skip to content

持久化

LangGraph具有内置的持久化层,通过检查点实现。当使用检查点编译图时,检查点会在每个超级步骤保存图的状态快照。这些快照被保存到一个线程中,执行图形后可以访问它们。由于线程允许在执行后访问图的状态,几个强大的功能,包括人机交互、内存、时间旅行和容错,都是可能的。请参见这个使用指南,了解如何为你的图添加和使用检查点的端到端示例。下面我们将详细讨论这些概念。

检查点

线程

线程是分配给检查点的唯一ID或线程标识符。在调用带有检查点的图时,**必须**在配置的configurable部分中指定thread_id

{"configurable": {"thread_id": "1"}}

检查点

检查点是每个超级步骤保存的图状态的快照,并由StateSnapshot对象表示,具有以下关键属性:

  • config:与此检查点相关的配置。
  • metadata:与此检查点相关的元数据。
  • values:此时状态通道的值。
  • next:将要在图中执行的下一个节点名称的元组。
  • tasks:包含有关要执行的下一个任务信息的PregelTask对象的元组。如果该步骤之前尝试过,它将包含错误信息。如果图在节点内部被动态中断,任务将包含与中断相关的其他数据。

让我们看看当一个简单的图被如下调用时保存了什么检查点:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

在我们运行图后,我们期望看到确切的4个检查点:

  • 空检查点,START作为下一个要执行的节点
  • 检查点,用户输入为{'foo': '', 'bar': []}node_a作为下一个要执行的节点
  • 检查点,node_a的输出为{'foo': 'a', 'bar': ['a']}node_b作为下一个要执行的节点
  • 检查点,node_b的输出为{'foo': 'b', 'bar': ['a', 'b']},没有下一个要执行的节点

注意bar通道的值包含了两个节点的输出,因为我们为bar通道有一个归约器。

获取状态

在与保存的图状态进行交互时,**必须**指定一个线程标识符。你可以通过调用graph.get_state(config)来查看图的*最新*状态。这将返回与提供的线程ID在配置中关联的最新检查点对应的StateSnapshot对象,或者与线程的检查点ID关联的检查点(如果提供)。

# 获取最新的状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# 获取特定checkpoint_id的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)

在我们的示例中,get_state的输出将如下所示:

StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

获取状态历史

你可以通过调用graph.get_state_history(config)获取给定线程的图执行的完整历史。这将返回与配置中提供的线程ID相关的StateSnapshot对象的列表。重要的是,检查点将按时间顺序排序,最新的检查点/ StateSnapshot将位于列表的第一个位置。

config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))

在我们的示例中,get_state_history的输出将如下所示:

[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']}, next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]

状态

重播

还可以回放先前的图执行。如果我们在调用图时带有thread_idcheckpoint_id,那么我们将*从一个与checkpoint_id对应的检查点回放*图。

  • thread_id只是线程的ID。这是始终必需的。
  • checkpoint_id这个标识符指的是线程内的特定检查点。

你必须在调用图时将这些作为配置的configurable部分的一部分传递:

# {"configurable": {"thread_id": "1"}}  # 有效配置
# {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}  # 也有效配置

config = {"configurable": {"thread_id": "1"}}
graph.invoke(None, config=config)

重要的是,LangGraph知道特定检查点是否已被先前执行。如果是这样,LangGraph将简单地*重播*该图中的特定步骤,而不是重新执行该步骤。请参见关于时间旅行的使用指南以了解更多关于重播的信息

重播

更新状态

除从特定检查点重播图外,我们还可以*编辑*图状态。我们可以使用graph.update_state()来做到这一点。该方法有三个不同的参数:

config

配置应包含thread_id,指定要更新的线程。当仅传递thread_id时,我们更新(或分叉)当前状态。可选地,如果我们包含checkpoint_id字段,则我们分叉该选定的检查点。

values

这些是将用于更新状态的值。请注意,此更新被视为来自节点的任何更新。也就是说,这些值将被传递给图状态中某些通道定义的归约器函数。这意味着update_state不会自动覆盖每个通道的通道值,而仅覆盖没有归约器的通道。让我们通过一个示例来说明。

假设您定义了图的状态,其模式如下(请参见上面的完整示例):

from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

现在假设图的当前状态是

{"foo": 1, "bar": ["a"]}

如果您按如下方式更新状态:

graph.update_state(config, {"foo": 2, "bar": ["b"]})

那么图的新状态将是:

{"foo": 2, "bar": ["a", "b"]}
foo键(通道)被完全更改(因为没有为该通道指定reducer,所以update_state会覆盖它)。然而,bar键具有指定的reducer,因此它将"b"附加到bar的状态中。

as_node

调用update_state时可以选择指定的最后一个参数as_node。如果提供,它会将更新视为来自节点as_node。如果未提供as_node,则会设置为上一个更新状态的最后一个节点,如果没有歧义。这一点很重要,因为下一步要执行的操作取决于最后一个给出更新的节点,因此可以用来控制哪个节点接下来执行。有关如何分叉状态的更多信息,请查看这个时间旅行指南

更新

内存存储

共享状态模型

一个state schema指定一组在执行图时被填充的键。如上所述,状态可以由检查点写入每个图步骤的线程,使状态持久化。

但是,如果我们想要在*线程之间*重新训练一些信息呢?考虑一个聊天机器人的案例,我们想在与用户的*所有*聊天对话(例如线程)中保留特定的信息!

仅靠检查点,我们无法在线程之间共享信息。这就需要Store接口。作为一种说明,我们可以定义一个InMemoryStore来在线程之间存储与用户相关的信息。我们简单地将图编译为一个检查点,如前所述,并使用我们的新in_memory_store变量。

基本用法

首先,让我们在不使用LangGraph的情况下单独展示这一点。

from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

记忆通过一个tuple进行命名空间化,在这个特定示例中将是(<user_id>, "memories")。命名空间可以是任意长度,并且可以表示任何东西,不一定是用户特定的。

user_id = "1"
namespace_for_memory = (user_id, "memories")

我们使用store.put方法将记忆保存到库存的命名空间中。当我们这样做时,指定命名空间,如上所定义,以及记忆的键值对:键是该记忆的唯一标识符(memory_id),值(一个字典)是记忆本身。

memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)

我们可以使用store.search方法从我们的命名空间中读取记忆,它会将给定用户的所有记忆作为列表返回。最近的记忆在列表的最后。

memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

每个记忆类型都是一个Python类(Item),具有特定的属性。我们可以通过.dict将其转换为字典格式并进行访问,正如上面所示。 它具有的属性包括:

  • value: 此记忆的值(本身是一个字典)
  • key: 此命名空间中此记忆的唯一键
  • namespace: 字符串列表,此记忆类型的命名空间
  • created_at: 此记忆创建时的时间戳
  • updated_at: 此记忆更新时间的时间戳

语义搜索

除了简单检索外,存储还支持语义搜索,允许您根据意义而非精确匹配查找记忆。为了启用此功能,需要使用嵌入模型配置存储:

from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # 嵌入提供者
        "dims": 1536,                              # 嵌入维度
        "fields": ["food_preference", "$"]              # 要嵌入的字段
    }
)

现在在搜索时,您可以使用自然语言查询来查找相关记忆:

# 查找关于食物偏好的记忆
# (在将记忆存入存储后可以这样做)
memories = store.search(
    namespace_for_memory,
    query="用户喜欢吃什么?",
    limit=3  # 返回前3个匹配项
)

您可以通过配置fields参数或在存储记忆时指定index参数来控制记忆的哪些部分被嵌入:

# 存储时带有特定字段进行嵌入
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "我喜欢意大利菜",
        "context": "讨论晚餐计划"
    },
    index=["food_preference"]  # 仅嵌入“food_preferences”字段
)

# 直接存储而不嵌入(仍能检索,但不可搜索)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "最后更新:2024-01-01"},
    index=False
)

在LangGraph中使用

将所有这些整合好之后,我们在LangGraph中使用in_memory_storein_memory_store与检查点配合工作:检查点将状态保存到线程,如上所述,而in_memory_store允许我们存储任意信息以供在线程之间访问。我们以如下方式编译图,包括检查点和in_memory_store

from langgraph.checkpoint.memory import MemorySaver

# 我们需要这个,因为我们想启用线程(对话)
checkpointer = MemorySaver()

# ... 定义图 ...

# 用检查点和存储编译图
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)

我们以thread_id(如前所述)和user_id调用图,后者将用于将我们的记忆命名空间化到特定用户,如上所示。

# 调用图
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# 首先,我们只需要向AI打个招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "你好"}]}, config, stream_mode="updates"
):
    print(update)

我们可以通过将store: BaseStoreconfig: RunnableConfig作为节点参数传递;在*任何节点*中访问in_memory_storeuser_id。以下是如何在节点中使用语义搜索查找相关记忆的示例:

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # 从配置中获取用户ID
    user_id = config["configurable"]["user_id"]

    # 命名空间化记忆
    namespace = (user_id, "memories")

    # ... 分析对话并创建新的记忆

    # 创建新的记忆ID
    memory_id = str(uuid.uuid4())

    # 创建新记忆
    store.put(namespace, memory_id, {"memory": memory})

正如我们上面所示,我们也可以在任何节点中访问存储,并使用store.search方法获取记忆。请回忆一下,记忆以对象列表返回,可以转换为字典。

memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

我们可以访问这些记忆并在我们的模型调用中使用它们。

def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # 从配置中获取用户ID
    user_id = config["configurable"]["user_id"]

    # 根据最近的消息进行搜索
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... 在模型调用中使用记忆

如果我们创建一个新线程,只要user_id相同,我们仍然可以访问相同的记忆。

# 调用图
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# 让我们再打个招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "嗨,告诉我我的记忆"}]}, config, stream_mode="updates"
):
    print(update)

当我们使用LangGraph平台时,无论是本地(例如,在LangGraph Studio中)还是使用LangGraph Cloud,基本存储默认可用,无需在图编译时指定。但是,要启用语义搜索,您**确实**需要在您的langgraph.json文件中配置索引设置。例如:

{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}

有关更多详细信息和配置选项,请参阅部署指南

检查点库

在后台,检查点由符合BaseCheckpointSaver接口的检查点对象驱动。LangGraph提供了几种检查点实现,所有实现均通过独立的可安装库实现:

  • langgraph-checkpoint:检查点保存程序的基本接口(BaseCheckpointSaver)和序列化/反序列化接口(SerializerProtocol)。包括内存检查点实现(MemorySaver),以用于实验。LangGraph随附了langgraph-checkpoint
  • langgraph-checkpoint-sqlite: LangGraph 检查点保存器的实现,使用 SQLite 数据库 (SqliteSaver / AsyncSqliteSaver)。非常适合实验和本地工作流程。需要单独安装。
  • langgraph-checkpoint-postgres: 使用 Postgres 数据库的高级检查点保存器 (PostgresSaver / AsyncPostgresSaver),用于 LangGraph Cloud。理想用于生产环境。需要单独安装。

检查点保存器接口

每个检查点保存器符合 BaseCheckpointSaver 接口,并实现以下方法:

  • .put - 存储具有其配置和元数据的检查点。
  • .put_writes - 存储与检查点相关联的中间写入 (即 pending writes)。
  • .get_tuple - 使用给定配置 (thread_idcheckpoint_id) 获取检查点元组。用于填充 graph.get_state() 中的 StateSnapshot
  • .list - 列出与给定配置和过滤条件匹配的检查点。用于填充 graph.get_state_history() 中的状态历史记录。

如果检查点保存器与异步图执行一起使用(即通过 .ainvoke.astream.abatch 执行图),则将使用上述方法的异步版本 (.aput.aput_writes.aget_tuple.alist)。

Note

要异步运行您的图,您可以使用 MemorySaver,或 Sqlite/Postgres 检查点保存器的异步版本 -- AsyncSqliteSaver / AsyncPostgresSaver 检查点保存器。

序列化器

当检查点保存器保存图状态时,需要序列化状态中的通道值。这是通过序列化对象完成的。 langgraph_checkpoint 定义了实现序列化器的 协议,并提供了一个默认实现 (JsonPlusSerializer),该实现处理多种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

能力

人类在环

首先,检查点保存器通过允许人类检查、干预和批准图步骤来促进 人类在环工作流程。需要这些工作流程的检查点保存器,因为人类必须能够在任何时候查看图的状态,并且图必须能够在人类对状态进行任何更新后恢复执行。有关具体示例,请参见 这些操作指南

记忆

其次,检查点保存器允许在交互之间形成 "记忆"。在重复的人类交互情况下(例如对话),可以将任何后续消息发送到该线程,该线程将保留之前的记忆。有关如何使用检查点保存器添加和管理对话记忆的端到端示例,请参见 这份操作指南

时间旅行

第三,检查点保存器允许 "时间旅行",允许用户重放先前的图执行以回顾和/或调试特定的图步骤。此外,检查点保存器使在任意检查点处分叉图状态成为可能,以探索替代路径。

容错

最后,检查点还提供了容错和错误恢复:如果一个或多个节点在给定的超步中失败,您可以从最后一个成功步骤重新启动图。此外,当图节点在给定超步中执行中途失败时,LangGraph 储存来自任何其他在该超步中成功完成的节点的挂起检查点写入,这样每当我们从该超步恢复图执行时,就不会重新运行成功的节点。

挂起写入

此外,当图节点在给定超步中执行中途失败时,LangGraph 会存储来自任何其他在该超步中成功完成的节点的挂起检查点写入,以便每当我们从该超步恢复图执行时,不会重新运行成功的节点。

优云智算