AI创想
标题:
LangGraph工作流人机交互
[打印本页]
作者:
创想小编
时间:
昨天 23:13
标题:
LangGraph工作流人机交互
作者:CSDN博客
interrupt
LangGraph 把“人机交互”当成
一等公民
来设计,核心思想只有一句话:
在图的任意节点先“暂停”,等人类给反馈后再继续运行
1. 核心机制:interrupt / Command 一对原语
interrupt("提示语")
节点里调用它,图
立即暂停
并把提示语抛给上层(控制台/前端/业务系统)。
Command(resume=...)
人类输入完成后,用这条命令把值重新塞回图,流程
精准续跑
。
整个状态(含历史消息、变量)在暂停期间
持久化到 Checkpoint
,即使隔几天再恢复也不丢。
from typing import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
"""图形状态。"""
some_text: str
def human_node(state: State):
value = interrupt(
# 任何可序列化为 JSON 的值,供人类查看。
# 例如,一个问题、一段文本或状态中的一组键
{
"text_to_revise": state["some_text"]
}
)
return {
# 使用人类的输入更新状态
"some_text": value
}
# 构建图形
graph_builder = StateGraph(State)
# 将人类节点添加到图形中
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")
# 使用中断功能需要一个检查点。
checkpointer = MemorySaver()
graph = graph_builder.compile(
checkpointer=checkpointer
)
thread_config = {"configurable": {"thread_id": "abc123"}}# 使用 stream() 直接展示 `__interrupt__` 信息。
for chunk in graph.stream({"some_text": "原始文本"}, config=thread_config):
print(chunk)
# 使用 Command 恢复
for chunk in graph.stream(Command(resume="编辑后的文本"), config=thread_config):
print(chunk)
复制代码
graph.stream: 原始文本 执行然后进入到human_node节点,节点执行interrupt,直接返回
chunk内容为
(, 下载次数: 0)
上传
点击文件名下载附件
第二次交互 command
直接进入到 human_node 即中断的那个节点,会从节点开头执行(非断点处执行)。
此时的Value=interrput(..) 接收到值就是Command传递进来的值 “编辑后的文本”
(, 下载次数: 0)
上传
点击文件名下载附件
客服验证示例:
from __future__ import annotations
from typing import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from pyexpat.errors import messages
# 1. 状态定义
class State(TypedDict):
messages: list
customer_id: str | None
user_reply: str | None # 用于接住中断回复
# 2. 节点:未验证则中断
def verify_info(state: State):
if state["customer_id"] is not None:
return state
reply = interrupt({"user_reply": "请输入手机号"}) # key 必须和 State 对应
return {"messages":"需要输入手机号","user_reply": reply, "customer_id": reply}
# 3. 节点:根据手机号前缀决定回复
def supervisor(state: State):
if state["customer_id"].startswith(("138", "188")):
return {"messages": ["超级碗"]}
else:
return {"messages": ["啥也不是"]}
# 4. 构图
builder = StateGraph(State)
builder.add_node("verify", verify_info)
builder.add_node("supervisor", supervisor)
builder.add_edge(START, "verify")
builder.add_edge("verify", "supervisor")
builder.add_edge("supervisor", END)
graph = builder.compile(checkpointer=MemorySaver())
# 5. 第一次运行(中断前)
thread = {"configurable": {"thread_id": "1"}}
state1 = graph.invoke({"messages": [], "customer_id": None, "user_reply": None}, thread)
print("中断前状态 ->", state1)
# 6. 人工输入后续跑
resume_cmd = Command(resume="13800138000")
final_state = graph.invoke(resume_cmd, thread)
print("最终状态 ->", final_state)
复制代码
扩展
graph = builder.compile(checkpointer=MemorySaver(),interrupt_before=["verify"] )interrupt_before=["verify"] 是
LangGraph 在“图编译”阶段就声明的“断点”
,含义一句话:
只要执行流即将进入名为 verify 的节点,就先暂停整个图,把控制权交回给调用方(或前端),等人手动输入后再继续。
invoke (vs stream)
.invoke()
只返回最终状态字典
,不 yield 事件。
中断/续跑逻辑完全一致,只是
一次性拿结果
,不再循环打印中间态。
messages: list messages: Annotated[list, add_messages]
messages: list →
纯字段
,节点返回什么就
完全替换
旧列表。
messages: Annotated[list, add_messages] →
带 reducer
,节点返回的新消息会被
追加
到旧列表,
不会丢失历史
。
原文地址:https://blog.csdn.net/jianlee1991/article/details/155032086
欢迎光临 AI创想 (https://www.llms-ai.com/)
Powered by Discuz! X3.4