AI创想
标题:
LangGraph 人工介入机制实战:interrupt 功能设计与应用模式解析
[打印本页]
作者:
AI小编
时间:
5 小时前
标题:
LangGraph 人工介入机制实战:interrupt 功能设计与应用模式解析
作者:佑瞻
在开发智能体流程时,我们常常会遇到这样的场景:当智能体需要做出关键决策或处理敏感信息时,必须引入人工审核环节。比如金融交易审批、医疗诊断复核等场景,单纯的自动化流程可能存在风险。这时候,如何在智能体流程中无缝嵌入人工介入机制,就成为了一个关键挑战。今天,我们就来聊聊 LangGraph 框架中强大的 interrupt 功能,看看它是如何优雅地实现 "人在回路" 的工作流设计。
interrupt 功能基础:流程控制的 "暂停键"
LangGraph 中的 interrupt 函数就像是流程控制的 "暂停键",它能在指定节点暂停图的执行,将信息呈现给人工,然后根据人工输入继续执行。这种机制非常适合需要人工干预的场景,比如内容审核、数据校正等。
中断机制的核心流程:从定义到触发
human_node 的完整工作流程
先来看一个完整的中断示例,注意代码中三个关键阶段的衔接:
python
from langgraph.types import interrupt, Command
from langgraph.graph import GraphBuilder
from langgraph.checkpoint import Checkpointer
# 1. 定义包含interrupt的节点函数
def human_node(state: dict):
"""当图执行到该节点时会暂停,等待人工输入"""
# 传递需要人工处理的文本内容
revised_text = interrupt({
"text_to_revise": state["original_text"],
"instruction": "请将这段文本简化为50字以内"
})
# 返回人工处理后的结果,更新图状态
return {"processed_text": revised_text}
# 2. 构建并编译图
graph_builder = GraphBuilder()
graph_builder.add_node("human_review", human_node)
graph_builder.add_start_node("start", next_node="human_review")
checkpointer = Checkpointer() # 必须指定检查点保存状态
graph = graph_builder.compile(checkpointer=checkpointer)
# 3. 执行图并处理中断
if __name__ == "__main__":
# 首次调用触发中断
thread_config = {"configurable": {"thread_id": "review_001"}}
first_result = graph.invoke(
{"original_text": "这是一段需要简化的长文本,包含了许多细节信息"},
config=thread_config
)
# 解析中断结果
if "__interrupt__" in first_result:
print("检测到中断,需要人工处理")
interrupt_info = first_result["__interrupt__"][0]
print(f"中断节点: {interrupt_info.ns}")
print(f"待处理文本: {interrupt_info.value['text_to_revise']}")
# 4. 模拟人工输入并恢复流程
human_input = "简化后的文本:长文本包含细节信息"
resumed_result = graph.invoke(
Command(resume=human_input),
config=thread_config
)
print("流程恢复后结果:")
print(resumed_result) # 输出: {'processed_text': '简化后的文本:长文本包含细节信息'}
复制代码
这段代码展示了中断机制的完整生命周期,关键要理解:
human_node并不是直接调用的,而是通过图的执行流程触发的
。当图运行到 "human_review" 节点时,interrupt 函数会被激活,导致流程暂停。
中断恢复的核心原理:重新执行节点的底层逻辑
恢复执行的关键特性
LangGraph 的中断恢复机制与 Python 的input()函数有本质区别:
当使用Command恢复流程时,节点会从函数开头重新执行,而非从中断位置继续
。我们可以通过一个计数器示例直观理解:
python
counter = 0
def node_with_interrupt(state: dict):
global counter
counter += 1
print(f"节点第{counter}次执行")
user_input = interrupt("请输入内容")
print(f"当前counter值: {counter}")
return {"input": user_input}
# 首次调用触发中断
graph.invoke({"initial": "data"}, config=thread_config)
# 输出: 节点第1次执行
# 恢复后再次执行
graph.invoke(Command(resume="用户输入"), config=thread_config)
# 输出: 节点第2次执行
# 输出: 当前counter值: 2
复制代码
核心结论
:每次恢复都会导致节点从头执行,这是理解中断机制的关键前提。
恢复执行的流程示意图
(, 下载次数: 0)
上传
点击文件名下载附件
三大设计模式:人工介入的场景化实现
通常,在有人参与的工作流程中,可以执行三种不同的
操作:
1.批准或拒绝
:在关键步骤(如API调用)之前暂停图表,以审查并批准该操作。如果该操作被拒绝,你可以阻止图表执行该步骤,并可能采取替代行动。这种模式通常涉及根据人工输入对图表进行
路由
。
2.
编辑图状态
:暂停图表以查看和编辑图状态。这对于纠正错误或使用其他信息更新状态很有用。此模式通常涉及根据人类输入
更新
状态。
3.
获取输入
:在图形的特定步骤中明确请求人工输入。这对于收集更多信息或上下文以辅助智能体的决策过程很有用。
[color=rgba(0, 0, 0, 0.87)]Approve or reject¶ 批准或拒绝
在涉及敏感操作时,我们需要在执行前让人工确认。比如调用外部 API 发送营销邮件前,先让运营人员审核内容:
(, 下载次数: 0)
上传
点击文件名下载附件
from langgraph.types import Command
from typing import Literal # 若未导入需补充
def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
# 通过interrupt函数暂停图执行,向人工展示待审核内容并获取审批结果
is_approved = interrupt(
{
"question": "这是否正确?", # 审核问题
# 展示需要人工审核和批准的输出(如LLM生成结果)
"llm_output": state["llm_output"] # 待审核的LLM输出
}
)
# 根据人工审批结果返回不同的流程控制指令(Command对象)
if is_approved:
return Command(goto="some_node") # 批准则跳转到"some_node"节点
else:
return Command(goto="another_node") # 拒绝则跳转到"another_node"节点
# 在合适的位置将节点添加到图中,并连接相关节点
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)
# 运行图并触发interrupt后,图会暂停执行
# 使用批准(resume=True)或拒绝(resume=False)操作来恢复执行
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config) # 示例:模拟批准操作
复制代码
这种模式的核心是根据人工输入进行流程路由,确保敏感操作的安全性。在金融、医疗等对合规性要求高的场景中尤为重要。
[color=rgba(0, 0, 0, 0.87)]Review & edit state¶ 查看与编辑状态
当智能体生成的内容需要人工修正时,我们可以使用编辑状态模式。比如 LLM 生成的新闻摘要需要编辑优化:
(, 下载次数: 0)
上传
点击文件名下载附件
from langgraph.types import interrupt
def human_editing(state: State):
...
result = interrupt(
# 要呈现给客户端的中断信息,可为任意 JSON 可序列化的值
{
"task": "审核 LLM 输出内容并进行必要编辑",
"llm_generated_summary": state["llm_generated_summary"]
}
)
# 使用编辑后的文本更新状态
return {
"llm_generated_summary": result["edited_text"]
}
# 在合适位置将节点添加到图中,并连接相关节点
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)
...
# 运行图直至触发中断,此时图将暂停
# 使用编辑后的文本恢复图执行
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
Command(resume={"edited_text": "The edited text"}),
config=thread_config
)
复制代码
这种模式常用于内容生成场景,人工可以修正智能体的输出,确保结果符合要求。
获取输入模式:补充缺失的关键信息
当智能体需要额外信息才能继续时,可以主动请求人工输入。比如客服系统中获取用户的具体需求:
python
def get_user_requirement(state: State):
# 循环获取有效的用户需求
while True:
requirement = interrupt("请详细描述您的需求")
# 简单验证输入
if len(requirement) < 10:
interrupt("需求描述过短,请补充更多细节")
else:
break
return {"user_requirement": requirement}
# 在流程中使用
graph_builder.add_node("gather_requirement", get_user_requirement)
graph_builder.connect("start", "gather_requirement")
复制代码
这种模式通过多次 interrupt 调用,确保获取到足够的信息,常用于客服、工单系统等需要人机交互的场景。
高级技巧与常见陷阱
输入验证:打造鲁棒的人机交互
有时我们需要确保人工输入的有效性,比如年龄必须是正整数:
python
def validate_age(state: State):
question = "您的年龄是多少?"
while True:
answer = interrupt(question)
try:
age = int(answer)
if age <= 0:
raise ValueError
break
except:
question = f"'{answer}'不是有效年龄,请重新输入"
continue
return {"age": age}
# 调用时会循环直到输入有效年龄
graph.invoke({"initial": "data"}, config=config)
# 输入"25"后正常返回
复制代码
这种循环验证机制可以确保流程获取到合法输入,避免因错误输入导致后续处理出错。
副作用处理:避免重复执行的坑
由于 interrupt 恢复时会重新执行节点,具有副作用的操作(如 API 调用)必须放在 interrupt 之后:
python
# 错误做法:API调用在interrupt之前
def bad_design(state: State):
# 每次恢复都会重新调用API
result = api_call("sensitive_operation")
answer = interrupt("是否继续?")
return {"result": result}
# 正确做法:API调用在interrupt之后
def good_design(state: State):
answer = interrupt("是否允许调用API?")
if answer:
result = api_call("sensitive_operation")
return {"result": result if answer else None}
# 更好的做法:将副作用操作放在单独节点
def separate_node(state: State):
return {"should_call": interrupt("是否调用API?")}
graph_builder.add_node("decision", separate_node)
graph_builder.add_node("api_call", api_node)
graph_builder.connect("decision", "api_call", condition=lambda x: x["should_call"])
复制代码
将副作用操作放在单独节点或 interrupt 之后,可以避免重复执行带来的性能问题或数据不一致。
多中断处理:保持顺序一致性
当一个节点包含多个 interrupt 时,必须注意调用顺序,因为恢复时是按索引匹配的:
python
def multiple_interrupts(state: State):
# 两个interrupt按顺序调用
name = interrupt("您的姓名?")
age = interrupt("您的年龄?")
return {"name": name, "age": age}
# 恢复时按顺序传入两个值
graph.invoke(
Command(resume=["张三", 25]),
config=config
)
复制代码
如果在节点中动态改变 interrupt 的顺序或数量,会导致索引不匹配,因此应避免在执行期间修改节点结构。
结语:让智能体流程更灵活可控
通过 LangGraph 的 interrupt 机制,我们可以轻松构建 "人在回路" 的智能体流程,既发挥了 AI 的自动化优势,又保留了人工干预的灵活性。从审批流程到内容修正,从信息收集到决策支持,interrupt 功能为我们提供了强大的工具。
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148828318
欢迎光临 AI创想 (https://www.llms-ai.com/)
Powered by Discuz! X3.4