AI创想
标题:
LangGraph 内存与人工介入深度解析:构建有记忆的智能交互系统
[打印本页]
作者:
AI小编
时间:
4 小时前
标题:
LangGraph 内存与人工介入深度解析:构建有记忆的智能交互系统
作者:佑瞻
在开发对话式 AI 应用时,我们常常面临两个核心挑战:如何让智能体记住用户的历史对话?当智能体执行敏感操作时如何引入人工审核?LangGraph 作为新一代智能体开发框架,通过完善的内存管理机制和人在回路功能,为这些问题提供了系统性解决方案。本文将从原理到实践,详细解析 LangGraph 的记忆系统与人工介入机制,帮助你构建更智能、更可靠的对话应用。
一、短期记忆:维持对话连续性的核心机制
1.1 短期记忆的本质与作用
想象一下,我们与朋友聊天时,大脑会自动保存最近的对话内容,这样才能理解上下文语义。智能体的短期记忆(Short-term memory)正是模拟这一机制,它能够跟踪会话中的消息历史,让智能体理解多轮对话的语境。在 LangGraph 中,短期记忆也被称为线程级记忆(thread-level memory),它以thread_id作为对话会话的唯一标识,就像给每段对话贴了一个专属标签。
短期记忆的核心价值在于:
实现多轮对话的上下文理解保存当前会话中的临时状态数据为工具调用提供即时上下文支持
1.2 短期记忆的技术实现
要启用短期记忆,需要完成两个关键步骤:状态持久化配置和会话标识管理。下面通过一个天气查询的例子,看看如何在 LangGraph 中实现短期记忆:
python
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver
# 1. 初始化内存保存器(checkpointer)
# 这就像为智能体配备一个"即时笔记本",记录当前会话的状态
checkpointer = InMemorySaver()
def get_weather(city: str) -> str:
"""获取指定城市的天气"""
return f"{city}的天气是晴朗"
# 2. 创建带短期记忆的智能体
# checkpointer参数让智能体具备记录会话状态的能力
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_weather],
checkpointer=checkpointer
)
# 3. 第一次调用智能体,指定thread_id
# thread_id就像对话的"身份证",相同ID表示同一段会话
config = {"configurable": {"thread_id": "user_123"}}
first_response = agent.invoke(
{"messages": [{"role": "user", "content": "旧金山天气如何"}]},
config
)
print("第一次响应:", first_response) # 输出:旧金山的天气是晴朗
# 4. 第二次调用,使用相同thread_id
# 智能体会自动包含第一次对话的历史,理解用户是在继续询问
second_response = agent.invoke(
{"messages": [{"role": "user", "content": "那纽约呢?"}]},
config
)
print("第二次响应:", second_response) # 输出:纽约的天气是晴朗
复制代码
这段代码展示了短期记忆的核心工作流程:通过checkpointer保存会话状态,用thread_id标识对话上下文。当第二次调用使用相同thread_id时,智能体会自动携带第一次对话的历史消息,从而理解用户是在询问纽约的天气,而不是开启一个新话题。
1.3 长对话的历史管理策略
当对话持续进行时,消息历史可能会超出 LLM 的上下文窗口限制。LangGraph 提供了两种常用的解决方案,就像我们整理笔记时会采用摘要或删减策略:
1.3.1 对话总结(Summarization)
通过持续生成对话摘要,既能保留关键信息,又不会超出上下文限制:
python
from langchain_anthropic import ChatAnthropic
from langmem.short_term import SummarizationNode
from langchain_core.messages.utils import count_tokens_approximately
from langgraph.prebuilt import create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState
# 初始化模型,就像找一个"速记员"来总结对话
model = ChatAnthropic(model="claude-3-7-sonnet-latest")
# 配置总结节点,设置摘要规则
summarization_node = SummarizationNode(
token_counter=count_tokens_approximately, # 计算令牌数的工具
model=model, # 使用的LLM模型
max_tokens=384, # 原始消息最大令牌数
max_summary_tokens=128, # 总结后的最大令牌数
output_messages_key="llm_input_messages" # 输出消息的键
)
class State(AgentState):
# 保存上下文信息,避免每次都重新总结
context: dict[str, any]
# 创建带总结功能的智能体
# pre_model_hook参数让智能体在调用模型前先进行对话总结
agent = create_react_agent(
model=model,
tools=[...], # 工具列表
pre_model_hook=summarization_node,
state_schema=State,
checkpointer=InMemorySaver()
)
复制代码
这个方案就像有一个助理在对话过程中不断记录要点,当对话太长时,助理会生成一份摘要,确保智能体不会遗漏关键信息。
1.3.2 消息修剪(Trimming)
通过移除部分历史消息,保留最近的关键内容:
python
from langchain_core.messages.utils import trim_messages, count_tokens_approximately
from langgraph.prebuilt import create_react_agent
# 定义修剪函数,每次调用模型前自动执行
def pre_model_hook(state):
# 修剪消息历史,strategy="last"表示保留最近的消息
trimmed_messages = trim_messages(
state["messages"],
strategy="last",
token_counter=count_tokens_approximately,
max_tokens=384,
start_on="human", # 从人类消息开始
end_on=("human", "tool") # 到人类或工具消息结束
)
return {"llm_input_messages": trimmed_messages}
# 创建智能体,应用修剪函数
agent = create_react_agent(
model=...,
tools=[...],
pre_model_hook=pre_model_hook,
checkpointer=InMemorySaver()
)
复制代码
这种方式类似我们整理聊天记录时,删除较早的无关对话,只保留最近的重要内容,确保智能体处理的是最相关的信息。
1.4 工具与短期记忆的交互
1.4.1 从工具读取记忆
工具可以直接访问智能体的短期记忆,就像从 "即时笔记本" 中查阅当前会话的信息:
python
from typing import Annotated
from langgraph.prebuilt import InjectedState, create_react_agent
class CustomState(AgentState):
user_id: str # 定义状态中包含用户ID
def get_user_info(
state: Annotated[CustomState, InjectedState]
) -> str:
"""根据用户ID查找信息,state参数直接获取当前状态"""
user_id = state["user_id"]
return "用户: 张三" if user_id == "user_123" else "未知用户"
# 创建智能体并指定状态模式
# state_schema参数定义了智能体状态的结构
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_user_info],
state_schema=CustomState
)
# 调用智能体并传入状态数据
response = agent.invoke({
"messages": "查询用户信息",
"user_id": "user_123" # 传入用户ID到状态中
})
复制代码
1.4.2 从工具更新记忆
工具可以修改短期记忆,影响后续对话,就像在 "即时笔记本" 中添加新的笔记:
python
from typing import Annotated
from langchain_core.tools import InjectedToolCallId
from langchain_core.runnables import RunnableConfig
from langgraph.types import Command
from langgraph.prebuilt import InjectedState, create_react_agent
class CustomState(AgentState):
user_name: str # 定义状态中包含用户名
def update_user_info(
tool_call_id: Annotated[str, InjectedToolCallId],
config: RunnableConfig
) -> Command:
"""更新用户信息并保存到状态"""
user_id = config["configurable"].get("user_id")
# 根据用户ID获取用户名
name = "张三" if user_id == "user_123" else "未知用户"
# Command对象用于更新状态
return Command(update={
"user_name": name,
"messages": [f"成功查询到用户信息: {name}"] # 同时更新消息历史
})
def greet(
state: Annotated[CustomState, InjectedState]
) -> str:
"""根据用户信息打招呼,直接读取状态中的用户名"""
user_name = state["user_name"]
return f"你好,{user_name}!"
# 创建智能体,定义状态结构
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[update_user_info, greet],
state_schema=CustomState
)
# 调用智能体,传入用户ID
response = agent.invoke(
{"messages": [{"role": "user", "content": "向用户打招呼"}]},
config={"configurable": {"user_id": "user_123"}}
)
复制代码
这两段代码展示了工具与短期记忆的双向交互:工具可以读取当前状态来决定行为,也可以更新状态来影响后续对话,形成一个动态的交互循环。
二、长期记忆:跨会话的数据持久化
2.1 长期记忆的核心价值
如果说短期记忆是智能体的 "即时笔记本",那么长期记忆(Long-term memory)就是 "永久档案库"。它用于跨会话存储用户特定数据,如用户偏好、历史交互记录等。例如,聊天机器人需要记住用户的默认语言设置,即使用户第二天再次访问,也不需要重新设置。
长期记忆的关键应用场景包括:
保存用户偏好设置(如主题模式、通知频率)存储历史交互记录(如订单历史、咨询记录)维护应用级数据(如系统配置、共享知识库)
2.2 长期记忆的基础操作
2.2.1 数据读取
从长期记忆中检索数据,就像从档案库中查找特定文件:
python
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStore
# 初始化内存存储,相当于创建一个简易的"档案库"
store = InMemoryStore()
# 预先存储用户信息,就像在档案库中归档文件
store.put(
("users",), # 存储路径,类似文件夹结构
"user_123", # 键,类似文件名
{
"name": "张三",
"language": "中文",
"preferences": {"theme": "dark"}
}
)
def get_user_info(config: RunnableConfig) -> str:
"""从长期记忆获取用户信息"""
store = get_store() # 获取当前智能体使用的存储实例
user_id = config["configurable"].get("user_id")
# 读取用户信息,就像从档案库中取出文件
user_info = store.get(("users",), user_id)
return f"用户信息: {user_info.value}" if user_info else "用户不存在"
# 创建智能体并关联存储
# store参数让智能体具备访问长期记忆的能力
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[get_user_info],
store=store
)
# 调用智能体查询用户信息
response = agent.invoke(
{"messages": [{"role": "user", "content": "查询我的用户信息"}]},
config={"configurable": {"user_id": "user_123"}}
)
复制代码
2.2.2 数据写入
向长期记忆保存数据,如同在档案库中新增文件:
python
from typing_extensions import TypedDict
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStore
# 定义用户信息的数据结构
class UserInfo(TypedDict):
name: str
def save_user_info(user_info: UserInfo, config: RunnableConfig) -> str:
"""保存用户信息到长期记忆"""
store = get_store() # 获取存储实例
user_id = config["configurable"].get("user_id")
# 保存用户信息,就像在档案库中新增文件
store.put(("users",), user_id, user_info)
return "用户信息保存成功"
# 创建智能体并关联存储
store = InMemoryStore()
agent = create_react_agent(
model="anthropic:claude-3-7-sonnet-latest",
tools=[save_user_info],
store=store
)
# 调用智能体保存用户信息
response = agent.invoke(
{"messages": [{"role": "user", "content": "我的名字是张三"}],
config={"configurable": {"user_id": "user_123"}}
)
# 直接访问存储验证数据,就像直接查阅档案库
saved_info = store.get(("users",), "user_123").value
print("保存的用户信息:", saved_info)
复制代码
这两段代码展示了长期记忆的基本操作流程:通过store对象进行数据的读取和写入,实现跨会话的数据持久化。需要注意的是,实际生产环境中应使用数据库等持久化存储,而不是示例中的内存存储。
2.3 高级记忆功能
2.3.1 语义搜索
LangGraph 支持通过语义相似度搜索长期记忆,就像在档案库中通过内容查找相关文件,而不是仅通过文件名:
python
from langchain.embeddings import init_embeddings
from langgraph.store.memory import InMemoryStore
embeddings = init_embeddings("openai:text-embedding-3-small")
store = InMemoryStore(
index={
"embed": embeddings,
"dims": 1536,
}
)
store.put(("user_123", "memories"), "1", {"text": "I love pizza"})
store.put(("user_123", "memories"), "2", {"text": "I am a plumber"})
items = store.search(
("user_123", "memories"), query="I'm hungry", limit=1
)
复制代码
LangMem 提供了更抽象的记忆操作接口,封装了底层存储细节,使记忆管理更加便捷。
三、人在回路:智能体操作的人工审核机制
3.1 人工介入的应用场景
当智能体执行敏感操作时,如预订酒店、进行转账、修改用户资料等,需要引入人工审核环节。LangGraph 的人在回路(Human-in-the-loop, HIL)功能允许智能体在执行过程中无限期暂停,等待人工输入,就像现实中的审批流程一样。
3.2 人工审核的技术实现
3.2.1 直接在工具中添加审核
通过interrupt()函数暂停执行并等待人工审核:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
from langgraph.prebuilt import create_react_agent
def book_hotel(hotel_name: str):
"""预订酒店的工具,包含人工审核环节"""
# 暂停执行并生成审核请求
# 这个操作就像智能体在执行前先提交一个审批单
response = interrupt(
f"尝试预订酒店: {hotel_name},请审核"
)
if response["type"] == "accept":
# 审核通过,执行预订
return f"成功预订{hotel_name}"
elif response["type"] == "edit":
# 审核修改,使用新的酒店名称
new_hotel = response["args"]["hotel_name"]
return f"已修改并预订{new_hotel}"
else:
raise ValueError(f"未知审核响应: {response['type']}")
# 创建智能体,配置checkpointer用于保存状态
checkpointer = InMemorySaver()
agent = create_react_agent(
model="anthropic:claude-3-5-sonnet-latest",
tools=[book_hotel],
checkpointer=checkpointer
)
# 调用智能体,指定thread_id以便恢复会话
config = {"configurable": {"thread_id": "booking_123"}}
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "预订麦克基特里克酒店"}]},
config
):
print(chunk)
# 此时智能体暂停,等待人工审核
# 人工审核后恢复执行
from langgraph.types import Command
for chunk in agent.stream(
Command(resume={"type": "accept"}), # 审核通过
config
):
print(chunk)
复制代码
这段代码展示了人工审核的完整流程:智能体在执行敏感操作前先提交审核请求,暂停执行等待人工决策,根据审核结果(接受、修改、拒绝)进行相应处理。interrupt()函数是实现这一机制的关键,它会持久化保存当前状态,允许系统在人工审核后从断点继续执行。
3.2.2 通用审核包装器
无需修改工具代码,通过包装器为任意工具添加审核功能:
python
from typing import Callable
from langchain_core.tools import BaseTool, tool as create_tool
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt
from langgraph.prebuilt.interrupt import HumanInterruptConfig
def add_human_in_the_loop(
tool: Callable | BaseTool,
*,
interrupt_config: HumanInterruptConfig = None
) -> BaseTool:
"""为工具添加人工审核的包装器函数"""
if not isinstance(tool, BaseTool):
tool = create_tool(tool)
if interrupt_config is None:
# 设置默认审核配置,允许接受、修改、响应
interrupt_config = {
"allow_accept": True,
"allow_edit": True,
"allow_respond": True
}
@create_tool(
tool.name,
description=tool.description,
args_schema=tool.args_schema
)
def call_tool_with_interrupt(config: RunnableConfig, **tool_input):
# 生成审核请求,包含工具名称和参数
request = {
"action_request": {
"action": tool.name,
"args": tool_input
},
"config": interrupt_config,
"description": "请审核工具调用"
}
# 触发中断,等待人工审核响应
response = interrupt([request])[0]
if response["type"] == "accept":
# 审核通过,执行原始工具
return tool.invoke(tool_input, config)
elif response["type"] == "edit":
# 审核修改,使用新参数
tool_input = response["args"]["args"]
return tool.invoke(tool_input, config)
elif response["type"] == "response":
# 直接返回用户反馈,不执行工具
return response["args"]
else:
raise ValueError(f"不支持的审核响应: {response['type']}")
return call_tool_with_interrupt
# 使用包装器为酒店预订工具添加审核
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
def book_hotel(hotel_name: str):
"""原始预订酒店工具,无需修改"""
return f"成功预订{hotel_name}"
# 应用包装器,生成带审核功能的新工具
checked_tool = add_human_in_the_loop(book_hotel)
# 创建智能体
checkpointer = InMemorySaver()
agent = create_react_agent(
model="anthropic:claude-3-5-sonnet-latest",
tools=[checked_tool],
checkpointer=checkpointer
)
# 调用智能体,流程中会自动触发人工审核
config = {"configurable": {"thread_id": "booking_123"}}
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "预订麦克基特里克酒店"}]},
config
):
print(chunk)
复制代码
这种方式的优势在于解耦了审核逻辑和工具逻辑,无需修改工具代码即可为任意工具添加审核功能,提高了代码的可复用性和可维护性。
四、总结
核心技术点回顾
短期记忆
:基于checkpointer和thread_id实现会话级上下文管理,适用于维持对话连续性
长期记忆
:通过store实现跨会话数据持久化,适用于保存用户偏好、历史记录等
人工介入
:利用interrupt()实现敏感操作的人工审核,确保系统安全性
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148796700
欢迎光临 AI创想 (https://www.llms-ai.com/)
Powered by Discuz! X3.4