作者:佑瞻
在开发智能应用的过程中,我们常常会面临一个核心挑战:如何高效管理复杂的流程控制与状态更新?当业务逻辑涉及多步骤处理、并行任务或动态分支时,传统的线性编程模式往往显得力不从心。今天我们要聊的 LangGraph,正是为解决这类问题而生的强大工具 —— 它通过状态图(StateGraph)的方式,让我们能够以更直观、更灵活的方式定义和执行复杂流程。接下来,我们将从基础构建到高级应用,一步步拆解 LangGraph 的核心能力,尤其会深入讲解归约器(Reducer)这一关键机制。
一、构建基础步骤序列:从手动到快捷的实现之道
1.1 手动构建顺序图的核心逻辑
当我们需要定义一个线性执行的流程时,最基础的方式是通过add_node和add_edge方法逐步搭建。来看一个简单的三步流程示例:
python
运行- from langgraph.graph import START, StateGraph
- # 定义状态结构
- class State(TypedDict):
- value_1: str
- value_2: int
- # 初始化状态图构建器
- builder = StateGraph(State)
- # 添加节点(每个节点都是一个处理函数)
- def step_1(state: State):
- return {"value_1": "a"}
- def step_2(state: State):
- current_value_1 = state["value_1"]
- return {"value_1": f"{current_value_1} b"}
- def step_3(state: State):
- return {"value_2": 10}
- builder.add_node(step_1)
- builder.add_node(step_2)
- builder.add_node(step_3)
- # 定义执行顺序(从START开始,依次连接节点)
- builder.add_edge(START, "step_1")
- builder.add_edge("step_1", "step_2")
- builder.add_edge("step_2", "step_3")
- # 编译图并执行
- graph = builder.compile()
- result = graph.invoke({"value_1": "c"})
- print(result) # 输出: {'value_1': 'a b', 'value_2': 10}
复制代码 这里需要注意:默认情况下,节点返回的状态更新会直接覆盖对应键的值。但如果我们需要更复杂的更新逻辑(如累加、合并),就需要引入归约器。
二、归约器(Reducer)机制:精细控制状态更新的核心
2.1 归约器的基本概念与作用
归约器是 LangGraph 中控制状态更新行为的关键机制。默认情况下,节点对状态的更新是 "覆盖式" 的,但通过归约器,我们可以实现:
列表的追加(而非替换)数值的累加字典的合并其他自定义的更新逻辑
2.2 归约器的使用场景与实现方式
以列表追加场景为例,我们可以通过operator.add作为归约器来实现:
python
运行- import operator
- from typing import Annotated
- from langgraph.graph import StateGraph, START, END
- # 使用Annotated类型指定归约器
- class State(TypedDict):
- aggregate: Annotated[list, operator.add] # 使用operator.add实现列表追加
- def a(state: State):
- return {"aggregate": ["A"]}
- def b(state: State):
- return {"aggregate": ["B"]}
- def c(state: State):
- return {"aggregate": ["C"]}
- def d(state: State):
- return {"aggregate": ["D"]}
- builder = StateGraph(State)
- builder.add_node(a).add_node(b).add_node(c).add_node(d)
- # 定义并行分支流程
- builder.add_edge(START, "a")
- builder.add_edge("a", "b")
- builder.add_edge("a", "c")
- builder.add_edge("b", "d")
- builder.add_edge("c", "d")
- builder.add_edge("d", END)
- graph = builder.compile()
- result = graph.invoke({"aggregate": []})
- print(result) # 输出: {'aggregate': ['A', 'B', 'C', 'D']}
复制代码 这里的核心在于Annotated[list, operator.add]的定义:
第一个参数list是状态值的类型第二个参数operator.add是归约器函数,它告诉 LangGraph:对这个键的更新应该使用加法操作(列表场景下即追加)
2.3 归约器的工作原理
当多个节点同时更新同一个启用了归约器的状态键时,LangGraph 会:
收集所有节点对该键的更新值使用归约器函数将这些值与原状态值合并将合并结果作为新的状态值
以并行分支为例,节点 b 和 c 同时更新aggregate列表时,归约器会将两个列表合并,而不是用后一个覆盖前一个。
三、并行分支处理:归约器与并行执行的完美配合
3.1 归约器在并行流程中的关键作用
在并行执行场景中,归约器尤为重要。假设我们有两个并行节点都要更新同一个列表:
没有归约器时,后执行的节点会覆盖前一个的结果有归约器时,两个节点的更新会被合并
python
运行- # 并行分支中使用归约器的完整示例
- import operator
- from langgraph.graph import StateGraph, START, END
- class State(TypedDict):
- data: Annotated[list, operator.add] # 定义列表追加归约器
- def producer1(state: State):
- return {"data": [1, 2]}
- def producer2(state: State):
- return {"data": [3, 4]}
- def consumer(state: State):
- return {"data": [sum(state["data"])]} # 计算总和
- builder = StateGraph(State)
- builder.add_node(producer1).add_node(producer2).add_node(consumer)
- # 定义并行流程:两个生产者并行,结果汇总到消费者
- builder.add_edge(START, "producer1")
- builder.add_edge(START, "producer2")
- builder.add_edge("producer1", "consumer")
- builder.add_edge("producer2", "consumer")
- graph = builder.compile()
- result = graph.invoke({"data": []})
- print(result) # 输出: {'data': [10]}(1+2+3+4=10)
复制代码 这里归约器确保了producer1和producer2的列表更新被合并,而不是覆盖,最终consumer节点能获取完整的数据集。
四、构建基础步骤序列:归约器在顺序流程中的应用
4.1 顺序流程中的归约器使用
归约器不仅适用于并行场景,在顺序流程中也能发挥作用。例如,我们需要逐步构建一个复杂数据结构:
python
运行- class State(TypedDict):
- log: Annotated[list, operator.add] # 日志列表追加归约器
- def step1(state: State):
- return {"log": ["任务开始"]}
- def step2(state: State):
- return {"log": ["数据加载完成"]}
- def step3(state: State):
- return {"log": ["处理完毕"]}
- builder = StateGraph(State).add_sequence([step1, step2, step3])
- builder.add_edge(START, "step1")
- graph = builder.compile()
- result = graph.invoke({"log": []})
- print(result) # 输出: {'log': ['任务开始', '数据加载完成', '处理完毕']}
复制代码 通过归约器,每个步骤的日志都被追加到列表中,形成完整的执行记录。
五、其他核心特性:从分支到循环的全场景覆盖
5.1 动态条件分支:基于状态与归约器的综合应用
当分支逻辑依赖于归约器处理后的状态时,两者的结合能实现更复杂的业务逻辑:
python
运行- import operator
- from typing import Literal
- from langgraph.graph import StateGraph, START, END
- class State(TypedDict):
- scores: Annotated[list, operator.add] # 分数列表追加归约器
- decision: str
- def calculate_score(state: State):
- # 模拟计算分数
- return {"scores": [random.randint(1, 10)]}
- def make_decision(state: State) -> Literal["pass", "fail", END]:
- total = sum(state["scores"])
- if total >= 15:
- return "pass"
- elif total >= 5:
- return "fail"
- else:
- return END # 分数太低,直接结束
- def pass_node(state: State):
- return {"decision": "通过"}
- def fail_node(state: State):
- return {"decision": "不通过"}
- builder = StateGraph(State)
- builder.add_node(calculate_score).add_node(pass_node).add_node(fail_node)
- # 定义流程:计算分数→根据总分决策→执行对应节点
- builder.add_edge(START, "calculate_score")
- builder.add_conditional_edges("calculate_score", make_decision)
- builder.add_edge("pass", "pass_node")
- builder.add_edge("fail", "fail_node")
- graph = builder.compile()
- result = graph.invoke({"scores": []})
- print(result) # 输出包含归约后的scores和决策结果
复制代码 这里归约器负责累加分数,条件分支根据归约后的总分决定执行路径,两者结合实现了动态决策流程。
5.2 循环控制:归约器与循环的结合
在循环场景中,归约器能高效处理多次迭代的状态累积:
python
运行- import operator
- from langgraph.graph import StateGraph, START, END
- class State(TypedDict):
- results: Annotated[list, operator.add] # 结果列表追加归约器
- counter: int
- def process_item(state: State):
- # 模拟处理数据
- return {"results": [state["counter"] * 2], "counter": state["counter"] + 1}
- def should_continue(state: State) -> Literal["process_item", END]:
- if state["counter"] < 5:
- return "process_item"
- else:
- return END
- builder = StateGraph(State)
- builder.add_node(process_item)
- # 定义循环:处理数据→检查是否继续
- builder.add_edge(START, "process_item")
- builder.add_conditional_edges("process_item", should_continue)
- graph = builder.compile()
- result = graph.invoke({"results": [], "counter": 1})
- print(result) # 输出: {'results': [2, 4, 6, 8, 10], 'counter': 5}
复制代码 归约器确保每次循环的结果都被累积到results列表中,避免了手动维护累加逻辑的复杂性。
六、高级特性:异步处理与 Command 的强大组合
6.1 Command 与归约器的协同
Command 对象允许在节点中同时处理状态更新和流程控制,而归约器则负责状态更新的具体逻辑:
python
运行- from langgraph.types import Command
- import operator
- class State(TypedDict):
- items: Annotated[list, operator.add] # 列表追加归约器
- next_action: str
- def process_node(state: State) -> Command[Literal["add_item", "finish"]]:
- # 1. 决定下一步动作
- if len(state["items"]) < 3:
- next_node = "add_item"
- else:
- next_node = "finish"
-
- # 2. 生成状态更新(归约器会处理列表追加)
- return Command(
- update={"items": ["新元素"]},
- goto=next_node
- )
- def add_item(state: State):
- return {"items": ["临时元素"]} # 归约器会合并到items列表
- builder = StateGraph(State)
- builder.add_node(process_node).add_node(add_item)
- builder.add_edge(START, "process_node")
- graph = builder.compile()
- result = graph.invoke({"items": [], "next_action": "process"})
- print(result) # items列表会包含所有追加的元素
复制代码 这里 Command 负责流程控制,而归约器确保多次items更新被正确合并,两者分工明确又紧密协作。
七、总结与实践建议
通过 LangGraph,我们获得了一种声明式定义复杂流程的能力,而其中归约器是实现状态高效管理的核心机制。在实践中,我们建议:
优先设计状态结构与归约器:在构建图之前,先明确每个状态键的更新规则,通过 Annotated 类型指定合适的归约器并行场景必用归约器:只要有多个节点可能更新同一状态键,就应该考虑使用归约器避免覆盖善用 operator 模块的归约器:operator.add(列表追加、数值累加)、operator.iconcat(字符串拼接)等是最常用的归约器复杂场景自定义归约器:如果内置归约器不满足需求,可以定义自定义函数作为归约器归约器与条件分支结合:利用归约器处理后的状态进行动态决策,实现更智能的流程控制
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148799017 |