作者:找了一圈尾巴
概述
LangGraph 作为一个强大的工作流建模工具,其核心特性之一是内置的持久化层,该层通过检查点(checkpointers)实现。当用户为图(graph)配置检查点后,系统会在每个超级步骤(super-step)保存图状态的检查点,并将这些检查点关联到特定的线程(thread)中。这种机制不仅支持图执行后的状态访问,还赋能了多项关键能力:人机协作(human-in-the-loop)、记忆功能、时间旅行(time travel)以及容错性。
持久化的核心价值在于打破了传统工作流执行的瞬时性限制。通过保存每一步的状态快照,开发者可以实现工作流的中断与恢复、历史轨迹回溯、多路径探索等高级功能,这对于构建复杂交互系统(如对话机器人、自动化审批流程)具有重要意义。
LangGraph持久化核心机制
langGraph 持久化核心机制
在 LangGraph 的持久化机制中,超级步骤(Super-steps)、检查点(Checkpoints)、线程(Threads) 和状态快照(StateSnapshot) 是四个核心概念,它们共同构成了状态管理的基础,支撑了持久化的核心能力(如时间旅行、容错性、人机协作等)。
四个概念通过 “执行 - 记录 - 存储” 的逻辑紧密关联:
图执行时,按超级步骤划分执行阶段;
每个超级步骤结束后,生成对应的检查点(本质是StateSnapshot对象);
所有检查点被关联到特定线程(通过thread_id),形成完整的执行轨迹;
开发者通过StateSnapshot对象访问检查点数据,实现状态查询、重放、修改等操作。
这种设计确保了图的状态可追溯、可恢复、可共享,是 LangGraph 持久化能力的核心支柱。
超级步骤-SuperStep
在 LangGraph 中,“超级步骤”(Superstep)可以被视为图节点上的单次迭代,是工作流执行的原子单位。在每个超级步骤中,所有并行运行的节点属于同一个超级步骤,而顺序运行的节点则属于不同的超级步骤。具体来说:
初始化状态:图执行开始时,所有节点都处于非活动状态。
激活节点:当节点在其任何传入边(或“通道”)上接收到新消息(状态)时,它将变为活跃状态。
执行与更新:活跃节点运行其函数并生成更新响应。
终止条件:在每个超级步骤结束时,没有传入消息的节点通过将其标记为非活动状态来投票停止。当所有节点都处于非活动状态且没有消息在传输时,图执行终止。
LangGraph Superstep 的实现
LangGraph 的执行机制受到了 Google 的 Pregel 系统的启发。Pregel 是一种用于大规模图处理的计算模型,其核心思想是通过离散的“超级步骤”进行迭代计算。在 Pregel 中,每个超级步骤包括以下步骤:
消息接收:顶点接收来自邻居的消息。
状态更新:顶点根据接收到的消息更新其状态。
消息发送:顶点向邻居发送新的消息。
终止条件:当所有顶点都没有更新或达到最大迭代次数时,图执行终止。
LangGraph 在此基础上进行了扩展,支持更复杂的多代理协作和状态管理。在 LangGraph 的实现中,超级步骤的执行由 Pregel 算法负责调度。每个超级步骤中,活跃的节点会并行读取其订阅的通道中的数据,执行其内部逻辑,并生成输出。然后,这些输出会被写入到相应的通道中,Pregel 执行器会根据通道的合并策略更新图的共享状态。这种机制使得 LangGraph 能够高效地处理大规模图数据,并支持复杂的多代理协作。具体实现细节请参考文章:
Pregel 算法与分布式计算 | langchain-ai/langgraph - KoalaWiki
线程-Threads
LangGraph 的Threads(线程) 是用于组织和管理检查点(checkpoints)的核心容器,它通过唯一标识符(thread_id)将一系列相关的状态快照串联起来,形成完整的执行轨迹。其核心作用是实现状态的累积、追溯和复用,是持久化机制的基础组件。
核心特性
唯一标识性:每个线程由唯一的 thread_id 区分(如 "1"、"user_chat_001" 等),这是线程的 “身份证”。当调用带有检查点的图(graph)时,必须在配置中显式指定 thread_id(格式为 {"configurable": {"thread_id": "你的线程ID"}),否则无法关联和存储状态。
状态累积容器:线程本质上是检查点的集合,它会累积同一业务流程中所有超级步骤(super-steps)产生的检查点。例如:
在对话系统中,一个用户的单次会话可对应一个线程,所有对话轮次产生的检查点(如用户提问、AI 回复、工具调用结果等)都会被存储在该线程中。
在自动化工作流中,一个任务的完整执行过程(从启动到结束的所有节点状态)会被打包到同一个线程。
线程的配置与使用
- 在调用 LangGraph 图时,必须通过 config 参数指定 thread_id,以确保图的执行与正确的线程关联。例如:
- config = {"configurable": {"thread_id": "some-thread"}}
- result = graph.invoke(..., config)
复制代码 如果需要从某个特定的检查点开始执行,还可以通过 checkpoint_id 指定起始点。例如:
- config = {"configurable": {"thread_id": "some-thread", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
复制代码 注意事项
创建时机:在执行图的运行(run)前,必须先创建线程(可通过 LangGraph 平台 API 的端点完成),否则状态无法持久化。
关联范围:同一线程内的检查点共享相同的业务上下文(如同一用户、同一任务),不同线程的检查点相互隔离,避免状态混淆。
平台集成:LangGraph 平台提供了专门的 API 端点用于线程的创建、查询、管理(如获取线程列表、删除过期线程等),方便开发者在生产环境中规模化管理状态。
检查点-Checkpoints
检查点是超级步骤执行完毕后保存的状态快照,用于记录图在特定时刻的完整状态和执行计划。它是持久化机制的 “数据载体”。
检查点是图状态在某个超级步骤结束时的 “冻结帧”,包含了恢复图执行所需的全部信息。
官网代码示例
- from langgraph.graph import StateGraph, START, END
- from langgraph.checkpoint.memory import InMemorySaver
- from typing import Annotated
- from typing_extensions import TypedDict
- from operator import add
- class State(TypedDict):
- foo: str
- bar: Annotated[list[str], add]
- def node_a(state: State):
- return {"foo": "a", "bar": ["a"]}
- def node_b(state: State):
- return {"foo": "b", "bar": ["b"]}
- workflow = StateGraph(State)
- workflow.add_node(node_a)
- workflow.add_node(node_b)
- workflow.add_edge(START, "node_a")
- workflow.add_edge("node_a", "node_b")
- workflow.add_edge("node_b", END)
- checkpointer = InMemorySaver()
- graph = workflow.compile(checkpointer=checkpointer)
- config = {"configurable": {"thread_id": "1"}}
- graph.invoke({"foo": ""}, config)
复制代码 运行图之后,我们预计会看到 4 个检查点:
空检查点,其中下一个要执行的节点为 START
包含用户输入 {'foo': '', 'bar': []} 的检查点,下一个要执行的节点为 node_a
包含 node_a 输出 {'foo': 'a', 'bar': ['a']} 的检查点,下一个要执行的节点为 node_b
注意,由于我们为 bar 值配置了归约器,因此 bar 通道的值包含两个节点的输出。
状态快照-StateSnapshot
状态快照是检查点的具体数据类型,是检查点在代码层面的表现形式。它是对检查点所有属性的结构化封装。
其属性包括:
values:状态通道的具体取值(如对话历史、中间计算结果等);
next:下一个要执行的节点名称(元组形式),定义了后续执行路径;
config:与该检查点关联的配置(如所属线程 ID、检查点自身 ID 等);
metadata:元数据(如执行来源、步骤编号、节点写入记录等);
tasks:当前步骤中需要执行的任务(元组形式),每个任务对应一个 PregelTask 对象(包含任务 ID、名称、可能的错误或中断信息)。
created_at:快照创建的时间戳(ISO 格式字符串,如 2024-08-30T12:00:00+00:00)
parent_config:用于获取父快照的配置(可选)。
interrupts:当前步骤中出现的、待解决的中断(元组形式),每个中断对应一个 Interrupt 对象。
其核心作用包括:
数据载体:通过StateSnapshot对象,开发者可以直接访问检查点的所有信息(如values查看状态取值、next查看后续节点)。
操作基础:无论是获取最新状态(graph.get_state())、查询历史(graph.get_state_history()),还是基于检查点重放执行(Time travel),最终返回的结果都是StateSnapshot对象。
例如,调用graph.get_state(config)会返回当前线程的最新StateSnapshot,其中values字段包含当前图的所有状态数据,metadata字段包含执行的上下文信息(如步骤编号、写入记录等)。
核心功能
LangGraph 中使用到Checkpoints 的核心功能包括:
获取当前最新状态- get_state
查询历史状态-get_state_history
时间旅行(time-travel)的核心机制Replay 的实现
修改状态-update_state
获取当前最新状态- get_state
在 LangGraph 中,当需要与已保存的图状态进行交互时,必须指定线程标识符(thread_id)。通过调用 graph.get_state(config) 方法,可以查看图的状态,该方法会返回一个 StateSnapshot 对象 —— 它对应于配置中提供的线程 ID(thread_id)所关联的最新检查点,若配置中还指定了检查点 ID(checkpoint_id),则返回该线程中该特定检查点的状态快照。
- 获取最新状态快照:只需在配置中指定线程 ID(thread_id),即可获取该线程当前最新的状态:
- config = {"configurable": {"thread_id": "1"}} # 仅指定线程ID
- graph.get_state(config) # 返回该线程的最新StateSnapshot
复制代码
- 获取特定检查点的状态快照:若要获取线程中某个历史检查点的状态,需在配置中同时指定线程 ID 和检查点 ID(checkpoint_id):
- # 同时指定线程ID和检查点ID
- config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
- graph.get_state(config) # 返回该检查点对应的StateSnapshot
复制代码- StateSnapshot(
- values={'foo': 'b', 'bar': ['a', 'b']}, # 状态通道的当前值:foo为node_b的输出,bar累积了node_a和node_b的输出(因配置了reducer)
- next=(), # 下一个要执行的节点:空元组表示图已执行结束
- config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}}, # 当前快照的配置:包含所属线程ID和自身检查点ID
- metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2}, # 元数据:来源为"loop"(节点执行生成),node_b的写入记录,步骤编号为2
- created_at='2024-08-29T19:19:38.821749+00:00', # 快照创建时间戳
- parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, # 父快照的配置:指向生成当前快照的上一个检查点
- tasks=() # 待执行任务:空元组表示无未完成任务
- )
复制代码 查询历史状态-get_state_history
在 LangGraph 中,graph.get_state_history(config) 方法用于获取特定线程(thread)的图执行完整历史,返回与该线程 ID 关联的所有 StateSnapshot(状态快照)对象列表。- config = {"configurable": {"thread_id": "1"}}
- list(graph.get_state_history(config))
复制代码 get_state_history获取的内容示例如下:- [
- StateSnapshot(
- values={'foo': 'b', 'bar': ['a', 'b']},
- next=(),
- config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
- metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
- created_at='2024-08-29T19:19:38.821749+00:00',
- parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
- tasks=(),
- ),
- StateSnapshot(
- values={'foo': 'a', 'bar': ['a']}, next=('node_b',),
- config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
- metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
- created_at='2024-08-29T19:19:38.819946+00:00',
- parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
- tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
- ),
- StateSnapshot(
- values={'foo': '', 'bar': []},
- next=('node_a',),
- config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
- metadata={'source': 'loop', 'writes': None, 'step': 0},
- created_at='2024-08-29T19:19:38.817813+00:00',
- parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
- tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
- ),
- StateSnapshot(
- values={'bar': []},
- next=('__start__',),
- config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
- metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
- created_at='2024-08-29T19:19:38.816205+00:00',
- parent_config=None,
- tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
- )
- ]
复制代码 重放-replay
在 LangGraph 中,Replay(重放) 指的是重新执行图(graph)的历史执行过程,其核心能力是:基于指定的检查点(checkpoint),复用该检查点之前的已执行步骤结果,仅重新执行该检查点之后的步骤,从而实现历史轨迹的回溯或衍生新的执行分支。
Replay 的本质是 “选择性复用历史执行结果”:当需要重新执行图时,通过指定线程 ID(thread_id)和特定检查点 ID(checkpoint_id),系统会自动识别该检查点之前的所有步骤(已执行过),直接复用其结果(不重复执行);而该检查点之后的步骤(无论是否执行过),都会被重新执行,形成新的执行分支。
使用方式:触发重放需在调用图时,在配置(config)的configurable部分同时指定thread_id(线程 ID)和checkpoint_id(目标检查点 ID),示例代码:
- # 配置中同时指定线程ID和检查点ID
- config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
- # 调用图执行重放
- graph.invoke(None, config=config)
复制代码智能复用历史步骤:LangGraph 能识别checkpoint_id对应的检查点之前的所有步骤(已执行过),仅 “重放” 这些步骤的结果(直接复用状态),不重新执行节点逻辑,避免冗余计算。
重新执行后续步骤:对于checkpoint_id对应的检查点之后的步骤,无论是否在历史中执行过,都会被重新执行,形成新的执行分支(new fork)。这允许基于历史状态探索新的执行路径(例如修改某个中间状态后继续执行)。
Replay 是实现 “时间旅行(time-travel)” 的核心机制,主要用于:
回溯历史执行过程,验证或调试特定阶段的状态演变;
基于历史检查点衍生新的执行路径(如修改中间结果后继续执行);
高效复用已有计算结果,减少重复执行成本。
修改状态-Update state
在 LangGraph 中,graph.update_state() 是用于编辑图(graph)状态的核心方法,支持修改现有线程的状态或从特定检查点创建新的状态分支。它通过灵活的参数配置,实现对状态的精细调整,同时遵循图的状态通道(channel)规则(如 reducer 逻辑)。
update_state() 允许开发者直接修改图的状态,既可以更新当前线程的最新状态,也可以基于历史检查点创建新的状态分支(fork)。其修改逻辑与节点(node)对状态的更新保持一致,确保状态变更的规范性。
关键参数解析
- graph.update_state(config, {"foo": 2, "bar": ["b"]})
复制代码config:指定目标线程与检查点
必填项:thread_id,用于标识要更新的线程(如 {"configurable": {"thread_id": "1"}})。
可选项:checkpoint_id,若指定,则从该检查点创建分支并更新状态(而非更新当前最新状态)。
示例:{"configurable": {"thread_id": "1", "checkpoint_id": "xxx"}} 表示基于 thread_id=1 线程中 checkpoint_id=xxx 的检查点进行状态修改。
values:要更新的状态值。values 是一个字典,包含需要修改的状态通道键值对。其更新逻辑遵循图的 reducer 规则(与节点更新状态的逻辑一致):
对于无 reducer 的通道(如示例中的 foo):update_state 会直接覆盖原 value。
对于有 reducer 的通道(如示例中的 bar,配置了 add reducer):update_state 会将新值与旧值通过 reducer 合并(而非覆盖)。
as_node:指定状态更新的 “来源节点”(可选)
作用:声明本次状态更新的虚拟来源节点(如 as_node="human_editor")。图的下一步执行节点会基于 “最后更新状态的节点” 决定,因此 as_node 可用于控制后续执行路径。
默认行为:若不指定,as_node 会自动设为上一个更新状态的节点(若唯一)。
使用场景
修正错误状态:当图的中间状态出现错误时,通过 update_state 直接修改,避免重新执行整个流程。
人工介入调整:在人机协作场景中(如人工审核后修改参数),通过 update_state 注入人工决策结果。
创建平行分支:基于历史检查点(checkpoint_id)创建新的状态分支,探索不同执行路径(如 “如果当时选择另一个参数,结果会怎样”)。
LangGraph - Persistence 完整代码示例
- from typing import TypedDict, Annotated
- from langgraph.graph import StateGraph, START, END
- from langgraph.checkpoint.memory import MemorySaver
- import operator
- # 定义状态类型
- class State(TypedDict):
- value: Annotated[list[str], operator.add] # 累积值,用于跟踪执行轨迹
- # 节点函数:模拟超级步骤1
- def superstep_1(state: State):
- new_value = "超级步骤1: 初始化数据"
- return {"value": [new_value]}
- # 节点函数:模拟超级步骤2
- def superstep_2(state: State):
- new_value = "超级步骤2: 处理数据"
- return {"value": [new_value]}
- # 节点函数:模拟超级步骤3
- def superstep_3(state: State):
- new_value = "超级步骤3: 完成计算"
- return {"value": [new_value]}
- # 构建状态图
- workflow = StateGraph(State)
- workflow.add_node("superstep_1", superstep_1)
- workflow.add_node("superstep_2", superstep_2)
- workflow.add_node("superstep_3", superstep_3)
- # 定义边:按超级步骤顺序执行
- workflow.add_edge(START, "superstep_1")
- workflow.add_edge("superstep_1", "superstep_2")
- workflow.add_edge("superstep_2", "superstep_3")
- workflow.add_edge("superstep_3", END)
- # 使用MemorySaver启用持久化检查点
- checkpointer = MemorySaver()
- graph = workflow.compile(checkpointer=checkpointer)
- # 配置线程ID,形成执行轨迹
- config = {"configurable": {"thread_id": "example_thread"}}
- # 执行图:每个超级步骤后自动生成检查点
- input_data = {"value": []}
- final_state = graph.invoke(input_data, config)
- print("最终状态:", final_state)
- # 查询当前检查点(StateSnapshot)
- current_snapshot = graph.get_state(config)
- print("\n当前检查点快照:")
- print("值:", current_snapshot.values)
- print("下一个节点:", current_snapshot.next)
- print("检查点ID:", current_snapshot.config["configurable"]["checkpoint_id"])
- # 查询所有历史检查点,形成完整执行轨迹
- print("\n执行轨迹(所有历史检查点):")
- for snapshot in graph.get_state_history(config):
- print("检查点ID:", snapshot.config["configurable"]["checkpoint_id"])
- print("值:", snapshot.values)
- print("下一个节点:", snapshot.next)
- print("---")
- # 重放:从特定检查点恢复执行(例如,从超级步骤2后的检查点开始)
- # 假设我们取第二个检查点(超级步骤1后)
- history = list(graph.get_state_history(config))
- if len(history) > 2:
- replay_config = history[2].config # 取超级步骤1后的检查点
- print("\n从检查点重放执行:")
- replay_state = graph.invoke(None, replay_config) # 从检查点继续执行
- print("重放后状态:", replay_state)
- # 修改状态:更新特定检查点的值,然后继续执行
- update_config = current_snapshot.config
- graph.update_state(update_config, {"value": ["修改后的值: 自定义轨迹"]})
- print("\n修改后检查点快照:")
- updated_snapshot = graph.get_state(update_config)
- print("修改值:", updated_snapshot.values)
- # 从修改后的检查点继续执行剩余部分
- continued_state = graph.invoke(None, update_config)
- print("从修改检查点继续执行后的最终状态:", continued_state)
复制代码 参考文献
Pregel 算法与分布式计算 | langchain-ai/langgraph - KoalaWiki
Overview
原文地址:https://blog.csdn.net/weixin_41645817/article/details/149813502 |