开启左侧

LangGraph中的事件流

[复制链接]
AI小编 发表于 11 小时前 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:CSDN博客
LangGraph事件流相关笔记

一、LangGraph流功能基础

    stream和astream的局限性:stream和astream作为流式传输默认实现,仅输出流式传输链最后一步内容。在聊天应用中,能实时反馈模型生成内容,但当AI Agent涉及多模型调用和复杂工作流时,无法返回中间状态信息。事件流的引入:在复杂流程(如RAG对话应用)中,为将最终响应与检索到的源文档等中间值一同返回,需借助事件流概念。
二、LangGraph中的事件流

    事件流的概念:事件流用于记录大模型问答或构建agent过程中的各个阶段状态,如模型输入前、输出过程中、输出完成等,每个状态都构成一个事件。提取事件流的方法:利用astream_events方法提取事件流,该方法仅支持异步。因LangGraph基于LangChain构建,沿用其回调机制。事件流涵盖聊天模型、LLM、链、工具、agent、检索器等相关事件,具体如下:
序号EventEvent TriggerAssociated Method
1Chat model start聊天模型启动时on_chat_model_start
2LLM startLLM启动时on_llm_start
3LLM new tokenLLM或聊天模型生成新token时on_llm_new_token
4LLM endsLLM或聊天模型结束时on_llm_end
5LLM errorsLLM或聊天模型出错时on_llm_error
6Chain start链开始运行时on_chain_start
7Chain end链结束时on_chain_end
8Chain error链出错时on_chain_error
9Tool start工具开始运行时on_tool_start
10Tool end工具结束时on_tool_end
11Tool error工具出错时on_tool_error
12Agent actionagent执行动作时on_agent_action
13Agent finishagent结束时on_agent_finish
14Retriever start检索器启动时on_retriever_start
15Retriever end检索器结束时on_retriever_end
16Retriever error检索器出错时on_retriever_error
17Text任意文本运行时on_text
18Retry重试事件发生时on_retry
三、事件流的应用示例

查看事件流

以ReAct代理框架为例,通过astream_events方法传入问题和指定API版本(如version = “v2”)查看事件流。运行代码可看到从链启动到结束,包括agent决策、模型调用、流式输出等一系列事件。相关代码及输出解释如下:
  1. asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
  2.     kind = event["event"]print(f"{kind}:{event['name']}")
复制代码
    on_chain_start:LangGraph:表示整个LangGraph图的链开始启动,LangGraph作为起始节点,标志着整个事件流处理流程正式开启,后续操作基于此节点逐步展开。
    on_chain_start:_start_:_start_节点是链启动过程中的关键标识,负责接收和处理初始输入数据,为后续节点执行做准备,是流程起始阶段重要部分。
    on_chain_start:agent:事件流进入agent决策阶段,agent开始根据输入进行决策,决定后续操作方向,如选择合适工具或模型调用。
    on_chain_start:call_model:表明即将调用模型处理输入数据,call_model节点负责协调模型调用过程,包括传递输入参数、接收模型输出等。
    on_chain_start:RunnableSequence:RunnableSequence可能代表可运行序列,此阶段可能按特定顺序执行一系列操作,如多个模型调用或工具操作有序执行,是处理流程中的有序执行环节。
    on_chain_start:StateModifier:StateModifier用于修改或调整系统状态,此事件阶段可能对输入数据、模型参数或其他相关状态信息进行修改,以满足后续处理需求。
    on_chat_model_start:ChatOpenAI:表示开始调用ChatOpenAI聊天模型,该模型将根据输入生成相应回复,是生成最终回答的关键步骤。
    on_chat_model_stream:ChatOpenAI:该事件多次出现,每次ChatOpenAI模型生成新token时触发。模型生成回复时按token增量流式返回,每次返回新token记录此事件,展示模型逐步生成回复内容过程。
    on_chat_model_end:ChatOpenAI:标志着ChatOpenAI模型完成回复生成,输出全部内容后停止,此时模型工作阶段结束。
    on_chain_end:StateModifier:在链即将结束时,StateModifier再次发挥作用,可能对最终状态进行最后调整或处理,确保输出结果符合预期。
    on_chain_end:call_model:表示call_model节点工作结束,其负责的模型调用流程完成,包括接收模型最终输出并传递给后续环节。
    on_chain_end:RunnableSequence:标志着RunnableSequence中所有操作执行完毕,整个有序执行序列结束,为链最终结束做准备。
    on_chain_end:agent:意味着agent完成所有相关决策和操作,agent阶段结束,将最终结果返回给整个图的状态。
    on_chain_end:LangGraph:表示整个LangGraph图的链执行结束,所有操作完成,整个事件流处理流程结束。
    这个过程明确标识了Agent执行的每个阶段。从on_chain_start: LangGraph开始,写入_start_节点,启动call_model节点(on_chain_start: call_model )。然后开始聊天模型调用(on_chat_model_start: ChatOpenAI),按token的增量流式返回(on_chat_model_stream: ChatOpenAI ),直到聊天模型(on_chat_model_end: ChatOpenAI )输出完全部内容后停止。继而将结果写回通道(ChannelWrite<call_model.messages> ),再次回到call_model节点做决策,最终完成整个图的运行流程。 这一系列事件完整呈现了从流程启动到结束,各个关键节点和模型调用的具体执行顺序和状态变化,有助于更深入理解LangGraph中事件流驱动下的Agent运行逻辑。
