AI创想

标题: LangGraph认知篇-Command函数 [打印本页]

作者: AI小编    时间: 14 小时前
标题: LangGraph认知篇-Command函数
作者:找了一圈尾巴
Command简述

        在 LangGraph 中,Command 是一个极具实用性的功能,它能够将控制流(边)和状态更新(节点)巧妙地结合起来。这意味着开发者可以在同一个节点中,既执行状态更新操作,又决定下一个要前往的节点,为工作流的构建带来了极大的灵活性。
Command基本用法

Command 允许你在单个节点函数中完成两件关键任务:
  1. def my_node(state: State) -> Command[Literal["my_other_node"]]:​
  2.     return Command(​
  3.         # 状态更新​
  4.         update={"foo": "bar"},​
  5.         # 控制流​
  6.         goto="my_other_node"​
  7.     )
复制代码
       在这个示例中,my_node 函数返回一个 Command 对象,
关键约束

        在节点函数中返回 Command 时,必须添加返回类型注释,注明该节点可路由到的节点名称列表,如 Command[Literal["my_other_node"]]。这一点至关重要,它不仅是 graph 渲染所必需的,还能明确告知 LangGraph 该节点可以导航到 "my_other_node"。
Command 核心应用场景

动态控制流(替代条件边)

        直接在节点逻辑中根据状态判断跳转路径,代码更内聚:
  1. def check_threshold(state: State) -> Command[Literal["proceed", "halt"]]:
  2.     if state["value"] > state["threshold"]:
  3.         return Command(goto="halt")  # 无需更新状态时update可省略
  4.     return Command(goto="proceed")
复制代码
Command 与条件边的决策

跨层级节点跳转(子图 → 父图)

        实现跨子图的工作流跳转,需指定 graph=Command.PARENT,代码示例如下:
  1. from typing import Literal, Optional, Annotated
  2. from operator import add
  3. from langgraph.checkpoint.memory import MemorySaver
  4. from langgraph.graph import StateGraph, START, END
  5. from langgraph.types import Command
  6. from pydantic import BaseModel
  7. # 定义子图状态
  8. class SubgraphState(BaseModel):
  9.     """子图状态定义"""
  10.     issue: str
  11.     escalated: bool = False  # 与父图共享的状态字段
  12. # 定义父图状态 reducer(处理子图与父图的状态冲突)
  13. def escalated_reducer(left: bool, right: bool) -> bool:
  14.     """共享字段escalated的合并策略:右侧值(子图更新)优先"""
  15.     return right
  16. # 定义父图状态
  17. class ParentGraphState(BaseModel):
  18.     """父图状态定义"""
  19.     issue: str
  20.     escalated: Annotated[bool, escalated_reducer] = False  # 使用reducer注解
  21.     resolution: Optional[str] = None
  22. # 子图节点
  23. def subgraph_worker(state: SubgraphState) -> Command[Literal["manager_approval", END]]:
  24.     """子图中的工作节点,决定是否升级到父图"""
  25.     if "critical" in state.issue.lower():
  26.         return Command(
  27.             update={"escalated": True},  # 更新共享状态
  28.             goto="manager_approval",  # 父图中的目标节点
  29.             graph=Command.PARENT  # 关键:指定跳转到父图
  30.         )
  31.     return Command(goto=END)  # 不升级则直接结束子图
  32. # 构建子图
  33. subgraph_builder = StateGraph(SubgraphState)
  34. subgraph_builder.add_node("worker", subgraph_worker)
  35. subgraph_builder.add_edge(START, "worker")
  36. subgraph = subgraph_builder.compile()
  37. # 父图节点
  38. def manager_approval_node(state: ParentGraphState) -> ParentGraphState:
  39.     """父图中的经理审批节点"""
  40.     return ParentGraphState(**state.model_dump(), resolution="经理已审批处理")
  41. # 构建父图(包含子图)
  42. parent_builder = StateGraph(ParentGraphState)
  43. parent_builder.add_node("subgraph", subgraph)  # 嵌入子图
  44. parent_builder.add_node("manager_approval", manager_approval_node)
  45. # 定义父图边
  46. parent_builder.add_edge(START, "subgraph")
  47. parent_builder.add_edge("manager_approval", END)
  48. # 编译父图
  49. parent_graph = parent_builder.compile(
  50.     checkpointer=MemorySaver()
  51. )
  52. # 运行示例
  53. if __name__ == "__main__":
  54.     print("=== 跨层级跳转测试 ===")
  55.     # 测试会触发升级的情况
  56.     result1 = parent_graph.invoke({"issue": "Critical error: system down"})
  57.     print(f"测试1 - 触发升级: {result1}")
  58.     # 应输出包含escalated=True和经理审批结果的状态
  59.     # 测试不会触发升级的情况
  60.     result2 = parent_graph.invoke({"issue": "Minor issue: slow response"})
  61.     print(f"测试2 - 不触发升级: {result2}")
  62.     # 应输出escalated=False且无经理审批结果的状态
