作者:CSDN博客
interrupt
LangGraph 把“人机交互”当成 一等公民 来设计,核心思想只有一句话:
在图的任意节点先“暂停”,等人类给反馈后再继续运行
1. 核心机制:interrupt / Command 一对原语
- interrupt("提示语")
节点里调用它,图立即暂停并把提示语抛给上层(控制台/前端/业务系统)。
- Command(resume=...)
人类输入完成后,用这条命令把值重新塞回图,流程精准续跑。
整个状态(含历史消息、变量)在暂停期间持久化到 Checkpoint,即使隔几天再恢复也不丢。- from typing import TypedDict
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.constants import START
- from langgraph.graph import StateGraph
- from langgraph.types import interrupt, Command
- class State(TypedDict):
- """图形状态。"""
- some_text: str
- def human_node(state: State):
- value = interrupt(
- # 任何可序列化为 JSON 的值,供人类查看。
- # 例如,一个问题、一段文本或状态中的一组键
- {
- "text_to_revise": state["some_text"]
- }
- )
- return {
- # 使用人类的输入更新状态
- "some_text": value
- }
- # 构建图形
- graph_builder = StateGraph(State)
- # 将人类节点添加到图形中
- graph_builder.add_node("human_node", human_node)
- graph_builder.add_edge(START, "human_node")
- # 使用中断功能需要一个检查点。
- checkpointer = MemorySaver()
- graph = graph_builder.compile(
- checkpointer=checkpointer
- )
- thread_config = {"configurable": {"thread_id": "abc123"}}# 使用 stream() 直接展示 `__interrupt__` 信息。
- for chunk in graph.stream({"some_text": "原始文本"}, config=thread_config):
- print(chunk)
- # 使用 Command 恢复
- for chunk in graph.stream(Command(resume="编辑后的文本"), config=thread_config):
- print(chunk)
复制代码 graph.stream: 原始文本 执行然后进入到human_node节点,节点执行interrupt,直接返回
chunk内容为
第二次交互 command
直接进入到 human_node 即中断的那个节点,会从节点开头执行(非断点处执行)。
此时的Value=interrput(..) 接收到值就是Command传递进来的值 “编辑后的文本”
客服验证示例:- from __future__ import annotations
- from typing import TypedDict
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- from langgraph.types import interrupt, Command
- from pyexpat.errors import messages
- # 1. 状态定义
- class State(TypedDict):
- messages: list
- customer_id: str | None
- user_reply: str | None # 用于接住中断回复
- # 2. 节点:未验证则中断
- def verify_info(state: State):
- if state["customer_id"] is not None:
- return state
- reply = interrupt({"user_reply": "请输入手机号"}) # key 必须和 State 对应
- return {"messages":"需要输入手机号","user_reply": reply, "customer_id": reply}
- # 3. 节点:根据手机号前缀决定回复
- def supervisor(state: State):
- if state["customer_id"].startswith(("138", "188")):
- return {"messages": ["超级碗"]}
- else:
- return {"messages": ["啥也不是"]}
- # 4. 构图
- builder = StateGraph(State)
- builder.add_node("verify", verify_info)
- builder.add_node("supervisor", supervisor)
- builder.add_edge(START, "verify")
- builder.add_edge("verify", "supervisor")
- builder.add_edge("supervisor", END)
- graph = builder.compile(checkpointer=MemorySaver())
- # 5. 第一次运行(中断前)
- thread = {"configurable": {"thread_id": "1"}}
- state1 = graph.invoke({"messages": [], "customer_id": None, "user_reply": None}, thread)
- print("中断前状态 ->", state1)
- # 6. 人工输入后续跑
- resume_cmd = Command(resume="13800138000")
- final_state = graph.invoke(resume_cmd, thread)
- print("最终状态 ->", final_state)
复制代码 扩展
graph = builder.compile(checkpointer=MemorySaver(),interrupt_before=["verify"] )interrupt_before=["verify"] 是 LangGraph 在“图编译”阶段就声明的“断点”,含义一句话:
只要执行流即将进入名为 verify 的节点,就先暂停整个图,把控制权交回给调用方(或前端),等人手动输入后再继续。
invoke (vs stream)
.invoke()只返回最终状态字典,不 yield 事件。
中断/续跑逻辑完全一致,只是一次性拿结果,不再循环打印中间态。
messages: list messages: Annotated[list, add_messages]
messages: list → 纯字段,节点返回什么就完全替换旧列表。
messages: Annotated[list, add_messages] → 带 reducer,节点返回的新消息会被追加到旧列表,不会丢失历史。
原文地址:https://blog.csdn.net/jianlee1991/article/details/155032086 |