作者:CSDN博客
特点、功能与应用场景
核心特点:
有状态处理: 支持维护和更新共享状态(State),适合需要保留上下文的多步骤任务循环与分支: 内置对循环流程(Loop)和条件分支(Conditional Branching)的支持多参与者协作: 可协调多个LLM/工具/人工的协作流程灵活的状态管理: 使用JSON-like的共享状态对象,支持动态字段修改LangChain集成: 与LangChain生态无缝衔接,可直接使用LCEL语法
主要功能:
构建带循环/分支的复杂工作流实现带记忆的对话系统创建多AI协同的复杂推理链开发需要多次迭代优化的生成流程管理需要人工介入的混合工作流
典型应用场景:
多轮对话系统(带记忆和上下文管理)代码生成与迭代优化复杂问题的分步求解多专家协同推理系统带人工审核的生成流程
介绍
一、LangGraph的背景
定位:LangGraph是LangChain的扩展库,专注于构建带有状态循环和多参与者协作的应用(如复杂代理系统、自治工作流)。它弥补了LangChain在处理动态流程控制和长时状态跟踪的不足。
典型场景:
多步骤决策(如AI根据反馈循环调整回答)多AI协作系统(如多个专家模型接力处理任务)需持久化中间状态的对话系统
二、核心概念解析
1. 状态(State)
本质:贯穿整个工作流的共享数据结构特性:每次节点执行后会更新,类型通常为dict示例状态:
- {"user_input":"如何学习机器学习?","search_results":[...],"draft_answer":"首先需要掌握线性代数...","revised_answer":None}
复制代码 2. 节点(Node)
定义:执行具体任务的函数
- 特征:
接收当前state作为输入返回state的更新部分(partial updates)
示例节点:
- defsearch_node(state):
- query = state["user_input"]
- results = web_search(query)return{"search_results": results}
复制代码 3. 边(Edge)
作用:决定流程走向
- 类型:
普通边:无条件跳转(如always_go_to)条件边:根据状态决定路径(通过conditional_edge)
三、核心组件详解
1. StateGraph 类
职责:管理图结构和工作流状态
- 关键方法:
add_node(name, func):添加命名节点add_edge(start_node, end_node):连接节点add_conditional_edges():添加条件分支set_entry_point():设置入口节点compile():编译为可执行图
2. 条件判断
实现方式:通过RunnableLambda定义判断逻辑示例:
- from langgraph.checkpoint import BaseCheckpointSaver
- defshould_revise(state):return"revise"if needs_revision(state)else"finalize"
复制代码 四、完整工作流示例
场景:问答系统优化流程
- from langgraph.graph import StateGraph
- # 定义状态类型from typing import TypedDict, List, Annotated
- classQaState(TypedDict):
- question:str
- draft:str
- revisions: List[str]
- final: Annotated[str,"最终回答"]# 构建节点defdraft_answer(state: QaState):return{"draft": llm.generate(f"回答:{state['question']}")}defreview_answer(state: QaState):return{"revisions":[llm.generate(f"优化建议:{state['draft']}")]}deffinalize_answer(state: QaState):return{"final": state['revisions'][-1]}# 构建图
- graph = StateGraph(QaState)
- graph.add_node("draft", draft_answer)
- graph.add_node("review", review_answer)
- graph.add_node("final", finalize_answer)# 设置边
- graph.add_edge("draft","review")
- graph.add_conditional_edges("review",lambda s:"final"iflen(s['revisions'])>=2else"review",{"final":"final","review":"review"})
- graph.set_entry_point("draft")# 编译执行
- app = graph.compile()
- result = app.invoke({"question":"解释量子计算原理"})print(result["final"])
复制代码 五、高级功能
1. 持久化检查点
- from langgraph.checkpoint import FileCheckpoint
- app = graph.compile(
- checkpointer=FileCheckpoint("./checkpoints/"))
复制代码 2. 并行执行
通过ToolNode实现并行调用:- from langgraph.prebuilt import ToolNode
- parallel_tools = ToolNode([web_search, docs_lookup])
复制代码 3. 人工干预
配置人工审核节点:- defhuman_review(state):
- show_gui(state["draft"])return{"approved": get_user_input()}
复制代码 六、设计模式建议
状态精简原则:只保留必要字段,使用Annotated添加元数据节点原子化:每个节点完成单一职责错误处理:
- try:
- app.invoke(...)except LangGraphError as e:
- handle_error(e.current_state)
复制代码 断点保存和恢复
在 LangGraph 中,中断执行和保存后重新载入执行是一个重要的功能,尤其是在处理长时间运行的任务或需要暂停任务的场景时。
1. LangGraph 的状态管理机制
LangGraph 的执行状态通常包括以下信息:
当前执行到的节点(current_node)。每个节点的输出结果(node_outputs)。工具调用的结果(如 API 返回的数据)。用户输入和上下文信息(context)。
为了支持中断和恢复执行,LangGraph 需要将这些状态信息持久化存储,并在恢复时重新加载。
2. 中断执行与保存状态
步骤 1:设计中断点
在 LangGraph 中,可以通过以下方式定义中断点:
手动触发:用户主动请求中断。自动触发:某些节点(如工具调用或等待用户输入的节点)完成后自动保存状态并暂停。
步骤 2:保存状态
保存状态的核心是将当前执行的状态序列化为一个可持久化的格式(如 JSON 或数据库记录)。以下是需要保存的关键信息:- {"graph_id":"example_graph_001",// 图的唯一标识"current_node":"node3",// 当前执行到的节点"node_outputs":{// 每个节点的输出结果"node1":{"output":"weather","timestamp":"2023-10-01T12:00:00Z"},"node2":{"output":{"location":"Beijing","weather":"Sunny"},"timestamp":"2023-10-01T12:05:00Z"}},"context":{// 上下文信息"user_input":"What's the weather in Beijing?","session_id":"session_123456"},"status":"paused"// 当前状态(running, paused, completed)}
复制代码 步骤 3:存储状态
可以将上述状态信息存储到以下媒介之一:
文件系统:将状态保存为 JSON 文件。数据库:使用关系型数据库(如 MySQL)或 NoSQL 数据库(如 MongoDB)存储状态。缓存系统:对于短期中断,可以使用 Redis 等缓存系统。
3. 重新载入执行
步骤 1:加载状态
从存储媒介中读取之前保存的状态信息。例如,从数据库中查询状态记录:- SELECT*FROM graph_states WHERE session_id ='session_123456';
复制代码 或者从文件中加载:- import json
- withopen('state.json','r')as f:
- state = json.load(f)
复制代码 步骤 2:恢复执行
根据加载的状态信息,恢复执行流程:
定位当前节点:根据 current_node 字段找到图中当前执行到的节点。恢复上下文:将 context 和 node_outputs 恢复到内存中。继续执行:从 current_node 开始继续执行后续节点。
示例代码(伪代码):- defresume_execution(graph, state):# 恢复上下文
- graph.set_context(state["context"])# 恢复节点输出for node_id, output in state["node_outputs"].items():
- graph.set_node_output(node_id, output)# 定位当前节点并继续执行
- current_node = state["current_node"]
- graph.start_from_node(current_node)# 示例调用
- state = load_state_from_storage()# 加载状态
- resume_execution(my_graph, state)# 恢复执行
复制代码 4. 示例场景
假设我们有一个任务图,用于处理用户提问并调用外部 API 获取天气信息。任务流程如下:
用户提问:“北京的天气怎么样?”系统分析问题类型(classify_question),判断为天气问题。调用天气 API 获取北京的天气信息。生成回复并返回给用户。
执行过程中断
假设在步骤 3 调用天气 API 时,系统需要等待 API 返回结果。在等待期间,系统保存当前状态(current_node 为 node2,context 包含用户提问和会话 ID)。
保存状态
保存的状态信息可能如下:- {"graph_id":"weather_graph","current_node":"node2","node_outputs":{"node1":{"output":"weather","timestamp":"2023-10-01T12:00:00Z"}},"context":{"user_input":"What's the weather in Beijing?","session_id":"session_123456"},"status":"paused"}
复制代码 恢复执行
当天气 API 返回结果后,系统从存储中加载状态并恢复执行:
定位到 current_node(node2)。使用 API 返回的天气数据更新上下文。继续执行后续节点(node3 生成回复)。
5. 关键实现细节
状态持久化的设计
唯一标识:为每个会话分配唯一的 session_id,便于区分不同用户的任务。版本控制:如果图结构可能发生变化,建议记录 graph_version,确保状态与图版本匹配。加密与安全:敏感信息(如用户输入或工具调用结果)应加密存储。
并发控制
如果多个用户同时执行任务,需确保状态存储的并发安全性(如使用数据库事务或分布式锁)。
工具调用的异步处理
- 对于耗时较长的工具调用(如 API 请求),可以采用异步机制:
在工具调用节点保存状态并暂停执行。使用回调或轮询机制检测工具调用完成状态。恢复执行时加载最新状态。
6. 总结
LangGraph 的中断执行与保存/恢复功能通过以下步骤实现:
中断时:保存当前执行状态(包括节点位置、输出结果和上下文)。恢复时:从存储中加载状态信息,定位当前节点并继续执行。
这种机制不仅提高了系统的灵活性,还支持长时间运行任务的分段处理和容错能力。在实际应用中,可以根据具体需求选择合适的存储媒介和状态管理策略。
原文地址:https://blog.csdn.net/weixin_44714085/article/details/146520470 |