AI创想

标题: LangGraph认知篇-Persistence 持久化 [打印本页]

作者: 创想小编    时间: 6 小时前
标题: LangGraph认知篇-Persistence 持久化
作者:找了一圈尾巴
概述

        LangGraph 作为一个强大的工作流建模工具,其核心特性之一是内置的持久化层,该层通过检查点(checkpointers)实现。当用户为图(graph)配置检查点后,系统会在每个超级步骤(super-step)保存图状态的检查点,并将这些检查点关联到特定的线程(thread)中。这种机制不仅支持图执行后的状态访问,还赋能了多项关键能力:人机协作(human-in-the-loop)、记忆功能、时间旅行(time travel)以及容错性。
        持久化的核心价值在于打破了传统工作流执行的瞬时性限制。通过保存每一步的状态快照,开发者可以实现工作流的中断与恢复、历史轨迹回溯、多路径探索等高级功能,这对于构建复杂交互系统(如对话机器人、自动化审批流程)具有重要意义。
LangGraph持久化核心机制

(, 下载次数: 0)


langGraph 持久化核心机制

        在 LangGraph 的持久化机制中,超级步骤(Super-steps)、检查点(Checkpoints)、线程(Threads) 和状态快照(StateSnapshot) 是四个核心概念,它们共同构成了状态管理的基础,支撑了持久化的核心能力(如时间旅行、容错性、人机协作等)。
        四个概念通过 “执行 - 记录 - 存储” 的逻辑紧密关联:
        这种设计确保了图的状态可追溯、可恢复、可共享,是 LangGraph 持久化能力的核心支柱。
超级步骤-SuperStep

        在 LangGraph 中,“超级步骤”(Superstep)可以被视为图节点上的单次迭代,是工作流执行的原子单位。在每个超级步骤中,所有并行运行的节点属于同一个超级步骤,而顺序运行的节点则属于不同的超级步骤。具体来说:
LangGraph Superstep 的实现

        LangGraph 的执行机制受到了 Google 的 Pregel 系统的启发。Pregel 是一种用于大规模图处理的计算模型,其核心思想是通过离散的“超级步骤”进行迭代计算。在 Pregel 中,每个超级步骤包括以下步骤:
(, 下载次数: 0)


(, 下载次数: 0)


        LangGraph 在此基础上进行了扩展,支持更复杂的多代理协作和状态管理。在 LangGraph 的实现中,超级步骤的执行由 Pregel 算法负责调度。每个超级步骤中,活跃的节点会并行读取其订阅的通道中的数据,执行其内部逻辑,并生成输出。然后,这些输出会被写入到相应的通道中,Pregel 执行器会根据通道的合并策略更新图的共享状态。这种机制使得 LangGraph 能够高效地处理大规模图数据,并支持复杂的多代理协作。具体实现细节请参考文章:
Pregel 算法与分布式计算 | langchain-ai/langgraph - KoalaWiki
线程-Threads

        LangGraph 的Threads(线程) 是用于组织和管理检查点(checkpoints)的核心容器,它通过唯一标识符(thread_id)将一系列相关的状态快照串联起来,形成完整的执行轨迹。其核心作用是实现状态的累积、追溯和复用,是持久化机制的基础组件。
核心特性

线程的配置与使用

  1. config = {"configurable": {"thread_id": "some-thread", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
复制代码
注意事项

检查点-Checkpoints

检查点是超级步骤执行完毕后保存的状态快照,用于记录图在特定时刻的完整状态和执行计划。它是持久化机制的 “数据载体”。
        检查点是图状态在某个超级步骤结束时的 “冻结帧”,包含了恢复图执行所需的全部信息。
官网代码示例
  1. from langgraph.graph import StateGraph, START, END
  2. from langgraph.checkpoint.memory import InMemorySaver
  3. from typing import Annotated
  4. from typing_extensions import TypedDict
  5. from operator import add
  6. class State(TypedDict):
  7.     foo: str
  8.     bar: Annotated[list[str], add]
  9. def node_a(state: State):
  10.     return {"foo": "a", "bar": ["a"]}
  11. def node_b(state: State):
  12.     return {"foo": "b", "bar": ["b"]}
  13. workflow = StateGraph(State)
  14. workflow.add_node(node_a)
  15. workflow.add_node(node_b)
  16. workflow.add_edge(START, "node_a")
  17. workflow.add_edge("node_a", "node_b")
  18. workflow.add_edge("node_b", END)
  19. checkpointer = InMemorySaver()
  20. graph = workflow.compile(checkpointer=checkpointer)
  21. config = {"configurable": {"thread_id": "1"}}
  22. graph.invoke({"foo": ""}, config)
复制代码
       运行图之后,我们预计会看到 4 个检查点:
状态快照-StateSnapshot

状态快照是检查点的具体数据类型,是检查点在代码层面的表现形式。它是对检查点所有属性的结构化封装。
        其属性包括:
        其核心作用包括:
        例如,调用graph.get_state(config)会返回当前线程的最新StateSnapshot,其中values字段包含当前图的所有状态数据,metadata字段包含执行的上下文信息(如步骤编号、写入记录等)。
核心功能

(, 下载次数: 0)


        LangGraph 中使用到Checkpoints 的核心功能包括:
获取当前最新状态- get_state

        在 LangGraph 中,当需要与已保存的图状态进行交互时,必须指定线程标识符(thread_id)。通过调用 graph.get_state(config) 方法,可以查看图的状态,该方法会返回一个 StateSnapshot 对象 —— 它对应于配置中提供的线程 ID(thread_id)所关联的最新检查点,若配置中还指定了检查点 ID(checkpoint_id),则返回该线程中该特定检查点的状态快照。
  1. config = {"configurable": {"thread_id": "1"}}  # 仅指定线程ID
  2. graph.get_state(config)  # 返回该线程的最新StateSnapshot
复制代码
  1. # 同时指定线程ID和检查点ID
  2. config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
  3. graph.get_state(config)  # 返回该检查点对应的StateSnapshot
复制代码
  1. StateSnapshot(
  2.     values={'foo': 'b', 'bar': ['a', 'b']},  # 状态通道的当前值:foo为node_b的输出,bar累积了node_a和node_b的输出(因配置了reducer)
  3.     next=(),  # 下一个要执行的节点:空元组表示图已执行结束
  4.     config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},  # 当前快照的配置:包含所属线程ID和自身检查点ID
  5.     metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},  # 元数据:来源为"loop"(节点执行生成),node_b的写入记录,步骤编号为2
  6.     created_at='2024-08-29T19:19:38.821749+00:00',  # 快照创建时间戳
  7.     parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},  # 父快照的配置:指向生成当前快照的上一个检查点
  8.     tasks=()  # 待执行任务:空元组表示无未完成任务
  9. )
复制代码
查询历史状态-get_state_history

        在 LangGraph 中,graph.get_state_history(config) 方法用于获取特定线程(thread)的图执行完整历史,返回与该线程 ID 关联的所有 StateSnapshot(状态快照)对象列表。
  1. config = {"configurable": {"thread_id": "1"}}
  2. list(graph.get_state_history(config))
复制代码
       get_state_history获取的内容示例如下:
  1. [
  2.     StateSnapshot(
  3.         values={'foo': 'b', 'bar': ['a', 'b']},
  4.         next=(),
  5.         config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
  6.         metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
  7.         created_at='2024-08-29T19:19:38.821749+00:00',
  8.         parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
  9.         tasks=(),
  10.     ),
  11.     StateSnapshot(
  12.         values={'foo': 'a', 'bar': ['a']}, next=('node_b',),
  13.         config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
  14.         metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
  15.         created_at='2024-08-29T19:19:38.819946+00:00',
  16.         parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
  17.         tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
  18.     ),
  19.     StateSnapshot(
  20.         values={'foo': '', 'bar': []},
  21.         next=('node_a',),
  22.         config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
  23.         metadata={'source': 'loop', 'writes': None, 'step': 0},
  24.         created_at='2024-08-29T19:19:38.817813+00:00',
  25.         parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
  26.         tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
  27.     ),
  28.     StateSnapshot(
  29.         values={'bar': []},
  30.         next=('__start__',),
  31.         config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
  32.         metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
  33.         created_at='2024-08-29T19:19:38.816205+00:00',
  34.         parent_config=None,
  35.         tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
  36.     )
  37. ]
复制代码
重放-replay

        在 LangGraph 中,Replay(重放) 指的是重新执行图(graph)的历史执行过程,其核心能力是:基于指定的检查点(checkpoint),复用该检查点之前的已执行步骤结果,仅重新执行该检查点之后的步骤,从而实现历史轨迹的回溯或衍生新的执行分支。
Replay 的本质是 “选择性复用历史执行结果”:当需要重新执行图时,通过指定线程 ID(thread_id)和特定检查点 ID(checkpoint_id),系统会自动识别该检查点之前的所有步骤(已执行过),直接复用其结果(不重复执行);而该检查点之后的步骤(无论是否执行过),都会被重新执行,形成新的执行分支。
  1. # 配置中同时指定线程ID和检查点ID
  2. config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
  3. # 调用图执行重放
  4. graph.invoke(None, config=config)
复制代码
        Replay 是实现 “时间旅行(time-travel)” 的核心机制,主要用于:
修改状态-Update state

        在 LangGraph 中,graph.update_state() 是用于编辑图(graph)状态的核心方法,支持修改现有线程的状态或从特定检查点创建新的状态分支。它通过灵活的参数配置,实现对状态的精细调整,同时遵循图的状态通道(channel)规则(如 reducer 逻辑)。
   update_state() 允许开发者直接修改图的状态,既可以更新当前线程的最新状态,也可以基于历史检查点创建新的状态分支(fork)。其修改逻辑与节点(node)对状态的更新保持一致,确保状态变更的规范性。
关键参数解析
  1. graph.update_state(config, {"foo": 2, "bar": ["b"]})
复制代码
使用场景

LangGraph - Persistence 完整代码示例
  1. from typing import TypedDict, Annotated
  2. from langgraph.graph import StateGraph, START, END
  3. from langgraph.checkpoint.memory import MemorySaver
  4. import operator
  5. # 定义状态类型
  6. class State(TypedDict):
  7.     value: Annotated[list[str], operator.add]  # 累积值,用于跟踪执行轨迹
  8. # 节点函数:模拟超级步骤1
  9. def superstep_1(state: State):
  10.     new_value = "超级步骤1: 初始化数据"
  11.     return {"value": [new_value]}
  12. # 节点函数:模拟超级步骤2
  13. def superstep_2(state: State):
  14.     new_value = "超级步骤2: 处理数据"
  15.     return {"value": [new_value]}
  16. # 节点函数:模拟超级步骤3
  17. def superstep_3(state: State):
  18.     new_value = "超级步骤3: 完成计算"
  19.     return {"value": [new_value]}
  20. # 构建状态图
  21. workflow = StateGraph(State)
  22. workflow.add_node("superstep_1", superstep_1)
  23. workflow.add_node("superstep_2", superstep_2)
  24. workflow.add_node("superstep_3", superstep_3)
  25. # 定义边:按超级步骤顺序执行
  26. workflow.add_edge(START, "superstep_1")
  27. workflow.add_edge("superstep_1", "superstep_2")
  28. workflow.add_edge("superstep_2", "superstep_3")
  29. workflow.add_edge("superstep_3", END)
  30. # 使用MemorySaver启用持久化检查点
  31. checkpointer = MemorySaver()
  32. graph = workflow.compile(checkpointer=checkpointer)
  33. # 配置线程ID,形成执行轨迹
  34. config = {"configurable": {"thread_id": "example_thread"}}
  35. # 执行图:每个超级步骤后自动生成检查点
  36. input_data = {"value": []}
  37. final_state = graph.invoke(input_data, config)
  38. print("最终状态:", final_state)
  39. # 查询当前检查点(StateSnapshot)
  40. current_snapshot = graph.get_state(config)
  41. print("\n当前检查点快照:")
  42. print("值:", current_snapshot.values)
  43. print("下一个节点:", current_snapshot.next)
  44. print("检查点ID:", current_snapshot.config["configurable"]["checkpoint_id"])
  45. # 查询所有历史检查点,形成完整执行轨迹
  46. print("\n执行轨迹(所有历史检查点):")
  47. for snapshot in graph.get_state_history(config):
  48.     print("检查点ID:", snapshot.config["configurable"]["checkpoint_id"])
  49.     print("值:", snapshot.values)
  50.     print("下一个节点:", snapshot.next)
  51.     print("---")
  52. # 重放:从特定检查点恢复执行(例如,从超级步骤2后的检查点开始)
  53. # 假设我们取第二个检查点(超级步骤1后)
  54. history = list(graph.get_state_history(config))
  55. if len(history) > 2:
  56.     replay_config = history[2].config  # 取超级步骤1后的检查点
  57.     print("\n从检查点重放执行:")
  58.     replay_state = graph.invoke(None, replay_config)  # 从检查点继续执行
  59.     print("重放后状态:", replay_state)
  60. # 修改状态:更新特定检查点的值,然后继续执行
  61. update_config = current_snapshot.config
  62. graph.update_state(update_config, {"value": ["修改后的值: 自定义轨迹"]})
  63. print("\n修改后检查点快照:")
  64. updated_snapshot = graph.get_state(update_config)
  65. print("修改值:", updated_snapshot.values)
  66. # 从修改后的检查点继续执行剩余部分
  67. continued_state = graph.invoke(None, update_config)
  68. print("从修改检查点继续执行后的最终状态:", continued_state)
复制代码
参考文献

Pregel 算法与分布式计算 | langchain-ai/langgraph - KoalaWiki
Overview

原文地址:https://blog.csdn.net/weixin_41645817/article/details/149813502




欢迎光临 AI创想 (https://www.llms-ai.com/) Powered by Discuz! X3.4