提取具体事件信息

定义events列表存储事件,通过索引访问具体事件。每个事件包含event(事件类型)、name(事件名称,指向处理方法)和data(事件相关数据)。例如在on_chat_model_stream事件中,可从data中获取流式输出的token。代码示例:
  1. events =[]asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
  2.     events.append(event)# 查看第0个事件
  3. events[0]# 查看第10个事件
  4. events[10]
复制代码
LangGraph中的事件流-1.jpg

LangGraph中的事件流-2.jpg


过滤和处理事件信息

依据事件名称等字段过滤事件,如仅打印on_chat_model_stream事件信息,可灵活获取和展示所需数据。LangGraph的stream_mode = "messages"模式基于事件流进行格式化提取,也支持自定义数据流。代码如下:
自定义
  1. # 仅打印聊天模型输出的事件asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
  2.     kind = event["event"]if kind =="on_chat_model_stream":print(event, end="|", flush=True)# 自定义数据流,打印聊天模型输出的内容asyncfor event in graph.astream_events({"messages":["你好,请你介绍一下你自己"]},version="v2"):
  3.     kind = event["event"]if kind =="on_chat_model_stream":print(event["data"]["chunk"].content, end="]", flush=True)
复制代码
"messages"模式代码
  1. first =Trueasyncfor msg, metadata in graph.astream({"messages":["你好,请你介绍一下你自己"]}, stream_mode="all"):if msg.content andnotisinstance(msg, HumanMessage):print(msg.content, end="|", flush=True)ifisinstance(msg, AIMessageChunk):if first:
  2.             gathered = msg
  3.             first =Falseelse:
  4.             gathered = gathered + msg
  5.     if msg.tool_call_chunks:print(gathered.tool_calls)
复制代码
输出内容

你好||我是|一个|智能|助手|,|在|协|助|回答|问题|、|提供|信息|和|执行|各种|任务|。|我|能够|处理|自然|语言|查询|,|并|使用|多个|工具|和|API|来|获取|实时|数据|、|查询|天气|信息|、|执行|数据库|操作|等|。|我的|目标|是|为|您|提供|准确|和|及时|的|帮助|。|如果|您|有|任何|问题|或|需要|帮助|,|请|随时|告诉|我|!
四、总结与展望

    事件流的作用:事件流在构建复杂应用程序时提供强大控制灵活性,能在不同阶段获取对应数据,如大模型生成的token、检索器检索到的内容等。ReAct组件的扩展:ReAct组件除支持工具调用外,还可接入不同规划方式(planning)。目前ReAct流程不具备上下文记忆能力,后续课程将围绕memory和storage技术点展开,拓展ReAct在多轮对话场景中的应用。

原文地址:https://blog.csdn.net/qq_43588095/article/details/147635246
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )