开启左侧

LangGraph 流式输出全攻略:从基础用法到高级特性的深度解析

[复制链接]
创想小编 发表于 3 小时前 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:佑瞻
在开发智能应用时,我们常常面临这样的场景:需要实时获取流程的中间结果,或者逐步展示大语言模型的生成过程。这时候,LangGraph 的流式输出功能就成了关键利器。今天,我们就来深入聊聊 LangGraph 的流式 API,从基础用法到高级特性,帮助你在项目中实现更流畅的实时交互体验。
一、流式 API 的核心概念与基本用法

1.1 流式处理的优势

相比传统的一次性返回结果,流式输出有两大核心优势:
    实时反馈:用户能立即看到部分结果,提升交互体验内存优化:无需一次性加载全部数据,适合处理大量输出
1.2 基本用法示例

LangGraph 提供了.stream()(同步)和.astream()(异步)两种流式方法,以迭代器形式 yield 输出结果。来看一个基础示例:
python
  1. # 同步流式处理
  2. for chunk in graph.stream(inputs, stream_mode="updates"):
  3.     print(chunk)
  4. # 异步流式处理(需配合async/await使用)
  5. async for chunk in graph.astream(inputs, stream_mode="updates"):
  6.     print(chunk)
复制代码
1.3 支持的流模式

LangGraph 支持五种核心流模式,每种模式解决不同的场景需求:
模式描述
values流式传输每一步后的完整状态值
updates流式传输每一步的状态更新内容,同一步的多次更新会分开流式传输
custom从节点内部流式传输自定义数据
messages流式传输 LLM 调用的令牌及元数据,适用于实时展示模型生成过程
debug流式传输执行过程中的详细调试信息,包含节点名称和完整状态
二、多模式流式处理与状态流

2.1 同时流式传输多种模式

通过传递模式列表,我们可以同时获取多种类型的流数据:
python
  1. for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
  2.     print(f"模式: {mode}, 内容: {chunk}")
复制代码
输出会是(mode, chunk)元组,让我们能清晰区分不同模式的数据。
2.2 状态流的两种形态

    updates 模式:仅流式传输节点返回的状态更新,包含节点名称和更新内容values 模式:流式传输每一步后的完整状态,适合跟踪整个状态的变化
python
  1. from langgraph.graph import StateGraph, START, END
  2. class State(TypedDict):
  3.     topic: str
  4.     joke: str
  5. def refine_topic(state: State):
  6.     return {"topic": state["topic"] + " and cats"}
  7. def generate_joke(state: State):
  8.     return {"joke": f"This is a joke about {state['topic']}"}
  9. graph = (
  10.     StateGraph(State)
  11.     .add_node(refine_topic)
  12.     .add_node(generate_joke)
  13.     .add_edge(START, "refine_topic")
  14.     .add_edge("refine_topic", "generate_joke")
  15.     .add_edge("generate_joke", END)
  16.     .compile()
  17. )
  18. # 仅流式传输更新内容
  19. print("--- updates模式 ---")
  20. for chunk in graph.stream({"topic": "ice cream"}, stream_mode="updates"):
  21.     print(chunk)
  22. # 流式传输完整状态
  23. print("\n--- values模式 ---")
  24. for chunk in graph.stream({"topic": "ice cream"}, stream_mode="values"):
  25.     print(chunk)
复制代码
三、子图流式处理与调试模式

3.1 包含子图输出的流式处理

通过设置subgraphs=True,我们可以在父图的流中包含子图的输出:
python
  1. from langgraph.graph import START, StateGraph
  2. # 定义子图
  3. class SubgraphState(TypedDict):
  4.     foo: str
  5.     bar: str
  6. def subgraph_node_1(state: SubgraphState):
  7.     return {"bar": "bar"}
  8. def subgraph_node_2(state: SubgraphState):
  9.     return {"foo": state["foo"] + state["bar"]}
  10. subgraph_builder = StateGraph(SubgraphState)
  11. subgraph_builder.add_node(subgraph_node_1)
  12. subgraph_builder.add_node(subgraph_node_2)
  13. subgraph_builder.add_edge(START, "subgraph_node_1")
  14. subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
  15. subgraph = subgraph_builder.compile()
  16. # 定义父图
  17. class ParentState(TypedDict):
  18.     foo: str
  19. def node_1(state: ParentState):
  20.     return {"foo": "hi! " + state["foo"]}
  21. builder = StateGraph(ParentState)
  22. builder.add_node("node_1", node_1)
  23. builder.add_node("node_2", subgraph)
  24. builder.add_edge(START, "node_1")
  25. builder.add_edge("node_1", "node_2")
  26. graph = builder.compile()
  27. # 流式处理父图和子图的更新
  28. for chunk in graph.stream({"foo": "foo"}, stream_mode="updates", subgraphs=True):
  29.     print(chunk)