复制代码
工具调用与状态注入

        一个常见的工具调用场景是从工具内部更新图谱状态。例如,在客户支持应用中,您可能希望在对话开始时根据客户的账号或 ID 查找客户信息。要从工具中更新图谱状态,您可以从工具中返回 Command(update={"my_custom_key": "foo", "messages": [...]})。代码示例如下:
  1. from typing import Annotated, Dict, Any, List
  2. from pydantic import BaseModel
  3. from operator import add
  4. from langgraph.graph import StateGraph, START, END, Command
  5. from langgraph.prebuilt import ToolNode
  6. from langgraph.checkpoint.memory import MemorySaver
  7. from langchain_core.tools import tool
  8. from langchain_core.messages import AIMessage, ToolMessage
  9. # 定义消息列表的reducer
  10. def add_messages(left: List[Any], right: List[Any]) -> List[Any]:
  11.     """消息列表的合并策略"""
  12.     return left + right
  13. # 1. 定义状态模型
  14. class SupportState(BaseModel):
  15.     user_id: str  # 客户ID
  16.     user_info: Annotated[Dict[str, Any], add] = {}  # 工具返回的用户信息
  17.     messages: Annotated[List[Any], add_messages] = []  # 消息历史(必须包含工具调用记录)
  18.     query_status: str = "pending"  # 流程状态标记
  19. # 2. 定义工具函数
  20. @tool
  21. def lookup_user_info(tool_call_id: Annotated[str, "tool_call_id"]) -> Command:
  22.     """根据用户ID查询客户信息(内部工具)"""
  23.     # 假设从线程配置中获取user_id(实际中可从config获取)
  24.     # 注意:此处简化处理,实际需注入config
  25.     user_id = "CUST-789"  # 模拟获取
  26.    
  27.     print(f"正在查询用户信息: {user_id}")
  28.    
  29.     # 模拟实际查询逻辑(可替换为数据库/API调用)
  30.     user_data = {
  31.         "user_id": user_id,
  32.         "name": "陈明亮",
  33.         "membership": "白金会员",
  34.         "contact": "chen@example.com",
  35.         "recent_tickets": "订单延迟问题"
  36.     }
  37.    
  38.     print(f"查询结果: {user_data}")
  39.    
  40.     # 返回Command更新状态
  41.     return Command(
  42.         update={
  43.             "user_info": user_data,
  44.             "messages": [ToolMessage(
  45.                 content=f"已获取用户 {user_id} 的详细信息",
  46.                 tool_call_id=tool_call_id
  47.             )],
  48.             "query_status": "user_info_fetched"
  49.         }
  50.     )
  51. # 3. 定义触发工具调用的节点
  52. def call_tool(state: SupportState) -> SupportState:
  53.     """创建工具调用消息的节点"""
  54.     print(f"\n===== 触发工具调用 =====")
  55.     tool_call_message = AIMessage(
  56.         content="",
  57.         tool_calls=[{
  58.             "name": "lookup_user_info",
  59.             "args": {},  # 无额外参数
  60.             "id": "call_123"
  61.         }]
  62.     )
  63.     return SupportState(messages=state.messages + [tool_call_message])
  64. # 4. 定义后续处理节点
  65. def handle_support_request(state: SupportState) -> SupportState:
  66.     """处理客户请求的节点(依赖工具返回的用户信息)"""
  67.     print(f"\n===== 开始处理客户请求 =====")
  68.     print(f"客户ID: {state.user_id}")
  69.     print(f"客户信息: {state.user_info}")
  70.     print(f"当前状态: {state.query_status}")
  71.     print(f"消息记录: {state.messages}")
  72.    
  73.     # 基于用户信息进行业务处理
  74.     if state.user_info.get("membership") == "白金会员":
  75.         priority_msg = "优先处理白金会员请求"
  76.     else:
  77.         priority_msg = "标准流程处理请求"
  78.    
  79.     # 更新最终状态
  80.     return SupportState(
  81.         user_id=state.user_id,
  82.         user_info=state.user_info,
  83.         messages=state.messages + [priority_msg, "客户请求处理完成"],
  84.         query_status="completed"
  85.     )
  86. # 5. 构建图谱(使用ToolNode处理Command)
  87. def build_support_graph():
  88.     builder = StateGraph(SupportState)
  89.    
  90.     # 创建ToolNode
  91.     tool_node = ToolNode([lookup_user_info])
  92.    
  93.     # 添加节点
  94.     builder.add_node("call_tool", call_tool)
  95.     builder.add_node("tools", tool_node)
  96.     builder.add_node("process_request", handle_support_request)
  97.    
  98.     # 定义边
  99.     builder.add_edge(START, "call_tool")
  100.     builder.add_edge("call_tool", "tools")
  101.     builder.add_edge("tools", "process_request")
  102.     builder.add_edge("process_request", END)
  103.    
  104.     return builder.compile(checkpointer=MemorySaver())
  105. # 6. 运行示例
  106. if __name__ == "__main__":
  107.     support_graph = build_support_graph()
  108.     print("=== 客户支持流程开始 ===")
  109.     result = support_graph.invoke({"user_id": "CUST-789"})
  110.     print("\n===== 流程结束 =====")
  111.     print(f"最终用户信息: {result['user_info']['name']} ({result['user_info']['membership']})")
  112.     print(f"流程完成状态: {result['query_status']}")
  113.     print(f"完整消息历史: {result['messages']}")
