作者:CSDN博客
LangGraph事件流相关笔记
一、LangGraph流功能基础
stream和astream的局限性:stream和astream作为流式传输默认实现,仅输出流式传输链最后一步内容。在聊天应用中,能实时反馈模型生成内容,但当AI Agent涉及多模型调用和复杂工作流时,无法返回中间状态信息。事件流的引入:在复杂流程(如RAG对话应用)中,为将最终响应与检索到的源文档等中间值一同返回,需借助事件流概念。
二、LangGraph中的事件流
事件流的概念:事件流用于记录大模型问答或构建agent过程中的各个阶段状态,如模型输入前、输出过程中、输出完成等,每个状态都构成一个事件。提取事件流的方法:利用astream_events方法提取事件流,该方法仅支持异步。因LangGraph基于LangChain构建,沿用其回调机制。事件流涵盖聊天模型、LLM、链、工具、agent、检索器等相关事件,具体如下:
| 序号 | Event | Event Trigger | Associated Method | | 1 | Chat model start | 聊天模型启动时 | on_chat_model_start | | 2 | LLM start | LLM启动时 | on_llm_start | | 3 | LLM new token | LLM或聊天模型生成新token时 | on_llm_new_token | | 4 | LLM ends | LLM或聊天模型结束时 | on_llm_end | | 5 | LLM errors | LLM或聊天模型出错时 | on_llm_error | | 6 | Chain start | 链开始运行时 | on_chain_start | | 7 | Chain end | 链结束时 | on_chain_end | | 8 | Chain error | 链出错时 | on_chain_error | | 9 | Tool start | 工具开始运行时 | on_tool_start | | 10 | Tool end | 工具结束时 | on_tool_end | | 11 | Tool error | 工具出错时 | on_tool_error | | 12 | Agent action | agent执行动作时 | on_agent_action | | 13 | Agent finish | agent结束时 | on_agent_finish | | 14 | Retriever start | 检索器启动时 | on_retriever_start | | 15 | Retriever end | 检索器结束时 | on_retriever_end | | 16 | Retriever error | 检索器出错时 | on_retriever_error | | 17 | Text | 任意文本运行时 | on_text | | 18 | Retry | 重试事件发生时 | on_retry | 三、事件流的应用示例
查看事件流:
以ReAct代理框架为例,通过astream_events方法传入问题和指定API版本(如version = “v2”)查看事件流。运行代码可看到从链启动到结束,包括agent决策、模型调用、流式输出等一系列事件。相关代码及输出解释如下:- asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
- kind = event["event"]print(f"{kind}:{event['name']}")
复制代码on_chain_start:LangGraph:表示整个LangGraph图的链开始启动,LangGraph作为起始节点,标志着整个事件流处理流程正式开启,后续操作基于此节点逐步展开。
on_chain_start:_start_:_start_节点是链启动过程中的关键标识,负责接收和处理初始输入数据,为后续节点执行做准备,是流程起始阶段重要部分。
on_chain_start:agent:事件流进入agent决策阶段,agent开始根据输入进行决策,决定后续操作方向,如选择合适工具或模型调用。
on_chain_start:call_model:表明即将调用模型处理输入数据,call_model节点负责协调模型调用过程,包括传递输入参数、接收模型输出等。
on_chain_start:RunnableSequence:RunnableSequence可能代表可运行序列,此阶段可能按特定顺序执行一系列操作,如多个模型调用或工具操作有序执行,是处理流程中的有序执行环节。
on_chain_start:StateModifier:StateModifier用于修改或调整系统状态,此事件阶段可能对输入数据、模型参数或其他相关状态信息进行修改,以满足后续处理需求。
on_chat_model_start:ChatOpenAI:表示开始调用ChatOpenAI聊天模型,该模型将根据输入生成相应回复,是生成最终回答的关键步骤。
on_chat_model_stream:ChatOpenAI:该事件多次出现,每次ChatOpenAI模型生成新token时触发。模型生成回复时按token增量流式返回,每次返回新token记录此事件,展示模型逐步生成回复内容过程。
on_chat_model_end:ChatOpenAI:标志着ChatOpenAI模型完成回复生成,输出全部内容后停止,此时模型工作阶段结束。
on_chain_end:StateModifier:在链即将结束时,StateModifier再次发挥作用,可能对最终状态进行最后调整或处理,确保输出结果符合预期。
on_chain_end:call_model:表示call_model节点工作结束,其负责的模型调用流程完成,包括接收模型最终输出并传递给后续环节。
on_chain_end:RunnableSequence:标志着RunnableSequence中所有操作执行完毕,整个有序执行序列结束,为链最终结束做准备。
on_chain_end:agent:意味着agent完成所有相关决策和操作,agent阶段结束,将最终结果返回给整个图的状态。
on_chain_end:LangGraph:表示整个LangGraph图的链执行结束,所有操作完成,整个事件流处理流程结束。
这个过程明确标识了Agent执行的每个阶段。从on_chain_start: LangGraph开始,写入_start_节点,启动call_model节点(on_chain_start: call_model )。然后开始聊天模型调用(on_chat_model_start: ChatOpenAI),按token的增量流式返回(on_chat_model_stream: ChatOpenAI ),直到聊天模型(on_chat_model_end: ChatOpenAI )输出完全部内容后停止。继而将结果写回通道(ChannelWrite<call_model.messages> ),再次回到call_model节点做决策,最终完成整个图的运行流程。 这一系列事件完整呈现了从流程启动到结束,各个关键节点和模型调用的具体执行顺序和状态变化,有助于更深入理解LangGraph中事件流驱动下的Agent运行逻辑。
提取具体事件信息:
定义events列表存储事件,通过索引访问具体事件。每个事件包含event(事件类型)、name(事件名称,指向处理方法)和data(事件相关数据)。例如在on_chat_model_stream事件中,可从data中获取流式输出的token。代码示例:- events =[]asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
- events.append(event)# 查看第0个事件
- events[0]# 查看第10个事件
- events[10]
复制代码
过滤和处理事件信息:
依据事件名称等字段过滤事件,如仅打印on_chat_model_stream事件信息,可灵活获取和展示所需数据。LangGraph的stream_mode = "messages"模式基于事件流进行格式化提取,也支持自定义数据流。代码如下:
自定义
- # 仅打印聊天模型输出的事件asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
- kind = event["event"]if kind =="on_chat_model_stream":print(event, end="|", flush=True)# 自定义数据流,打印聊天模型输出的内容asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
- kind = event["event"]if kind =="on_chat_model_stream":print(event["data"]["chunk"].content, end="]", flush=True)
复制代码 "messages"模式代码
- first =Trueasyncfor msg, metadata in graph.astream({"messages":["你好,请你介绍一下你自己"]}, stream_mode="all"):if msg.content andnotisinstance(msg, HumanMessage):print(msg.content, end="|", flush=True)ifisinstance(msg, AIMessageChunk):if first:
- gathered = msg
- first =Falseelse:
- gathered = gathered + msg
- if msg.tool_call_chunks:print(gathered.tool_calls)
复制代码 输出内容
你好||我是|一个|智能|助手|,|在|协|助|回答|问题|、|提供|信息|和|执行|各种|任务|。|我|能够|处理|自然|语言|查询|,|并|使用|多个|工具|和|API|来|获取|实时|数据|、|查询|天气|信息|、|执行|数据库|操作|等|。|我的|目标|是|为|您|提供|准确|和|及时|的|帮助|。|如果|您|有|任何|问题|或|需要|帮助|,|请|随时|告诉|我|!
四、总结与展望
事件流的作用:事件流在构建复杂应用程序时提供强大控制灵活性,能在不同阶段获取对应数据,如大模型生成的token、检索器检索到的内容等。ReAct组件的扩展:ReAct组件除支持工具调用外,还可接入不同规划方式(planning)。目前ReAct流程不具备上下文记忆能力,后续课程将围绕memory和storage技术点展开,拓展ReAct在多轮对话场景中的应用。
原文地址:https://blog.csdn.net/qq_43588095/article/details/147635246 |