AI创想
标题:
【LangGraph】langgraph.graph.StateGraph 类的属性和方法
[打印本页]
作者:
创想小编
时间:
昨天 17:11
标题:
【LangGraph】langgraph.graph.StateGraph 类的属性和方法
作者:彬彬侠
1. StateGraph 类的概述
StateGraph 是 LangGraph 库的核心类,位于 langgraph.graph 模块中,用于定义和构建基于状态的工作流图。它以
有向图
的形式组织任务,其中:
节点(Nodes)
表示执行的具体操作(如调用 LLM、处理数据)。
边(Edges)
定义节点之间的执行顺序(可以是有条件的)。
状态(State)
是一个贯穿图执行的数据结构,用于存储和传递上下文信息。
StateGraph 的主要作用是:
提供一个结构化的方式来定义复杂的工作流,支持分支、循环和动态逻辑。管理状态的更新和传递,确保节点之间信息的一致性。支持编译为可执行的图对象,用于运行工作流。
StateGraph 通常用于构建智能 Agent、对话系统、自动化流程等需要多步骤交互的场景。
2. StateGraph 类的定义和初始化
2.1 类定义
StateGraph 类的完整定义如下:
from langgraph.graph import StateGraph
from typing import Type, Optional, Any, Dict, Union
from langchain_core.runnables import RunnableConfig
classStateGraph:def__init__(
self,
state_schema: Optional[Type[Any]]=None,
config_schema: Optional[Type[Any]]=None)->None:...
复制代码
参数
:
state_schema:定义状态的数据结构,通常是一个 TypedDict 或自定义类。如果未提供,默认为 dict。config_schema:定义运行时配置的结构,通常是一个 TypedDict,用于验证 RunnableConfig 中的字段。如果未提供,默认为任意配置。
2.2 状态和配置的作用
状态(State)
:state_schema 定义了图中传递的数据结构,例如用户输入、LLM 输出、工具调用结果等。状态在节点间共享,节点可以读取和更新状态。
配置(Config)
:config_schema 定义了运行时的配置参数(如 thread_id、user_id),通过 RunnableConfig 传递,节点和工具可以访问这些参数。
2.3 初始化示例
以下是一个简单的初始化示例:
from typing import TypedDict
from langgraph.graph import StateGraph
# 定义状态classState(TypedDict):input:str
output:str# 定义配置(可选)classConfigSchema(TypedDict):
user_id:str# 创建 StateGraph
workflow = StateGraph(state_schema=State, config_schema=ConfigSchema)
复制代码
3. StateGraph 的属性
StateGraph 类的属性主要用于存储图的结构和元数据。以下是其核心属性(部分属性是内部使用的,开发者通常通过方法间接访问):
nodes
:
类型:Dict[str, Callable]描述:存储图中所有节点的字典,键是节点名称(字符串),值是节点的执行函数(通常是 Callable[[State, Optional[RunnableConfig]], State])。用途:记录已添加的节点及其逻辑。
edges
:
类型:Dict[str, List[str]]描述:存储无条件边的映射,键是源节点名称,值是目标节点名称的列表。用途:定义节点之间的直接跳转关系。
conditional_edges
:
类型:Dict[str, Tuple[Callable, Dict[str, str]]]描述:存储条件边的映射,键是源节点名称,值是一个元组,包含条件函数和条件值到目标节点的映射。用途:支持动态跳转逻辑。
entry_point
:
类型:str描述:图的入口节点名称,通常由 set_entry_point 设置。用途:指定工作流开始执行的节点。
finish_points
:
类型:Set[str]描述:图的终止节点集合,通常由 set_finish_point 或 add_edge 到 END 设置。用途:定义工作流的结束点。
state_schema
:
类型:Type[Any]描述:状态的数据结构,初始化时通过 state_schema 参数指定。用途:确保状态的类型安全和一致性。
config_schema
:
类型:Type[Any]描述:配置的数据结构,初始化时通过 config_schema 参数指定。用途:验证运行时配置的字段。
注意
:这些属性通常是内部管理的,开发者不直接修改它们,而是通过 StateGraph 的方法(如 add_node、add_edge)来操作图结构。
4. StateGraph 的方法
StateGraph 提供了丰富的 API 用于构建和配置工作流图。以下是其主要方法,分为
构建图
、
设置入口/出口
和
编译图
三类:
4.1 构建图的方法
add_node(name: str, action: Callable) -> None
功能
:向图中添加一个节点。
参数
:
name:节点名称(字符串,唯一标识)。action:节点的执行函数,签名通常为 Callable[[State, Optional[RunnableConfig]], State],接收状态和可选配置,返回更新后的状态。
用途
:定义工作流中的操作单元。
示例
:
defprocess_input(state: State, config: RunnableConfig)-> State:
state["output"]=f"处理: {state['input']}"return state
workflow.add_node("process_input", process_input)
复制代码
add_edge(source: Union[str, START, END], target: Union[str, END]) -> None
功能
:添加一条无条件边,指定从源节点到目标节点的跳转。
参数
:
source:源节点名称,可以是字符串、特殊常量 START(表示图入口)或 END(表示终止)。target:目标节点名称,可以是字符串或 END。
用途
:定义节点的执行顺序。
示例
:
from langgraph.graph import START, END
workflow.add_edge(START,"process_input")
workflow.add_edge("process_input", END)
复制代码
add_conditional_edges(source: str, condition: Callable, edge_mapping: Dict[Any, Union[str, END]]) -> None
功能
:添加条件边,根据条件函数的返回值动态选择目标节点。
参数
:
source:源节点名称(字符串)。condition:条件函数,签名通常为 Callable[[State], Any],接收状态并返回一个值(通常是字符串)。edge_mapping:条件值到目标节点的映射,键是条件函数的可能返回值,值是目标节点名称或 END。
用途
:实现动态分支逻辑。
示例
:
defroute(state: State)->str:return"process_input"iflen(state["input"])>5else"skip"
workflow.add_conditional_edges("check_input",
route,{"process_input":"process_input","skip":"skip"})
复制代码
4.2 设置入口和出口的方法
set_entry_point(name: str) -> None
功能
:设置图的入口节点。
参数
:
name:入口节点的名称(字符串)。
用途
:指定工作流开始执行的节点。
示例
:
workflow.set_entry_point("process_input")
复制代码
注意
:等效于 add_edge(START, name)。
set_finish_point(name: str) -> None
功能
:设置图的终止节点。
参数
:
name:终止节点的名称(字符串)。
用途
:指定工作流的结束点。
示例
:
workflow.set_finish_point("finalize_output")
复制代码
注意
:等效于 add_edge(name, END)。
4.3 编译图的方法
compile(checkpointer: Optional[BaseCheckpointSaver] = None, interrupt_after: Optional[List[str]] = None, interrupt_before: Optional[List[str]] = None) -> CompiledGraph
功能
:将 StateGraph 编译为可执行的 CompiledGraph 对象。
参数
:
checkpointer:检查点保存器,用于持久化状态(例如 MemorySaver 或数据库检查点)。如果为 None,状态不会持久化。interrupt_after:指定在哪些节点执行后中断,等待外部输入(用于交互式工作流)。interrupt_before:指定在哪些节点执行前中断。
返回值
:CompiledGraph 对象,支持 invoke、stream、astream 等方法运行工作流。
用途
:将图的定义转换为可执行形式。
示例
:
from langgraph.checkpoint.memory import MemorySaver
graph = workflow.compile(checkpointer=MemorySaver())
复制代码
5. 完整示例
以下是一个完整的示例,展示如何使用 StateGraph 构建一个具有条件分支的工作流:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_core.runnables import RunnableConfig
# 定义状态classState(TypedDict):input:str
output:str
needs_processing:bool# 定义配置(可选)classConfigSchema(TypedDict):
user_id:str# 节点函数defcheck_input(state: State, config: RunnableConfig)-> State:
state["needs_processing"]=len(state["input"])>5
user_id = config.get("configurable",{}).get("user_id","default_user")
state["output"]=f"检查输入 by {user_id}"return state
defprocess_input(state: State, config: RunnableConfig)-> State:
state["output"]=f"处理输入: {state['input']}"return state
defskip_processing(state: State, config: RunnableConfig)-> State:
state["output"]="输入太短,无需处理"return state
# 条件函数defroute(state: State)->str:return"process_input"if state["needs_processing"]else"skip_processing"# 创建 StateGraph
workflow = StateGraph(state_schema=State, config_schema=ConfigSchema)# 添加节点
workflow.add_node("check_input", check_input)
workflow.add_node("process_input", process_input)
workflow.add_node("skip_processing", skip_processing)# 添加边
workflow.add_edge(START,"check_input")
workflow.add_conditional_edges("check_input",
route,{"process_input":"process_input","skip_processing":"skip_processing"})
workflow.add_edge("process_input", END)
workflow.add_edge("skip_processing", END)# 编译图
graph = workflow.compile()# 运行工作流
result = graph.invoke({"input":"Hi","output":"","needs_processing":False},
config={"configurable":{"user_id":"user123"}})print(result)
复制代码
输出
(输入 “Hi”):
{'input':'Hi','output':'输入太短,无需处理','needs_processing':False}
复制代码
输出
(输入 “Hello, LangGraph!”):
{'input':'Hello, LangGraph!','output':'处理输入: Hello, LangGraph!','needs_processing':True}
复制代码
这个示例展示了:
使用 StateGraph 初始化状态和配置。添加节点、条件边和无条件边。编译和运行图。
6. CompiledGraph 的关键方法
编译后的 CompiledGraph 对象(通过 workflow.compile() 获得)支持以下主要方法,用于执行工作流:
invoke(input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> Dict[str, Any]
同步执行整个图,返回最终状态。示例:
result = graph.invoke({"input":"test"}, config={"configurable":{"user_id":"user123"}})
复制代码
stream(input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> Iterator[Dict[str, Any]]
流式执行图,逐节点返回中间状态。示例:
for state in graph.stream({"input":"test"}):print(state)
复制代码
astream(input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> AsyncIterator[Dict[str, Any]]
异步流式执行图,适合高并发场景。示例:
asyncfor state in graph.astream({"input":"test"}):print(state)
复制代码
astream_events(input: Dict[str, Any], config: Optional[RunnableConfig] = None, version: str = "v1") -> AsyncIterator[Dict[str, Any]]
异步流式返回图执行的事件(如节点开始、结束),适合与 LangSmith 集成。示例:
asyncfor event in graph.astream_events({"input":"test"}, version="v1"):print(event)
复制代码
7. 高级功能和注意事项
7.1 检查点(Checkpointing)
使用 checkpointer 参数(如 MemorySaver 或数据库检查点),可以在图执行过程中保存状态,支持中断和恢复。
示例:
from langgraph.checkpoint.memory import MemorySaver
graph = workflow.compile(checkpointer=MemorySaver())
复制代码
7.2 中断(Interrupts)
使用 interrupt_after 或 interrupt_before 参数,可以在指定节点处暂停执行,等待外部输入。示例:
graph = workflow.compile(interrupt_after=["process_input"])
复制代码
7.3 异步支持
节点函数可以定义为异步函数(async def),配合 astream 或 astream_events 使用。
示例:
asyncdefasync_node(state: State, config: RunnableConfig)-> State:
state["output"]=await some_async_function(state["input"])return state
复制代码
7.4 注意事项
状态一致性
:确保节点函数返回的状态与 state_schema 一致,否则可能导致类型错误。
条件边逻辑
:add_conditional_edges 的 edge_mapping 必须覆盖条件函数的所有可能返回值。
版本兼容性
:使用最新版本的 LangGraph(如 0.2.x)以获得最佳性能和功能支持。
8. 与其他类的关系
CompiledGraph
:StateGraph.compile() 的返回值,是可执行的图对象。
RunnableConfig
:运行时配置,传递给节点和工具,包含 configurable、callbacks 等字段。
BaseCheckpointSaver
:用于状态持久化的检查点类,与 compile 的 checkpointer 参数配合使用。
9. 总结
langgraph.graph.StateGraph 是 LangGraph 的核心类,用于定义基于状态的工作流图。其主要功能包括:
通过 add_node 和 add_edge 构建图结构。通过 add_conditional_edges 实现动态分支。通过 set_entry_point 和 set_finish_point 指定执行路径。通过 compile 生成可执行的 CompiledGraph。
其属性(如 nodes、edges)管理图的内部结构,方法(如 add_node、compile)提供灵活的 API。结合状态管理和检查点功能,StateGraph 适合构建复杂的 LLM 驱动应用。
参考文献
:
LangGraph 官方文档:https://langchain-ai.github.io/langgraph/GitHub 仓库:https://github.com/langchain-ai/langgraphLangChain 文档:https://python.langchain.com/docs/
原文地址:https://blog.csdn.net/u013172930/article/details/147963138
欢迎光临 AI创想 (https://www.llms-ai.com/)
Powered by Discuz! X3.4