复制代码

人机协同工作流(Human-in-the-Loop)

        Command还可以与 interrupt() 结合实现人工审核中断与恢复:
  1. from typing import TypedDict
  2. from langgraph.checkpoint.memory import MemorySaver
  3. from langgraph.graph import StateGraph
  4. from langgraph.types import Command, interrupt
  5. # 新增模型相关导入
  6. from langchain_core.messages import HumanMessage
  7. from langchain_openai import ChatOpenAI
  8. from dotenv import load_dotenv
  9. load_dotenv()
  10. # 定义状态类型
  11. class State(TypedDict):
  12.     some_text: str
  13. # 初始化大模型(需要设置OPENAI_API_KEY环境变量)
  14. model = ChatOpenAI(model="gpt-4o-mini")
  15. # 初始化检查点存储
  16. checkpointer = MemorySaver()
  17. # 定义人类介入节点
  18. def human_node(state: State):
  19.     value = interrupt(
  20.         {
  21.             "text_to_revise": state["some_text"],
  22.             "instructions": "请修改以下文本:"
  23.         }
  24.     )
  25.     return {"some_text": value}
  26. # 修改后的自动处理节点(调用大模型)
  27. def process_text(state: State):
  28.     # 构造模型请求
  29.     message = model.invoke([
  30.         HumanMessage(content=f"请处理以下请求:{state['some_text']}。保持回答简洁。")
  31.     ])
  32.     # 返回模型生成的文本
  33.     return {"some_text": message.content}
  34. # 构建工作流
  35. graph_builder = StateGraph(State)
  36. # 添加节点
  37. graph_builder.add_node("human_review", human_node)
  38. graph_builder.add_node("auto_process", process_text)
  39. # 设置流程
  40. graph_builder.set_entry_point("auto_process")
  41. graph_builder.add_edge("auto_process", "human_review")
  42. # 编译图表
  43. graph = graph_builder.compile(
  44.     checkpointer=checkpointer,
  45.     interrupt_before=["human_review"]
  46. )
  47. # 使用示例(保持不变)
  48. if __name__ == "__main__":
  49.     thread_id = "thread_123"
  50.     thread_config = {"configurable": {"thread_id": thread_id}}
  51.     initial_state = {"some_text": "输出一个五五乘法表"}
  52.     result = graph.invoke(initial_state, config=thread_config)
  53.     print("自动处理结果:", result["some_text"])
  54.     human_input = input("请输入人类输入:")
  55.     resume_result = graph.invoke(
  56.         Command(resume=human_input),
  57.         config=thread_config
  58.     )
  59.     print("最终结果:", resume_result["some_text"])
复制代码
最佳实践与注意事项

参考文献

Overview

原文地址:https://blog.csdn.net/weixin_41645817/article/details/149855143




欢迎光临 AI创想 (https://www.llms-ai.com/) Powered by Discuz! X3.4