作者:佑瞻
在开发智能应用的过程中,我们常常会面临一个核心挑战:如何高效管理复杂的流程控制与状态更新?当业务逻辑涉及多步骤处理、并行任务或动态分支时,传统的线性编程模式往往显得力不从心。今天我们要聊的 LangGraph,正是为解决这类问题而生的强大工具 —— 它通过状态图(StateGraph)的方式,让我们能够以更直观、更灵活的方式定义和执行复杂流程。接下来,我们将从基础构建到高级应用,一步步拆解 LangGraph 的核心能力。
一、构建基础步骤序列:从手动到快捷的实现之道
1.1 手动构建顺序图的核心逻辑
当我们需要定义一个线性执行的流程时,最基础的方式是通过add_node和add_edge方法逐步搭建。来看一个简单的三步流程示例:
python
运行- from langgraph.graph import START, StateGraph
- # 定义状态结构
- class State(TypedDict):
- value_1: str
- value_2: int
- # 初始化状态图构建器
- builder = StateGraph(State)
- # 添加节点(每个节点都是一个处理函数)
- def step_1(state: State):
- return {"value_1": "a"}
- def step_2(state: State):
- current_value_1 = state["value_1"]
- return {"value_1": f"{current_value_1} b"}
- def step_3(state: State):
- return {"value_2": 10}
- builder.add_node(step_1)
- builder.add_node(step_2)
- builder.add_node(step_3)
- # 定义执行顺序(从START开始,依次连接节点)
- builder.add_edge(START, "step_1")
- builder.add_edge("step_1", "step_2")
- builder.add_edge("step_2", "step_3")
- # 编译图并执行
- graph = builder.compile()
- result = graph.invoke({"value_1": "c"})
- print(result) # 输出: {'value_1': 'a b', 'value_2': 10}
复制代码 这里的核心逻辑在于:状态图通过节点定义处理逻辑,通过边定义执行顺序。每个节点函数接收当前状态,返回状态更新值,默认会覆盖对应键的值。
1.2 快捷方式:add_sequence的妙用
对于线性流程,LangGraph 提供了更简洁的构建方式:
python
运行- builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
- builder.add_edge(START, "step_1")
- graph = builder.compile()
复制代码 add_sequence方法会自动按顺序添加节点并建立边连接,大大减少了代码量。这种方式特别适合处理步骤明确的流水线任务。
二、并行分支处理:提升流程执行效率的关键
2.1 并行执行的基本实现
在需要同时处理多个任务时,并行分支能显著提升效率。来看一个典型的 "扇出 - 扇入" 结构:
python
运行- import operator
- from langgraph.graph import StateGraph, START, END
- class State(TypedDict):
- # 使用operator.add作为Reducer,使列表变为追加模式
- aggregate: Annotated[list, operator.add]
- def a(state: State):
- return {"aggregate": ["A"]}
- def b(state: State):
- return {"aggregate": ["B"]}
- def c(state: State):
- return {"aggregate": ["C"]}
- def d(state: State):
- return {"aggregate": ["D"]}
- builder = StateGraph(State)
- builder.add_node(a).add_node(b).add_node(c).add_node(d)
- # 定义并行分支:从a扇出到b和c,再扇入到d
- builder.add_edge(START, "a")
- builder.add_edge("a", "b")
- builder.add_edge("a", "c")
- builder.add_edge("b", "d")
- builder.add_edge("c", "d")
- builder.add_edge("d", END)
- graph = builder.compile()
- result = graph.invoke({"aggregate": []})
- print(result) # 输出: {'aggregate': ['A', 'B', 'C', 'D']}
复制代码 这里的关键点是:节点 b 和 c 会在同一超级步骤(superstep)中并发执行,而节点 d 会在 b 和 c 都完成后执行。通过 Reducer 的设置(如operator.add),我们实现了状态值的累加而非覆盖。
2.2 分支长度不一致时的处理:延迟执行
当分支长度不同时,我们需要确保后续节点等待所有前置任务完成。通过defer=True参数可以实现这一点:
python
运行- def b_2(state: State):
- return {"aggregate": ["B_2"]}
- builder.add_node(b_2).add_node(d, defer=True) # d节点设置为延迟执行
- builder.add_edge("b", "b_2").add_edge("b_2", "d")
- graph.invoke({"aggregate": []})
- # 输出顺序:A → B → C → B_2 → D,确保d等待b分支完成
复制代码 2.3 动态条件分支:基于状态的运行时决策
有时候分支路径需要根据运行时状态动态决定,这时可以使用add_conditional_edges:
python
运行- class State(TypedDict):
- aggregate: list
- which: str # 决定分支的状态键
- def a(state: State):
- return {"aggregate": ["A"], "which": "c"} # 动态设置which为"c"
- def conditional_edge(state: State):
- return state["which"] # 根据which的值决定下一个节点
- builder.add_conditional_edges("a", conditional_edge)
- builder.add_edge("b", END).add_edge("c", END)
- graph.invoke({"aggregate": []})
- # 输出: 执行a → c,因为which的值为"c"
复制代码 条件函数conditional_edge会根据当前状态返回目标节点名,实现动态路由。
三、循环控制:解决重复任务的优雅方案
3.1 带终止条件的循环实现
循环在数据处理、迭代计算等场景中必不可少。LangGraph 通过条件边实现循环终止:
python
运行- def route(state: State) -> Literal["b", END]:
- if len(state["aggregate"]) < 7:
- return "b" # 继续循环
- else:
- return END # 终止循环
- builder.add_edge(START, "a")
- builder.add_conditional_edges("a", route)
- builder.add_edge("b", "a") # b节点执行后回到a,形成循环
- graph.invoke({"aggregate": []})
- # 输出:A和B交替执行,直到aggregate长度≥7时终止
复制代码 3.2 递归限制:防止无限循环
为避免异常情况,我们可以设置递归限制:
python
运行- from langgraph.errors import GraphRecursionError
- try:
- graph.invoke({"aggregate": []}, {"recursion_limit": 4})
- except GraphRecursionError:
- print("达到递归限制")
复制代码 默认递归限制为 25 次,超出时会抛出异常,确保程序不会因死循环而崩溃。
四、高级特性:异步处理与 Command 的强大组合
4.1 异步执行:提升 IO 密集型任务效率
对于需要调用 LLM、API 请求等 IO 操作的场景,异步执行能大幅提升性能:
python
运行- import os
- from langchain.chat_models import init_chat_model
- from langgraph.graph import MessagesState, StateGraph
- # 初始化异步LLM
- os.environ["OPENAI_API_KEY"] = "sk-..."
- llm = init_chat_model("openai:gpt-4.1")
- # 定义异步节点
- async def node(state: MessagesState):
- new_message = await llm.ainvoke(state["messages"])
- return {"messages": [new_message]}
- builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
- graph = builder.compile()
- # 异步调用
- input_message = {"role": "user", "content": "Hello"}
- result = await graph.ainvoke({"messages": [input_message]})
复制代码 通过async def定义异步节点,使用.ainvoke/.astream执行,实现并发 IO 操作。
4.2 Command:状态更新与流程控制的统一
Command 对象允许在同一个节点中同时处理状态更新和流程控制:
python
运行- from langgraph.types import Command
- def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
- value = random.choice(["a", "b"])
- # 同时设置状态更新和下一个节点
- return Command(
- update={"foo": value},
- goto="node_b" if value == "a" else "node_c"
- )
- builder.add_edge(START, "node_a")
- # 注意:无需定义a到b/c的边,Command已包含路由逻辑
复制代码 Command 的强大之处在于:将原本需要通过边定义的流程控制,整合到节点的返回结果中,使复杂逻辑的封装更紧凑。
4.3 子图导航与工具集成
在嵌套图结构中,我们可以从子图导航到父图节点:
python
运行- def my_node(state: State) -> Command[Literal["parent_node"]]:
- return Command(
- update={"foo": "bar"},
- goto="parent_node",
- graph=Command.PARENT # 导航到父图
- )
复制代码 在工具集成场景中,Command 能方便地处理状态更新与消息记录:
python
运行- from langgraph.types import ToolMessage
- @tool
- def lookup_user_info(tool_call_id: str):
- user_info = get_user_info(...)
- return Command(
- update={
- "user_info": user_info,
- "messages": [ToolMessage("查询成功", tool_call_id=tool_call_id)]
- }
- )
复制代码 五、总结与实践建议
通过 LangGraph,我们获得了一种声明式定义复杂流程的能力:从线性步骤到并行分支,从条件路由到循环控制,再到异步处理与命令式编程的结合,这套框架几乎覆盖了智能应用开发中的所有流程控制场景。在实践中,我们建议:
从状态建模开始:先定义清晰的 State 结构,通过 Reducer 控制状态更新方式善用快捷方式:add_sequence等方法能大幅减少线性流程的代码量注意并行执行的顺序性:当需要严格顺序时,通过状态字段或延迟执行控制始终设置循环终止条件:结合递归限制,避免程序异常在 IO 密集场景使用异步:充分利用ainvoke提升并发性能
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148798950 |