复制代码
输出会包含命名空间元组,明确标识数据来自哪个图或子图,例如:
plaintext
  1. ((), {'node_1': {'foo': 'hi! foo'}})
  2. (('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
复制代码
3.2 调试模式:获取最详细的执行信息

debug模式会流式传输执行过程中的所有可用信息,是排查问题的利器:
python
  1. for chunk in graph.stream({"topic": "ice cream"}, stream_mode="debug"):
  2.     print(chunk)
复制代码
四、LLM 令牌流与精确过滤

4.1 实时流式传输 LLM 生成内容

通过messages模式,我们可以逐令牌地流式传输 LLM 输出,实现类似 ChatGPT 的实时打字效果:
python
  1. from langchain.chat_models import init_chat_model
  2. from langgraph.graph import StateGraph, START
  3. @dataclass
  4. class MyState:
  5.     topic: str
  6.     joke: str = ""
  7. llm = init_chat_model(model="openai:gpt-4o-mini")
  8. def call_model(state: MyState):
  9.     llm_response = llm.invoke(
  10.         [{"role": "user", "content": f"Generate a joke about {state.topic}"}]
  11.     )
  12.     return {"joke": llm_response.content}
  13. graph = (
  14.     StateGraph(MyState)
  15.     .add_node(call_model)
  16.     .add_edge(START, "call_model")
  17.     .compile()
  18. )
  19. # 流式传输LLM令牌
  20. for message_chunk, metadata in graph.stream({"topic": "ice cream"}, stream_mode="messages"):
  21.     if message_chunk.content:
  22.         print(message_chunk.content, end="|", flush=True)
复制代码
4.2 按标签过滤 LLM 调用

通过为 LLM 添加标签,我们可以精确过滤需要流式传输的令牌:
python
  1. from langchain.chat_models import init_chat_model
  2. # 初始化带标签的LLM
  3. llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke'])
  4. llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem'])
  5. # 定义使用这些LLM的图
  6. graph = ...
  7. # 仅流式传输标签为'joke'的LLM输出
  8. async for msg, metadata in graph.astream({"topic": "cats"}, stream_mode="messages"):
  9.     if metadata["tags"] == ["joke"]:
  10.         print(msg.content, end="|", flush=True)
复制代码
4.3 按节点过滤 LLM 输出

通过元数据中的langgraph_node字段,我们可以仅流式传输特定节点的 LLM 输出:
python
  1. for msg, metadata in graph.stream(inputs, stream_mode="messages"):
  2.     if msg.content and metadata["langgraph_node"] == "some_node_name":
  3.         # 处理来自指定节点的LLM输出
  4.         ...
复制代码
五、自定义数据流与任意 LLM 集成

5.1 从节点内部发送自定义数据

通过get_stream_writer(),我们可以在节点内部主动发送自定义数据:
python
  1. from langgraph.config import get_stream_writer
  2. from langgraph.graph import StateGraph, START
  3. class State(TypedDict):
  4.     query: str
  5.     answer: str
  6. def node(state: State):
  7.     writer = get_stream_writer()
  8.     writer({"custom_key": "Generating custom data inside node"})
  9.     return {"answer": "some data"}
  10. graph = (
  11.     StateGraph(State)
  12.     .add_node(node)
  13.     .add_edge(START, "node")
  14.     .compile()
  15. )
  16. inputs = {"query": "example"}
  17. # 流式传输自定义数据
  18. for chunk in graph.stream(inputs, stream_mode="custom"):
  19.     print(chunk)
复制代码
5.2 集成任意 LLM 的流式输出

即使 LLM 没有集成 LangChain,我们也可以通过custom模式实现流式传输:
python
  1. from langgraph.config import get_stream_writer
  2. def call_arbitrary_model(state):
  3.     writer = get_stream_writer()
  4.     # 假设存在自定义流式客户端
  5.     for chunk in your_custom_streaming_client(state["topic"]):
  6.         writer({"custom_llm_chunk": chunk})
  7.     return {"result": "completed"}
  8. graph = (
  9.     StateGraph(State)
  10.     .add_node(call_arbitrary_model)
  11.     .compile()
  12. )
  13. # 流式传输自定义LLM的输出
  14. for chunk in graph.stream({"topic": "cats"}, stream_mode="custom"):
  15.     print(chunk)
复制代码
5.3 禁用特定模型的流式传输

当混合使用支持和不支持流式的模型时,需要显式禁用流式:
python
  1. from langchain.chat_models import init_chat_model
  2. # 禁用流式传输
  3. model = init_chat_model(
  4.     "anthropic:claude-3-7-sonnet-latest",
  5.     disable_streaming=True
  6. )
复制代码
六、Python <3.11 下的异步流式处理

6.1 手动传递 RunnableConfig

在 Python <3.11 中,异步 LLM 调用需要显式传递 RunnableConfig:
python
  1. from langchain.chat_models import init_chat_model
  2. from langgraph.graph import StateGraph, START
  3. llm = init_chat_model("openai:gpt-4o-mini")
  4. async def async_node(state):
  5.     # 显式传递RunnableConfig
  6.     llm_response = await llm.ainvoke(
  7.         [{"role": "user", "content": "Hello"}],
  8.         config=RunnableConfig()
  9.     )
  10.     return {"response": llm_response.content}
  11. # 定义图并执行
  12. ...
复制代码
6.2 手动传递流写入器

在异步节点中无法使用get_stream_writer(),需要手动传递 writer 参数:
python
  1. async def async_node_with_writer(state, writer):
  2.     await writer({"custom_data": "async custom data"})
  3.     return {"result": "done"}
  4. # 定义图时传递writer
  5. graph = StateGraph(...)
  6. graph.invoke(..., config={"writer": your_writer})
复制代码
七、实践建议与总结

通过 LangGraph 的流式输出功能,我们能够:
    为用户提供更流畅的实时交互体验优化内存使用,处理大规模输出精确监控和调试复杂流程实现 LLM 生成内容的实时展示
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~

原文地址:https://blog.csdn.net/The_Thieves/article/details/148809652
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )