AI创想

标题: LangGraph 内存与人工介入深度解析:构建有记忆的智能交互系统 [打印本页]

作者: AI小编    时间: 4 小时前
标题: LangGraph 内存与人工介入深度解析:构建有记忆的智能交互系统
作者:佑瞻
在开发对话式 AI 应用时,我们常常面临两个核心挑战:如何让智能体记住用户的历史对话?当智能体执行敏感操作时如何引入人工审核?LangGraph 作为新一代智能体开发框架,通过完善的内存管理机制和人在回路功能,为这些问题提供了系统性解决方案。本文将从原理到实践,详细解析 LangGraph 的记忆系统与人工介入机制,帮助你构建更智能、更可靠的对话应用。
一、短期记忆:维持对话连续性的核心机制

1.1 短期记忆的本质与作用

想象一下,我们与朋友聊天时,大脑会自动保存最近的对话内容,这样才能理解上下文语义。智能体的短期记忆(Short-term memory)正是模拟这一机制,它能够跟踪会话中的消息历史,让智能体理解多轮对话的语境。在 LangGraph 中,短期记忆也被称为线程级记忆(thread-level memory),它以thread_id作为对话会话的唯一标识,就像给每段对话贴了一个专属标签。
短期记忆的核心价值在于:
1.2 短期记忆的技术实现

要启用短期记忆,需要完成两个关键步骤:状态持久化配置和会话标识管理。下面通过一个天气查询的例子,看看如何在 LangGraph 中实现短期记忆:
python
  1. from langgraph.prebuilt import create_react_agent
  2. from langgraph.checkpoint.memory import InMemorySaver
  3. # 1. 初始化内存保存器(checkpointer)
  4. # 这就像为智能体配备一个"即时笔记本",记录当前会话的状态
  5. checkpointer = InMemorySaver()
  6. def get_weather(city: str) -> str:
  7.     """获取指定城市的天气"""
  8.     return f"{city}的天气是晴朗"
  9. # 2. 创建带短期记忆的智能体
  10. # checkpointer参数让智能体具备记录会话状态的能力
  11. agent = create_react_agent(
  12.     model="anthropic:claude-3-7-sonnet-latest",
  13.     tools=[get_weather],
  14.     checkpointer=checkpointer
  15. )
  16. # 3. 第一次调用智能体,指定thread_id
  17. # thread_id就像对话的"身份证",相同ID表示同一段会话
  18. config = {"configurable": {"thread_id": "user_123"}}
  19. first_response = agent.invoke(
  20.     {"messages": [{"role": "user", "content": "旧金山天气如何"}]},
  21.     config
  22. )
  23. print("第一次响应:", first_response)  # 输出:旧金山的天气是晴朗
  24. # 4. 第二次调用,使用相同thread_id
  25. # 智能体会自动包含第一次对话的历史,理解用户是在继续询问
  26. second_response = agent.invoke(
  27.     {"messages": [{"role": "user", "content": "那纽约呢?"}]},
  28.     config
  29. )
  30. print("第二次响应:", second_response)  # 输出:纽约的天气是晴朗
复制代码
这段代码展示了短期记忆的核心工作流程:通过checkpointer保存会话状态,用thread_id标识对话上下文。当第二次调用使用相同thread_id时,智能体会自动携带第一次对话的历史消息,从而理解用户是在询问纽约的天气,而不是开启一个新话题。
1.3 长对话的历史管理策略

当对话持续进行时,消息历史可能会超出 LLM 的上下文窗口限制。LangGraph 提供了两种常用的解决方案,就像我们整理笔记时会采用摘要或删减策略:
1.3.1 对话总结(Summarization)

通过持续生成对话摘要,既能保留关键信息,又不会超出上下文限制:
python
  1. from langchain_anthropic import ChatAnthropic
  2. from langmem.short_term import SummarizationNode
  3. from langchain_core.messages.utils import count_tokens_approximately
  4. from langgraph.prebuilt import create_react_agent
  5. from langgraph.prebuilt.chat_agent_executor import AgentState
  6. # 初始化模型,就像找一个"速记员"来总结对话
  7. model = ChatAnthropic(model="claude-3-7-sonnet-latest")
  8. # 配置总结节点,设置摘要规则
  9. summarization_node = SummarizationNode(
  10.     token_counter=count_tokens_approximately,  # 计算令牌数的工具
  11.     model=model,                              # 使用的LLM模型
  12.     max_tokens=384,                           # 原始消息最大令牌数
  13.     max_summary_tokens=128,                   # 总结后的最大令牌数
  14.     output_messages_key="llm_input_messages"  # 输出消息的键
  15. )
  16. class State(AgentState):
  17.     # 保存上下文信息,避免每次都重新总结
  18.     context: dict[str, any]
  19. # 创建带总结功能的智能体
  20. # pre_model_hook参数让智能体在调用模型前先进行对话总结
  21. agent = create_react_agent(
  22.     model=model,
  23.     tools=[...],  # 工具列表
  24.     pre_model_hook=summarization_node,
  25.     state_schema=State,
  26.     checkpointer=InMemorySaver()
  27. )
复制代码
这个方案就像有一个助理在对话过程中不断记录要点,当对话太长时,助理会生成一份摘要,确保智能体不会遗漏关键信息。
1.3.2 消息修剪(Trimming)

通过移除部分历史消息,保留最近的关键内容:
python
  1. from langchain_core.messages.utils import trim_messages, count_tokens_approximately
  2. from langgraph.prebuilt import create_react_agent
  3. # 定义修剪函数,每次调用模型前自动执行
  4. def pre_model_hook(state):
  5.     # 修剪消息历史,strategy="last"表示保留最近的消息
  6.     trimmed_messages = trim_messages(
  7.         state["messages"],
  8.         strategy="last",
  9.         token_counter=count_tokens_approximately,
  10.         max_tokens=384,
  11.         start_on="human",    # 从人类消息开始
  12.         end_on=("human", "tool")  # 到人类或工具消息结束
  13.     )
  14.     return {"llm_input_messages": trimmed_messages}
  15. # 创建智能体,应用修剪函数
  16. agent = create_react_agent(
  17.     model=...,
  18.     tools=[...],
  19.     pre_model_hook=pre_model_hook,
  20.     checkpointer=InMemorySaver()
  21. )
复制代码
这种方式类似我们整理聊天记录时,删除较早的无关对话,只保留最近的重要内容,确保智能体处理的是最相关的信息。
1.4 工具与短期记忆的交互

1.4.1 从工具读取记忆

工具可以直接访问智能体的短期记忆,就像从 "即时笔记本" 中查阅当前会话的信息:
python
  1. from typing import Annotated
  2. from langgraph.prebuilt import InjectedState, create_react_agent
  3. class CustomState(AgentState):
  4.     user_id: str  # 定义状态中包含用户ID
  5. def get_user_info(
  6.     state: Annotated[CustomState, InjectedState]
  7. ) -> str:
  8.     """根据用户ID查找信息,state参数直接获取当前状态"""
  9.     user_id = state["user_id"]
  10.     return "用户: 张三" if user_id == "user_123" else "未知用户"
  11. # 创建智能体并指定状态模式
  12. # state_schema参数定义了智能体状态的结构
  13. agent = create_react_agent(
  14.     model="anthropic:claude-3-7-sonnet-latest",
  15.     tools=[get_user_info],
  16.     state_schema=CustomState
  17. )
  18. # 调用智能体并传入状态数据
  19. response = agent.invoke({
  20.     "messages": "查询用户信息",
  21.     "user_id": "user_123"  # 传入用户ID到状态中
  22. })
复制代码
1.4.2 从工具更新记忆

