作者:彬彬侠
1. StateGraph 类的概述
StateGraph 是 LangGraph 库的核心类,位于 langgraph.graph 模块中,用于定义和构建基于状态的工作流图。它以 有向图 的形式组织任务,其中:
节点(Nodes) 表示执行的具体操作(如调用 LLM、处理数据)。边(Edges) 定义节点之间的执行顺序(可以是有条件的)。状态(State) 是一个贯穿图执行的数据结构,用于存储和传递上下文信息。
StateGraph 的主要作用是:
提供一个结构化的方式来定义复杂的工作流,支持分支、循环和动态逻辑。管理状态的更新和传递,确保节点之间信息的一致性。支持编译为可执行的图对象,用于运行工作流。
StateGraph 通常用于构建智能 Agent、对话系统、自动化流程等需要多步骤交互的场景。
2. StateGraph 类的定义和初始化
2.1 类定义
StateGraph 类的完整定义如下:- from langgraph.graph import StateGraph
- from typing import Type, Optional, Any, Dict, Union
- from langchain_core.runnables import RunnableConfig
- classStateGraph:def__init__(
- self,
- state_schema: Optional[Type[Any]]=None,
- config_schema: Optional[Type[Any]]=None)->None:...
复制代码
- 参数:
state_schema:定义状态的数据结构,通常是一个 TypedDict 或自定义类。如果未提供,默认为 dict。config_schema:定义运行时配置的结构,通常是一个 TypedDict,用于验证 RunnableConfig 中的字段。如果未提供,默认为任意配置。
2.2 状态和配置的作用
状态(State):state_schema 定义了图中传递的数据结构,例如用户输入、LLM 输出、工具调用结果等。状态在节点间共享,节点可以读取和更新状态。配置(Config):config_schema 定义了运行时的配置参数(如 thread_id、user_id),通过 RunnableConfig 传递,节点和工具可以访问这些参数。
2.3 初始化示例
以下是一个简单的初始化示例:- from typing import TypedDict
- from langgraph.graph import StateGraph
- # 定义状态classState(TypedDict):input:str
- output:str# 定义配置(可选)classConfigSchema(TypedDict):
- user_id:str# 创建 StateGraph
- workflow = StateGraph(state_schema=State, config_schema=ConfigSchema)
复制代码 3. StateGraph 的属性
StateGraph 类的属性主要用于存储图的结构和元数据。以下是其核心属性(部分属性是内部使用的,开发者通常通过方法间接访问):
nodes:
类型:Dict[str, Callable]描述:存储图中所有节点的字典,键是节点名称(字符串),值是节点的执行函数(通常是 Callable[[State, Optional[RunnableConfig]], State])。用途:记录已添加的节点及其逻辑。
edges:
类型:Dict[str, List[str]]描述:存储无条件边的映射,键是源节点名称,值是目标节点名称的列表。用途:定义节点之间的直接跳转关系。
conditional_edges:
类型:Dict[str, Tuple[Callable, Dict[str, str]]]描述:存储条件边的映射,键是源节点名称,值是一个元组,包含条件函数和条件值到目标节点的映射。用途:支持动态跳转逻辑。
entry_point:
类型:str描述:图的入口节点名称,通常由 set_entry_point 设置。用途:指定工作流开始执行的节点。
finish_points:
类型:Set[str]描述:图的终止节点集合,通常由 set_finish_point 或 add_edge 到 END 设置。用途:定义工作流的结束点。
state_schema:
类型:Type[Any]描述:状态的数据结构,初始化时通过 state_schema 参数指定。用途:确保状态的类型安全和一致性。
config_schema:
类型:Type[Any]描述:配置的数据结构,初始化时通过 config_schema 参数指定。用途:验证运行时配置的字段。
注意:这些属性通常是内部管理的,开发者不直接修改它们,而是通过 StateGraph 的方法(如 add_node、add_edge)来操作图结构。
4. StateGraph 的方法
StateGraph 提供了丰富的 API 用于构建和配置工作流图。以下是其主要方法,分为 构建图、设置入口/出口 和 编译图 三类:
4.1 构建图的方法
add_node(name: str, action: Callable) -> None
功能:向图中添加一个节点。
- 参数:
name:节点名称(字符串,唯一标识)。action:节点的执行函数,签名通常为 Callable[[State, Optional[RunnableConfig]], State],接收状态和可选配置,返回更新后的状态。
用途:定义工作流中的操作单元。
- 示例:
- defprocess_input(state: State, config: RunnableConfig)-> State:
- state["output"]=f"处理: {state['input']}"return state
- workflow.add_node("process_input", process_input)
复制代码 add_edge(source: Union[str, START, END], target: Union[str, END]) -> None
功能:添加一条无条件边,指定从源节点到目标节点的跳转。
- 参数:
source:源节点名称,可以是字符串、特殊常量 START(表示图入口)或 END(表示终止)。target:目标节点名称,可以是字符串或 END。
用途:定义节点的执行顺序。
- 示例:
- from langgraph.graph import START, END
- workflow.add_edge(START,"process_input")
- workflow.add_edge("process_input", END)
复制代码 add_conditional_edges(source: str, condition: Callable, edge_mapping: Dict[Any, Union[str, END]]) -> None
功能:添加条件边,根据条件函数的返回值动态选择目标节点。
- 参数:
source:源节点名称(字符串)。condition:条件函数,签名通常为 Callable[[State], Any],接收状态并返回一个值(通常是字符串)。edge_mapping:条件值到目标节点的映射,键是条件函数的可能返回值,值是目标节点名称或 END。
用途:实现动态分支逻辑。
- 示例:
- defroute(state: State)->str:return"process_input"iflen(state["input"])>5else"skip"
- workflow.add_conditional_edges("check_input",
- route,{"process_input":"process_input","skip":"skip"})
复制代码 4.2 设置入口和出口的方法
set_entry_point(name: str) -> None
功能:设置图的入口节点。
- 参数:
用途:指定工作流开始执行的节点。示例:- workflow.set_entry_point("process_input")
复制代码 注意:等效于 add_edge(START, name)。
set_finish_point(name: str) -> None
功能:设置图的终止节点。
- 参数:
用途:指定工作流的结束点。示例:- workflow.set_finish_point("finalize_output")
复制代码 注意:等效于 add_edge(name, END)。
4.3 编译图的方法
compile(checkpointer: Optional[BaseCheckpointSaver] = None, interrupt_after: Optional[List[str]] = None, interrupt_before: Optional[List[str]] = None) -> CompiledGraph
功能:将 StateGraph 编译为可执行的 CompiledGraph 对象。
- 参数:
checkpointer:检查点保存器,用于持久化状态(例如 MemorySaver 或数据库检查点)。如果为 None,状态不会持久化。interrupt_after:指定在哪些节点执行后中断,等待外部输入(用于交互式工作流)。interrupt_before:指定在哪些节点执行前中断。
返回值:CompiledGraph 对象,支持 invoke、stream、astream 等方法运行工作流。用途:将图的定义转换为可执行形式。
- 示例:
- from langgraph.checkpoint.memory import MemorySaver
- graph = workflow.compile(checkpointer=MemorySaver())
复制代码 5. 完整示例
以下是一个完整的示例,展示如何使用 StateGraph 构建一个具有条件分支的工作流:- from typing import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langchain_core.runnables import RunnableConfig
- # 定义状态classState(TypedDict):input:str
- output:str
- needs_processing:bool# 定义配置(可选)classConfigSchema(TypedDict):
- user_id:str# 节点函数defcheck_input(state: State, config: RunnableConfig)-> State:
- state["needs_processing"]=len(state["input"])>5
- user_id = config.get("configurable",{}).get("user_id","default_user")
- state["output"]=f"检查输入 by {user_id}"return state
- defprocess_input(state: State, config: RunnableConfig)-> State:
- state["output"]=f"处理输入: {state['input']}"return state
- defskip_processing(state: State, config: RunnableConfig)-> State:
- state["output"]="输入太短,无需处理"return state
- # 条件函数defroute(state: State)->str:return"process_input"if state["needs_processing"]else"skip_processing"# 创建 StateGraph
- workflow = StateGraph(state_schema=State, config_schema=ConfigSchema)# 添加节点
- workflow.add_node("check_input", check_input)
- workflow.add_node("process_input", process_input)
- workflow.add_node("skip_processing", skip_processing)# 添加边
- workflow.add_edge(START,"check_input")
- workflow.add_conditional_edges("check_input",
- route,{"process_input":"process_input","skip_processing":"skip_processing"})
- workflow.add_edge("process_input", END)
- workflow.add_edge("skip_processing", END)# 编译图
- graph = workflow.compile()# 运行工作流
- result = graph.invoke({"input":"Hi","output":"","needs_processing":False},
- config={"configurable":{"user_id":"user123"}})print(result)
复制代码 输出(输入 “Hi”):- {'input':'Hi','output':'输入太短,无需处理','needs_processing':False}
复制代码 输出(输入 “Hello, LangGraph!”):- {'input':'Hello, LangGraph!','output':'处理输入: Hello, LangGraph!','needs_processing':True}
复制代码 这个示例展示了:
使用 StateGraph 初始化状态和配置。添加节点、条件边和无条件边。编译和运行图。
6. CompiledGraph 的关键方法
编译后的 CompiledGraph 对象(通过 workflow.compile() 获得)支持以下主要方法,用于执行工作流:
invoke(input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> Dict[str, Any]
同步执行整个图,返回最终状态。示例:- result = graph.invoke({"input":"test"}, config={"configurable":{"user_id":"user123"}})
复制代码 stream(input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> Iterator[Dict[str, Any]]
流式执行图,逐节点返回中间状态。示例:- for state in graph.stream({"input":"test"}):print(state)
复制代码 astream(input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> AsyncIterator[Dict[str, Any]]
异步流式执行图,适合高并发场景。示例:- asyncfor state in graph.astream({"input":"test"}):print(state)
复制代码 astream_events(input: Dict[str, Any], config: Optional[RunnableConfig] = None, version: str = "v1") -> AsyncIterator[Dict[str, Any]]
异步流式返回图执行的事件(如节点开始、结束),适合与 LangSmith 集成。示例:- asyncfor event in graph.astream_events({"input":"test"}, version="v1"):print(event)
复制代码
7. 高级功能和注意事项
7.1 检查点(Checkpointing)
使用 checkpointer 参数(如 MemorySaver 或数据库检查点),可以在图执行过程中保存状态,支持中断和恢复。
- 示例:
- from langgraph.checkpoint.memory import MemorySaver
- graph = workflow.compile(checkpointer=MemorySaver())
复制代码 7.2 中断(Interrupts)
使用 interrupt_after 或 interrupt_before 参数,可以在指定节点处暂停执行,等待外部输入。示例:- graph = workflow.compile(interrupt_after=["process_input"])
复制代码 7.3 异步支持
节点函数可以定义为异步函数(async def),配合 astream 或 astream_events 使用。
- 示例:
- asyncdefasync_node(state: State, config: RunnableConfig)-> State:
- state["output"]=await some_async_function(state["input"])return state
复制代码 7.4 注意事项
状态一致性:确保节点函数返回的状态与 state_schema 一致,否则可能导致类型错误。条件边逻辑:add_conditional_edges 的 edge_mapping 必须覆盖条件函数的所有可能返回值。版本兼容性:使用最新版本的 LangGraph(如 0.2.x)以获得最佳性能和功能支持。
8. 与其他类的关系
CompiledGraph:StateGraph.compile() 的返回值,是可执行的图对象。RunnableConfig:运行时配置,传递给节点和工具,包含 configurable、callbacks 等字段。BaseCheckpointSaver:用于状态持久化的检查点类,与 compile 的 checkpointer 参数配合使用。
9. 总结
langgraph.graph.StateGraph 是 LangGraph 的核心类,用于定义基于状态的工作流图。其主要功能包括:
通过 add_node 和 add_edge 构建图结构。通过 add_conditional_edges 实现动态分支。通过 set_entry_point 和 set_finish_point 指定执行路径。通过 compile 生成可执行的 CompiledGraph。
其属性(如 nodes、edges)管理图的内部结构,方法(如 add_node、compile)提供灵活的 API。结合状态管理和检查点功能,StateGraph 适合构建复杂的 LLM 驱动应用。
参考文献:
LangGraph 官方文档:https://langchain-ai.github.io/langgraph/GitHub 仓库:https://github.com/langchain-ai/langgraphLangChain 文档:https://python.langchain.com/docs/
原文地址:https://blog.csdn.net/u013172930/article/details/147963138 |