作者:彬彬侠
目录
什么是 StateGraph?StateGraph 的核心功能StateGraph 的关键组件如何使用 StateGraph
- 代码示例
简单线性工作流带条件边的工作流支持循环和工具调用的复杂工作流
高级功能和注意事项与其他 LangGraph 类的关系常见问题和调试技巧总结和学习资源
1. 什么是 StateGraph?
langgraph.graph.StateGraph 是 LangGraph 库中的核心类,用于定义和管理基于状态的有向图工作流。它是 LangGraph 的主要构建块,允许开发者通过节点(Nodes)、边(Edges)和状态(State)构造复杂的工作流,特别适合与大型语言模型(LLM)结合的场景。
定义:StateGraph 是一个类,用于创建和管理一个有状态的图结构,其中每个节点表示一个任务或操作,边定义了执行顺序,状态在节点间传递和更新。用途:StateGraph 适用于需要动态逻辑、循环、条件分支或多步骤交互的应用程序,例如智能 Agent、对话系统、检索增强生成(RAG)工作流等。模块:位于 langgraph.graph 模块中,通常与 START 和 END 常量一起使用,表示图的入口和出口。
StateGraph 的设计目标是提供一个灵活、可扩展的框架,让开发者可以:
模块化地组织任务。维护跨步骤的状态(如上下文、用户输入、工具输出)。支持动态和循环逻辑,适应复杂的业务需求。
2. StateGraph 的核心功能
StateGraph 提供了以下主要功能:
状态管理:
通过用户定义的状态对象(通常是一个 TypedDict 或自定义类)跟踪工作流的上下文。状态在节点间传递,每个节点可以读取和更新状态。
节点定义:
允许添加节点,每个节点是一个函数,执行特定任务(如调用 LLM、处理数据、调用工具)。节点接收当前状态并返回更新后的状态。
边管理:
支持无条件边(直接跳转到下一个节点)。支持条件边(根据状态动态选择下一个节点)。支持循环边(返回到之前的节点)。
图编译:
通过 compile() 方法将图编译为可执行的 CompiledGraph 对象,支持 invoke、stream 和 astream 等运行方式。
入口和出口:
使用 set_entry_point 和 set_finish_point 或直接通过边连接 START 和 END 定义图的起点和终点。
配置支持:
支持运行时配置(RunnableConfig),例如线程 ID、用户 ID、递归限制等,方便多用户或多会话管理。
扩展性:
与 LangChain 的 LLM、工具和 Prompt 集成。支持异步执行(async 方法)。与 LangSmith 集成,用于调试和监控。
3. StateGraph 的关键组件
StateGraph 的工作流由以下核心组件组成:
状态(State):
状态是一个数据结构,用于存储工作流中的上下文信息。
- 通常使用 typing.TypedDict 或自定义类定义,例如:
- from typing import TypedDict
- classState(TypedDict):input:str
- output:str
复制代码 状态在节点间传递,每个节点可以读取和修改状态。
节点(Nodes):
节点是一个 Python 函数,接收状态(和可选的 RunnableConfig)并返回更新后的状态。
- 示例:
- defmy_node(state: State)-> State:
- state["output"]=f"处理: {state['input']}"return state
复制代码 边(Edges):
- 边定义了节点之间的执行顺序:
无条件边:使用 add_edge 添加,直接跳转到下一个节点。条件边:使用 add_conditional_edges 添加,根据条件函数动态选择下一个节点。
示例:- workflow.add_edge("node1","node2")# 无条件边
复制代码 入口和出口:
START:表示图的入口点,通常通过 add_edge(START, "first_node") 指定。END:表示图的终止点,通常通过 add_edge("last_node", END) 指定。
配置(RunnableConfig):
一个字典结构,包含运行时配置,如 configurable(用户自定义参数)、callbacks(用于跟踪)、recursion_limit 等。示例:- config ={"configurable":{"user_id":"user123","thread_id":"thread_001"}}
复制代码
4. 如何使用 StateGraph
以下是使用 StateGraph 构建工作流的标准步骤:
- 导入必要模块:
- version: python
- from langgraph.graph import StateGraph, START, END
- from typing import TypedDict
复制代码 - 定义状态:
创建一个状态类或字典,定义工作流中需要跟踪的字段。- version: python
- classState(TypedDict):input:str
- output:str
复制代码 - 定义节点:
编写节点函数,每个函数处理特定任务并更新状态。- version: python
- defprocess_input(state: State)-> State:
- state["output"]=f"处理输入: {state['input']}"return state
复制代码 - 创建 StateGraph 实例:
初始化 StateGraph,传入状态类。- version: python
- workflow = StateGraph(State)
复制代码 - 添加节点:
使用 add_node 方法将节点添加到图中。- version: python
- workflow.add_node("process_input", process_input)
复制代码 - 添加边:
使用 add_edge 或 add_conditional_edges 定义节点之间的连接。- version: python
- workflow.add_edge(START,"process_input")
- workflow.add_edge("process_input", END)
复制代码 - 编译图:
使用 compile() 方法将图编译为可执行对象。- version: python
- graph = workflow.compile()
复制代码 - 运行图:
使用 invoke、stream 或 astream 方法运行图,传入初始状态和可选配置。- version: python
- result = graph.invoke({"input":"Hello, LangGraph!","output":""})
复制代码
5. 代码示例
以下是三个不同复杂度的 StateGraph 示例,展示其灵活性。
5.1 简单线性工作流
一个简单的线性工作流,包含两个节点。- from langgraph.graph import StateGraph, START, END
- from typing import TypedDict
- # 定义状态classState(TypedDict):input:str
- output:str# 定义节点defprocess_input(state: State)-> State:
- state["output"]=f"处理输入: {state['input']}"return state
- deffinalize_output(state: State)-> State:
- state["output"]+=" -> 已完成"return state
- # 创建图
- workflow = StateGraph(State)
- workflow.add_node("process_input", process_input)
- workflow.add_node("finalize_output", finalize_output)
- workflow.add_edge(START,"process_input")
- workflow.add_edge("process_input","finalize_output")
- workflow.add_edge("finalize_output", END)# 编译和运行
- graph = workflow.compile()
- result = graph.invoke({"input":"Hello, LangGraph!","output":""})print(result)
复制代码 输出:- {'input':'Hello, LangGraph!','output':'处理输入: Hello, LangGraph! -> 已完成'}
复制代码 5.2 带条件边的工作流
一个工作流,根据输入长度决定是否处理。- from langgraph.graph import StateGraph, START, END
- from typing import TypedDict
- # 定义状态classState(TypedDict):input:str
- output:str
- needs_processing:bool# 定义节点defcheck_input(state: State)-> State:
- state["needs_processing"]=len(state["input"])>5return state
- defprocess_input(state: State)-> State:
- state["output"]=f"处理输入: {state['input']}"return state
- defskip_processing(state: State)-> State:
- state["output"]="输入太短,无需处理"return state
- # 条件函数defroute(state: State)->str:return"process_input"if state["needs_processing"]else"skip_processing"# 创建图
- workflow = StateGraph(State)
- workflow.add_node("check_input", check_input)
- workflow.add_node("process_input", process_input)
- workflow.add_node("skip_processing", skip_processing)
- workflow.add_edge(START,"check_input")
- workflow.add_conditional_edges("check_input", route,{"process_input":"process_input","skip_processing":"skip_processing"})
- workflow.add_edge("process_input", END)
- workflow.add_edge("skip_processing", END)# 编译和运行
- graph = workflow.compile()
- result = graph.invoke({"input":"Hi","output":"","needs_processing":False})print(result)
复制代码 输出:- {'input':'Hi','output':'输入太短,无需处理','needs_processing':False}
复制代码 5.3 支持循环和工具调用的复杂工作流
一个智能 Agent 工作流,结合 LLM 和工具调用,支持循环。- from langchain_openai import ChatOpenAI
- from langchain_core.tools import tool
- from langchain_core.runnables import RunnableConfig
- from langgraph.graph import StateGraph, START, END
- from typing import TypedDict, Optional
- # 定义工具@tooldefsearch(query:str, config: RunnableConfig)->str:"""模拟从网络上搜获相关的数据进行汇总"""returnf"搜索结果: {query} 的信息"# 定义状态classState(TypedDict):input:str
- output:str
- needs_search:bool
- search_count:int# 定义节点defdecide_action(state: State, config: RunnableConfig)-> State:
- llm = ChatOpenAI(model="gpt-4o-mini")
- prompt =f"用户问题: {state['input']}\n回答 'yes' 如果需要搜索,'no' 如果可以直接回答。"
- response = llm.invoke(prompt, config=config).content
- state["needs_search"]= response.lower()=="yes"return state
- defsearch_node(state: State, config: RunnableConfig)-> State:
- state["search_count"]+=1
- result = search.invoke({"query": state["input"]}, config=config)
- state["output"]= result
- return state
- defanswer_node(state: State, config: RunnableConfig)-> State:
- llm = ChatOpenAI(model="gpt-4o-mini")
- prompt =f"直接回答用户问题: {state['input']}"
- response = llm.invoke(prompt, config=config).content
- state["output"]= response
- return state
- # 条件函数defroute(state: State)->str:if state["needs_search"]and state["search_count"]<2:return"search_node"return"answer_node"# 创建图
- workflow = StateGraph(State)
- workflow.add_node("decide_action", decide_action)
- workflow.add_node("search_node", search_node)
- workflow.add_node("answer_node", answer_node)
- workflow.add_edge(START,"decide_action")
- workflow.add_conditional_edges("decide_action", route,{"search_node":"search_node","answer_node":"answer_node"})
- workflow.add_conditional_edges("search_node", route,{"search_node":"search_node","answer_node":"answer_node"})
- workflow.add_edge("answer_node", END)# 编译和运行
- graph = workflow.compile()
- result = graph.invoke({"input":"什么是 LangGraph?","output":"","needs_search":False,"search_count":0},
- config={"configurable":{"user_id":"user123"}})print(result)
复制代码 输出(假设 LLM 认为需要搜索):- {'input':'什么是 LangGraph?','output':'搜索结果: 什么是 LangGraph? 的信息','needs_search':True,'search_count':1}
复制代码 6. 高级功能和注意事项
6.1 高级功能
异步支持:
StateGraph 支持异步执行,使用 async def 定义节点函数,并通过 graph.ainvoke 或 graph.astream 运行。
- 示例:
- asyncdefasync_node(state: State)-> State:
- state["output"]="异步处理"return state
复制代码 检查点(Checkpoints):
StateGraph 支持检查点机制,通过 graph.compile(checkpointer=True) 保存中间状态,适合需要恢复或回溯的场景。示例:- graph = workflow.compile(checkpointer=True)
复制代码 配置模式(Config Schema):
定义 ConfigSchema 来约束 RunnableConfig 的字段,提高代码可维护性。
- 示例:
- classConfigSchema(TypedDict):
- user_id: Optional[str]
- model: Optional[str]
- workflow = StateGraph(State, config_schema=ConfigSchema)
复制代码 动态节点添加:
可以在运行时动态添加节点和边,适合需要根据上下文调整工作流的场景。
6.2 注意事项
状态不可变性:
默认情况下,状态是可变的,建议在节点中创建状态副本以避免意外修改。
- 示例:
- defmy_node(state: State)-> State:
- new_state = state.copy()
- new_state["output"]="新值"return new_state
复制代码 条件边映射:
确保 add_conditional_edges 的映射字典覆盖所有可能的条件返回值,否则可能引发运行时错误。
递归限制:
默认递归深度为 25,可以通过 config={"recursion_limit": 100} 调整,防止无限循环。
工具调用配置:
工具函数应声明 config: RunnableConfig 参数,避免使用已废弃的 get_config()(见前文)。
7. 与其他 LangGraph 类的关系
CompiledGraph:
StateGraph.compile() 返回一个 CompiledGraph 对象,提供 invoke、stream 等方法。CompiledGraph 是运行时对象,无法直接修改节点或边。
MessageGraph:
langgraph.graph.MessageGraph 是 StateGraph 的一个变种,专门用于处理消息列表(List[BaseMessage])的状态,适合对话系统。它简化了状态管理,但灵活性低于 StateGraph。
Pregel:
StateGraph 底层基于 LangGraph 的 Pregel 算法,用于分布式图计算。开发者通常无需直接操作 Pregel,但了解其存在有助于理解执行机制。
8. 常见问题和调试技巧
8.1 常见问题
问题:节点未正确更新状态。
原因:节点函数未返回状态,或状态字段拼写错误。解决方案:检查节点函数返回值,确保与状态定义一致。
问题:条件边跳转错误。
原因:条件函数返回值未正确映射到节点。解决方案:验证 add_conditional_edges 的映射字典。
问题:图执行卡住。
原因:可能是无限循环或递归深度超限。解决方案:检查循环条件,调整 recursion_limit。
8.2 调试技巧
使用 LangSmith:
启用 LangSmith 跟踪(设置 LANGCHAIN_TRACING_V2="true" 和 LANGCHAIN_API_KEY),可视化节点执行和状态变化。
- 示例:
- import os
- os.environ["LANGCHAIN_TRACING_V2"]="true"
- os.environ["LANGCHAIN_API_KEY"]="your-api-key"
复制代码 打印中间状态:
在节点中添加日志,打印状态内容。
- 示例:
- defmy_node(state: State)-> State:print(f"当前状态: {state}")
- state["output"]="处理完成"return state
复制代码 检查配置:
确保 RunnableConfig 正确传递,检查 configurable 字段是否包含预期值。
9. 总结和学习资源
9.1 总结
langgraph.graph.StateGraph 是 LangGraph 的核心类,用于构建和管理有状态的有向图工作流。它通过状态、节点和边提供了一个灵活的框架,适合复杂的 LLM 应用场景,如智能 Agent、对话系统和自动化工作流。关键点包括:
使用 TypedDict 或类定义状态。通过 add_node 和 add_edge 构建图。支持条件边和循环,适应动态逻辑。编译后通过 invoke 或 stream 执行。
通过代码示例,可以看到 StateGraph 如何从简单线性工作流扩展到复杂的工具调用和循环逻辑。结合 LangChain 和 LangSmith,可以进一步增强开发效率和调试能力。
9.2 学习资源
官方文档:https://langchain-ai.github.io/langgraph/LangChain 文档:https://python.langchain.com/docs/LangSmith:https://smith.langchain.com/GitHub 仓库:https://github.com/langchain-ai/langgraph社区讨论:GitHub Issues 和 LangChain 论坛。
原文地址:https://blog.csdn.net/u013172930/article/details/147962997 |