工具可以修改短期记忆,影响后续对话,就像在 "即时笔记本" 中添加新的笔记:
python
  1. from typing import Annotated
  2. from langchain_core.tools import InjectedToolCallId
  3. from langchain_core.runnables import RunnableConfig
  4. from langgraph.types import Command
  5. from langgraph.prebuilt import InjectedState, create_react_agent
  6. class CustomState(AgentState):
  7.     user_name: str  # 定义状态中包含用户名
  8. def update_user_info(
  9.     tool_call_id: Annotated[str, InjectedToolCallId],
  10.     config: RunnableConfig
  11. ) -> Command:
  12.     """更新用户信息并保存到状态"""
  13.     user_id = config["configurable"].get("user_id")
  14.     # 根据用户ID获取用户名
  15.     name = "张三" if user_id == "user_123" else "未知用户"
  16.     # Command对象用于更新状态
  17.     return Command(update={
  18.         "user_name": name,
  19.         "messages": [f"成功查询到用户信息: {name}"]  # 同时更新消息历史
  20.     })
  21. def greet(
  22.     state: Annotated[CustomState, InjectedState]
  23. ) -> str:
  24.     """根据用户信息打招呼,直接读取状态中的用户名"""
  25.     user_name = state["user_name"]
  26.     return f"你好,{user_name}!"
  27. # 创建智能体,定义状态结构
  28. agent = create_react_agent(
  29.     model="anthropic:claude-3-7-sonnet-latest",
  30.     tools=[update_user_info, greet],
  31.     state_schema=CustomState
  32. )
  33. # 调用智能体,传入用户ID
  34. response = agent.invoke(
  35.     {"messages": [{"role": "user", "content": "向用户打招呼"}]},
  36.     config={"configurable": {"user_id": "user_123"}}
  37. )
复制代码
这两段代码展示了工具与短期记忆的双向交互:工具可以读取当前状态来决定行为,也可以更新状态来影响后续对话,形成一个动态的交互循环。
二、长期记忆:跨会话的数据持久化

2.1 长期记忆的核心价值

如果说短期记忆是智能体的 "即时笔记本",那么长期记忆(Long-term memory)就是 "永久档案库"。它用于跨会话存储用户特定数据,如用户偏好、历史交互记录等。例如,聊天机器人需要记住用户的默认语言设置,即使用户第二天再次访问,也不需要重新设置。
长期记忆的关键应用场景包括:
2.2 长期记忆的基础操作

2.2.1 数据读取

从长期记忆中检索数据,就像从档案库中查找特定文件:
python
  1. from langchain_core.runnables import RunnableConfig
  2. from langgraph.config import get_store
  3. from langgraph.prebuilt import create_react_agent
  4. from langgraph.store.memory import InMemoryStore
  5. # 初始化内存存储,相当于创建一个简易的"档案库"
  6. store = InMemoryStore()
  7. # 预先存储用户信息,就像在档案库中归档文件
  8. store.put(
  9.     ("users",),  # 存储路径,类似文件夹结构
  10.     "user_123",  # 键,类似文件名
  11.     {
  12.         "name": "张三",
  13.         "language": "中文",
  14.         "preferences": {"theme": "dark"}
  15.     }
  16. )
  17. def get_user_info(config: RunnableConfig) -> str:
  18.     """从长期记忆获取用户信息"""
  19.     store = get_store()  # 获取当前智能体使用的存储实例
  20.     user_id = config["configurable"].get("user_id")
  21.     # 读取用户信息,就像从档案库中取出文件
  22.     user_info = store.get(("users",), user_id)
  23.     return f"用户信息: {user_info.value}" if user_info else "用户不存在"
  24. # 创建智能体并关联存储
  25. # store参数让智能体具备访问长期记忆的能力
  26. agent = create_react_agent(
  27.     model="anthropic:claude-3-7-sonnet-latest",
  28.     tools=[get_user_info],
  29.     store=store
  30. )
  31. # 调用智能体查询用户信息
  32. response = agent.invoke(
  33.     {"messages": [{"role": "user", "content": "查询我的用户信息"}]},
  34.     config={"configurable": {"user_id": "user_123"}}
  35. )
复制代码
2.2.2 数据写入

向长期记忆保存数据,如同在档案库中新增文件:
python
  1. from typing_extensions import TypedDict
  2. from langgraph.config import get_store
  3. from langgraph.prebuilt import create_react_agent
  4. from langgraph.store.memory import InMemoryStore
  5. # 定义用户信息的数据结构
  6. class UserInfo(TypedDict):
  7.     name: str
  8. def save_user_info(user_info: UserInfo, config: RunnableConfig) -> str:
  9.     """保存用户信息到长期记忆"""
  10.     store = get_store()  # 获取存储实例
  11.     user_id = config["configurable"].get("user_id")
  12.     # 保存用户信息,就像在档案库中新增文件
  13.     store.put(("users",), user_id, user_info)
  14.     return "用户信息保存成功"
  15. # 创建智能体并关联存储
  16. store = InMemoryStore()
  17. agent = create_react_agent(
  18.     model="anthropic:claude-3-7-sonnet-latest",
  19.     tools=[save_user_info],
  20.     store=store
  21. )
  22. # 调用智能体保存用户信息
  23. response = agent.invoke(
  24.     {"messages": [{"role": "user", "content": "我的名字是张三"}],
  25.     config={"configurable": {"user_id": "user_123"}}
  26. )
  27. # 直接访问存储验证数据,就像直接查阅档案库
  28. saved_info = store.get(("users",), "user_123").value
  29. print("保存的用户信息:", saved_info)
复制代码
这两段代码展示了长期记忆的基本操作流程:通过store对象进行数据的读取和写入,实现跨会话的数据持久化。需要注意的是,实际生产环境中应使用数据库等持久化存储,而不是示例中的内存存储。
2.3 高级记忆功能

2.3.1 语义搜索

LangGraph 支持通过语义相似度搜索长期记忆,就像在档案库中通过内容查找相关文件,而不是仅通过文件名:
python
  1. from langchain.embeddings import init_embeddings
  2. from langgraph.store.memory import InMemoryStore
  3. embeddings = init_embeddings("openai:text-embedding-3-small")
  4. store = InMemoryStore(
  5.     index={
  6.         "embed": embeddings,
  7.         "dims": 1536,
  8.     }
  9. )
  10. store.put(("user_123", "memories"), "1", {"text": "I love pizza"})
  11. store.put(("user_123", "memories"), "2", {"text": "I am a plumber"})
  12. items = store.search(
  13.     ("user_123", "memories"), query="I'm hungry", limit=1
  14. )
复制代码
LangMem 提供了更抽象的记忆操作接口,封装了底层存储细节,使记忆管理更加便捷。
三、人在回路:智能体操作的人工审核机制

3.1 人工介入的应用场景

当智能体执行敏感操作时,如预订酒店、进行转账、修改用户资料等,需要引入人工审核环节。LangGraph 的人在回路(Human-in-the-loop, HIL)功能允许智能体在执行过程中无限期暂停,等待人工输入,就像现实中的审批流程一样。
3.2 人工审核的技术实现

3.2.1 直接在工具中添加审核

通过interrupt()函数暂停执行并等待人工审核:
  1. from langgraph.checkpoint.memory import InMemorySaver
  2. from langgraph.types import interrupt
  3. from langgraph.prebuilt import create_react_agent
  4. def book_hotel(hotel_name: str):
  5.     """预订酒店的工具,包含人工审核环节"""
  6.     # 暂停执行并生成审核请求
  7.     # 这个操作就像智能体在执行前先提交一个审批单
  8.     response = interrupt(
  9.         f"尝试预订酒店: {hotel_name},请审核"
  10.     )
  11.     if response["type"] == "accept":
  12.         # 审核通过,执行预订
  13.         return f"成功预订{hotel_name}"
  14.     elif response["type"] == "edit":
  15.         # 审核修改,使用新的酒店名称
  16.         new_hotel = response["args"]["hotel_name"]
  17.         return f"已修改并预订{new_hotel}"
  18.     else:
  19.         raise ValueError(f"未知审核响应: {response['type']}")
  20. # 创建智能体,配置checkpointer用于保存状态
  21. checkpointer = InMemorySaver()
  22. agent = create_react_agent(
  23.     model="anthropic:claude-3-5-sonnet-latest",
  24.     tools=[book_hotel],
  25.     checkpointer=checkpointer
  26. )
  27. # 调用智能体,指定thread_id以便恢复会话
  28. config = {"configurable": {"thread_id": "booking_123"}}
  29. for chunk in agent.stream(
  30.     {"messages": [{"role": "user", "content": "预订麦克基特里克酒店"}]},
  31.     config
  32. ):
  33.     print(chunk)
  34.     # 此时智能体暂停,等待人工审核
  35. # 人工审核后恢复执行
  36. from langgraph.types import Command
  37. for chunk in agent.stream(
  38.     Command(resume={"type": "accept"}),  # 审核通过
  39.     config
  40. ):
  41.     print(chunk)
复制代码
这段代码展示了人工审核的完整流程:智能体在执行敏感操作前先提交审核请求,暂停执行等待人工决策,根据审核结果(接受、修改、拒绝)进行相应处理。interrupt()函数是实现这一机制的关键,它会持久化保存当前状态,允许系统在人工审核后从断点继续执行。
3.2.2 通用审核包装器

无需修改工具代码,通过包装器为任意工具添加审核功能:
python
  1. from typing import Callable
  2. from langchain_core.tools import BaseTool, tool as create_tool
  3. from langchain_core.runnables import RunnableConfig
  4. from langgraph.types import interrupt
  5. from langgraph.prebuilt.interrupt import HumanInterruptConfig
  6. def add_human_in_the_loop(
  7.     tool: Callable | BaseTool,
  8.     *,
  9.     interrupt_config: HumanInterruptConfig = None
  10. ) -> BaseTool:
  11.     """为工具添加人工审核的包装器函数"""
  12.     if not isinstance(tool, BaseTool):
  13.         tool = create_tool(tool)
  14.     if interrupt_config is None:
  15.         # 设置默认审核配置,允许接受、修改、响应
  16.         interrupt_config = {
  17.             "allow_accept": True,
  18.             "allow_edit": True,
  19.             "allow_respond": True
  20.         }
  21.    
  22.     @create_tool(
  23.         tool.name,
  24.         description=tool.description,
  25.         args_schema=tool.args_schema
  26.     )
  27.     def call_tool_with_interrupt(config: RunnableConfig, **tool_input):
  28.         # 生成审核请求,包含工具名称和参数
  29.         request = {
  30.             "action_request": {
  31.                 "action": tool.name,
  32.                 "args": tool_input
  33.             },
  34.             "config": interrupt_config,
  35.             "description": "请审核工具调用"
  36.         }
  37.         # 触发中断,等待人工审核响应
  38.         response = interrupt([request])[0]
  39.         
  40.         if response["type"] == "accept":
  41.             # 审核通过,执行原始工具
  42.             return tool.invoke(tool_input, config)
  43.         elif response["type"] == "edit":
  44.             # 审核修改,使用新参数
  45.             tool_input = response["args"]["args"]
  46.             return tool.invoke(tool_input, config)
  47.         elif response["type"] == "response":
  48.             # 直接返回用户反馈,不执行工具
  49.             return response["args"]
  50.         else:
  51.             raise ValueError(f"不支持的审核响应: {response['type']}")
  52.    
  53.     return call_tool_with_interrupt
  54. # 使用包装器为酒店预订工具添加审核
  55. from langgraph.checkpoint.memory import InMemorySaver
  56. from langgraph.prebuilt import create_react_agent
  57. def book_hotel(hotel_name: str):
  58.     """原始预订酒店工具,无需修改"""
  59.     return f"成功预订{hotel_name}"
  60. # 应用包装器,生成带审核功能的新工具
  61. checked_tool = add_human_in_the_loop(book_hotel)
  62. # 创建智能体
  63. checkpointer = InMemorySaver()
  64. agent = create_react_agent(
  65.     model="anthropic:claude-3-5-sonnet-latest",
  66.     tools=[checked_tool],
  67.     checkpointer=checkpointer
  68. )
  69. # 调用智能体,流程中会自动触发人工审核
  70. config = {"configurable": {"thread_id": "booking_123"}}
  71. for chunk in agent.stream(
  72.     {"messages": [{"role": "user", "content": "预订麦克基特里克酒店"}]},
  73.     config
  74. ):
  75.     print(chunk)
复制代码
这种方式的优势在于解耦了审核逻辑和工具逻辑,无需修改工具代码即可为任意工具添加审核功能,提高了代码的可复用性和可维护性。
四、总结

核心技术点回顾

如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~

原文地址:https://blog.csdn.net/The_Thieves/article/details/148796700




欢迎光临 AI创想 (https://www.llms-ai.com/) Powered by Discuz! X3.4