作者:找了一圈尾巴
Command简述
在 LangGraph 中,Command 是一个极具实用性的功能,它能够将控制流(边)和状态更新(节点)巧妙地结合起来。这意味着开发者可以在同一个节点中,既执行状态更新操作,又决定下一个要前往的节点,为工作流的构建带来了极大的灵活性。
Command基本用法
Command 允许你在单个节点函数中完成两件关键任务:
更新图状态 (update): 修改共享的 State 对象。
指定下一节点 (goto): 显式决定工作流下一步执行哪个节点。
- def my_node(state: State) -> Command[Literal["my_other_node"]]:
- return Command(
- # 状态更新
- update={"foo": "bar"},
- # 控制流
- goto="my_other_node"
- )
复制代码 在这个示例中,my_node 函数返回一个 Command 对象,
update 参数用于指定状态的更新内容,将状态中的 "foo" 键值设为 "bar";
goto 参数则决定了下一个要前往的节点是 "my_other_node"。
关键约束
在节点函数中返回 Command 时,必须添加返回类型注释,注明该节点可路由到的节点名称列表,如 Command[Literal["my_other_node"]]。这一点至关重要,它不仅是 graph 渲染所必需的,还能明确告知 LangGraph 该节点可以导航到 "my_other_node"。
Command 核心应用场景
动态控制流(替代条件边)
直接在节点逻辑中根据状态判断跳转路径,代码更内聚:- def check_threshold(state: State) -> Command[Literal["proceed", "halt"]]:
- if state["value"] > state["threshold"]:
- return Command(goto="halt") # 无需更新状态时update可省略
- return Command(goto="proceed")
复制代码 Command 与条件边的决策
- Command当您需要同时更新图形状态和路由到其他节点时使用。例如,在实现多代理切换时,需要路由到其他代理并向该代理传递一些信息。
- 当仅需根据当前状态选择分支且不涉及状态修改(如循环判断、简单路由),使用条件边更清晰。
跨层级节点跳转(子图 → 父图)
实现跨子图的工作流跳转,需指定 graph=Command.PARENT,代码示例如下:- from typing import Literal, Optional, Annotated
- from operator import add
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import StateGraph, START, END
- from langgraph.types import Command
- from pydantic import BaseModel
- # 定义子图状态
- class SubgraphState(BaseModel):
- """子图状态定义"""
- issue: str
- escalated: bool = False # 与父图共享的状态字段
- # 定义父图状态 reducer(处理子图与父图的状态冲突)
- def escalated_reducer(left: bool, right: bool) -> bool:
- """共享字段escalated的合并策略:右侧值(子图更新)优先"""
- return right
- # 定义父图状态
- class ParentGraphState(BaseModel):
- """父图状态定义"""
- issue: str
- escalated: Annotated[bool, escalated_reducer] = False # 使用reducer注解
- resolution: Optional[str] = None
- # 子图节点
- def subgraph_worker(state: SubgraphState) -> Command[Literal["manager_approval", END]]:
- """子图中的工作节点,决定是否升级到父图"""
- if "critical" in state.issue.lower():
- return Command(
- update={"escalated": True}, # 更新共享状态
- goto="manager_approval", # 父图中的目标节点
- graph=Command.PARENT # 关键:指定跳转到父图
- )
- return Command(goto=END) # 不升级则直接结束子图
- # 构建子图
- subgraph_builder = StateGraph(SubgraphState)
- subgraph_builder.add_node("worker", subgraph_worker)
- subgraph_builder.add_edge(START, "worker")
- subgraph = subgraph_builder.compile()
- # 父图节点
- def manager_approval_node(state: ParentGraphState) -> ParentGraphState:
- """父图中的经理审批节点"""
- return ParentGraphState(**state.model_dump(), resolution="经理已审批处理")
- # 构建父图(包含子图)
- parent_builder = StateGraph(ParentGraphState)
- parent_builder.add_node("subgraph", subgraph) # 嵌入子图
- parent_builder.add_node("manager_approval", manager_approval_node)
- # 定义父图边
- parent_builder.add_edge(START, "subgraph")
- parent_builder.add_edge("manager_approval", END)
- # 编译父图
- parent_graph = parent_builder.compile(
- checkpointer=MemorySaver()
- )
- # 运行示例
- if __name__ == "__main__":
- print("=== 跨层级跳转测试 ===")
- # 测试会触发升级的情况
- result1 = parent_graph.invoke({"issue": "Critical error: system down"})
- print(f"测试1 - 触发升级: {result1}")
- # 应输出包含escalated=True和经理审批结果的状态
- # 测试不会触发升级的情况
- result2 = parent_graph.invoke({"issue": "Minor issue: slow response"})
- print(f"测试2 - 不触发升级: {result2}")
- # 应输出escalated=False且无经理审批结果的状态
复制代码状态同步注意:若父子图共享状态字段,需在父图状态中为该字段定义 reducer 函数处理冲突。
工具调用与状态注入
一个常见的工具调用场景是从工具内部更新图谱状态。例如,在客户支持应用中,您可能希望在对话开始时根据客户的账号或 ID 查找客户信息。要从工具中更新图谱状态,您可以从工具中返回 Command(update={"my_custom_key": "foo", "messages": [...]})。代码示例如下:- from typing import Annotated, Dict, Any, List
- from pydantic import BaseModel
- from operator import add
- from langgraph.graph import StateGraph, START, END, Command
- from langgraph.prebuilt import ToolNode
- from langgraph.checkpoint.memory import MemorySaver
- from langchain_core.tools import tool
- from langchain_core.messages import AIMessage, ToolMessage
- # 定义消息列表的reducer
- def add_messages(left: List[Any], right: List[Any]) -> List[Any]:
- """消息列表的合并策略"""
- return left + right
- # 1. 定义状态模型
- class SupportState(BaseModel):
- user_id: str # 客户ID
- user_info: Annotated[Dict[str, Any], add] = {} # 工具返回的用户信息
- messages: Annotated[List[Any], add_messages] = [] # 消息历史(必须包含工具调用记录)
- query_status: str = "pending" # 流程状态标记
- # 2. 定义工具函数
- @tool
- def lookup_user_info(tool_call_id: Annotated[str, "tool_call_id"]) -> Command:
- """根据用户ID查询客户信息(内部工具)"""
- # 假设从线程配置中获取user_id(实际中可从config获取)
- # 注意:此处简化处理,实际需注入config
- user_id = "CUST-789" # 模拟获取
-
- print(f"正在查询用户信息: {user_id}")
-
- # 模拟实际查询逻辑(可替换为数据库/API调用)
- user_data = {
- "user_id": user_id,
- "name": "陈明亮",
- "membership": "白金会员",
- "contact": "chen@example.com",
- "recent_tickets": "订单延迟问题"
- }
-
- print(f"查询结果: {user_data}")
-
- # 返回Command更新状态
- return Command(
- update={
- "user_info": user_data,
- "messages": [ToolMessage(
- content=f"已获取用户 {user_id} 的详细信息",
- tool_call_id=tool_call_id
- )],
- "query_status": "user_info_fetched"
- }
- )
- # 3. 定义触发工具调用的节点
- def call_tool(state: SupportState) -> SupportState:
- """创建工具调用消息的节点"""
- print(f"\n===== 触发工具调用 =====")
- tool_call_message = AIMessage(
- content="",
- tool_calls=[{
- "name": "lookup_user_info",
- "args": {}, # 无额外参数
- "id": "call_123"
- }]
- )
- return SupportState(messages=state.messages + [tool_call_message])
- # 4. 定义后续处理节点
- def handle_support_request(state: SupportState) -> SupportState:
- """处理客户请求的节点(依赖工具返回的用户信息)"""
- print(f"\n===== 开始处理客户请求 =====")
- print(f"客户ID: {state.user_id}")
- print(f"客户信息: {state.user_info}")
- print(f"当前状态: {state.query_status}")
- print(f"消息记录: {state.messages}")
-
- # 基于用户信息进行业务处理
- if state.user_info.get("membership") == "白金会员":
- priority_msg = "优先处理白金会员请求"
- else:
- priority_msg = "标准流程处理请求"
-
- # 更新最终状态
- return SupportState(
- user_id=state.user_id,
- user_info=state.user_info,
- messages=state.messages + [priority_msg, "客户请求处理完成"],
- query_status="completed"
- )
- # 5. 构建图谱(使用ToolNode处理Command)
- def build_support_graph():
- builder = StateGraph(SupportState)
-
- # 创建ToolNode
- tool_node = ToolNode([lookup_user_info])
-
- # 添加节点
- builder.add_node("call_tool", call_tool)
- builder.add_node("tools", tool_node)
- builder.add_node("process_request", handle_support_request)
-
- # 定义边
- builder.add_edge(START, "call_tool")
- builder.add_edge("call_tool", "tools")
- builder.add_edge("tools", "process_request")
- builder.add_edge("process_request", END)
-
- return builder.compile(checkpointer=MemorySaver())
- # 6. 运行示例
- if __name__ == "__main__":
- support_graph = build_support_graph()
- print("=== 客户支持流程开始 ===")
- result = support_graph.invoke({"user_id": "CUST-789"})
- print("\n===== 流程结束 =====")
- print(f"最终用户信息: {result['user_info']['name']} ({result['user_info']['membership']})")
- print(f"流程完成状态: {result['query_status']}")
- print(f"完整消息历史: {result['messages']}")
复制代码@Tool 工具Command对象返回值:
update参数需包含业务数据(如用户信息、查询结果)和消息历史
messages字段必须包含ToolMessage,且需关联对应的tool_call_id
(这是 LLM 提供商要求的格式,确保工具调用与结果在消息链中正确关联)
ToolNode 的核心作用:
自动 Command 处理:解析工具返回的 Command 对象并更新状态
消息历史维护:自动添加 ToolMessage 到消息序列
错误处理:内置工具执行异常捕获机制
ID 管理:自动处理 tool_call_id 的生成和关联
人机协同工作流(Human-in-the-Loop)
Command还可以与 interrupt() 结合实现人工审核中断与恢复:- from typing import TypedDict
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import StateGraph
- from langgraph.types import Command, interrupt
- # 新增模型相关导入
- from langchain_core.messages import HumanMessage
- from langchain_openai import ChatOpenAI
- from dotenv import load_dotenv
- load_dotenv()
- # 定义状态类型
- class State(TypedDict):
- some_text: str
- # 初始化大模型(需要设置OPENAI_API_KEY环境变量)
- model = ChatOpenAI(model="gpt-4o-mini")
- # 初始化检查点存储
- checkpointer = MemorySaver()
- # 定义人类介入节点
- def human_node(state: State):
- value = interrupt(
- {
- "text_to_revise": state["some_text"],
- "instructions": "请修改以下文本:"
- }
- )
- return {"some_text": value}
- # 修改后的自动处理节点(调用大模型)
- def process_text(state: State):
- # 构造模型请求
- message = model.invoke([
- HumanMessage(content=f"请处理以下请求:{state['some_text']}。保持回答简洁。")
- ])
- # 返回模型生成的文本
- return {"some_text": message.content}
- # 构建工作流
- graph_builder = StateGraph(State)
- # 添加节点
- graph_builder.add_node("human_review", human_node)
- graph_builder.add_node("auto_process", process_text)
- # 设置流程
- graph_builder.set_entry_point("auto_process")
- graph_builder.add_edge("auto_process", "human_review")
- # 编译图表
- graph = graph_builder.compile(
- checkpointer=checkpointer,
- interrupt_before=["human_review"]
- )
- # 使用示例(保持不变)
- if __name__ == "__main__":
- thread_id = "thread_123"
- thread_config = {"configurable": {"thread_id": thread_id}}
- initial_state = {"some_text": "输出一个五五乘法表"}
- result = graph.invoke(initial_state, config=thread_config)
- print("自动处理结果:", result["some_text"])
- human_input = input("请输入人类输入:")
- resume_result = graph.invoke(
- Command(resume=human_input),
- config=thread_config
- )
- print("最终结果:", resume_result["some_text"])
复制代码 最佳实践与注意事项
类型标注不可少:-> Command[Literal["node_a", "node_b"]] 是 LangGraph 静态检查和绘图的基础。
跨图状态设计:子图跳转父图更新共享状态时,父图需定义 reducer 处理冲突(如 lambda current, update: update)。
命名一致性:goto 指定的节点名必须与图中注册的节点名完全一致。
工具安全调用:在工具中使用 Command 时,确保状态更新不会破坏图的一致性。
参考文献
Overview
原文地址:https://blog.csdn.net/weixin_41645817/article/details/149855143 |