作者:佑瞻
在开发智能应用时,我们常常面临这样的场景:需要实时获取流程的中间结果,或者逐步展示大语言模型的生成过程。这时候,LangGraph 的流式输出功能就成了关键利器。今天,我们就来深入聊聊 LangGraph 的流式 API,从基础用法到高级特性,帮助你在项目中实现更流畅的实时交互体验。
一、流式 API 的核心概念与基本用法
1.1 流式处理的优势
相比传统的一次性返回结果,流式输出有两大核心优势:
实时反馈:用户能立即看到部分结果,提升交互体验内存优化:无需一次性加载全部数据,适合处理大量输出
1.2 基本用法示例
LangGraph 提供了.stream()(同步)和.astream()(异步)两种流式方法,以迭代器形式 yield 输出结果。来看一个基础示例:
python- # 同步流式处理
- for chunk in graph.stream(inputs, stream_mode="updates"):
- print(chunk)
- # 异步流式处理(需配合async/await使用)
- async for chunk in graph.astream(inputs, stream_mode="updates"):
- print(chunk)
复制代码 1.3 支持的流模式
LangGraph 支持五种核心流模式,每种模式解决不同的场景需求:
| 模式 | 描述 | | values | 流式传输每一步后的完整状态值 | | updates | 流式传输每一步的状态更新内容,同一步的多次更新会分开流式传输 | | custom | 从节点内部流式传输自定义数据 | | messages | 流式传输 LLM 调用的令牌及元数据,适用于实时展示模型生成过程 | | debug | 流式传输执行过程中的详细调试信息,包含节点名称和完整状态 | 二、多模式流式处理与状态流
2.1 同时流式传输多种模式
通过传递模式列表,我们可以同时获取多种类型的流数据:
python- for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
- print(f"模式: {mode}, 内容: {chunk}")
复制代码 输出会是(mode, chunk)元组,让我们能清晰区分不同模式的数据。
2.2 状态流的两种形态
updates 模式:仅流式传输节点返回的状态更新,包含节点名称和更新内容values 模式:流式传输每一步后的完整状态,适合跟踪整个状态的变化
python- from langgraph.graph import StateGraph, START, END
- class State(TypedDict):
- topic: str
- joke: str
- def refine_topic(state: State):
- return {"topic": state["topic"] + " and cats"}
- def generate_joke(state: State):
- return {"joke": f"This is a joke about {state['topic']}"}
- graph = (
- StateGraph(State)
- .add_node(refine_topic)
- .add_node(generate_joke)
- .add_edge(START, "refine_topic")
- .add_edge("refine_topic", "generate_joke")
- .add_edge("generate_joke", END)
- .compile()
- )
- # 仅流式传输更新内容
- print("--- updates模式 ---")
- for chunk in graph.stream({"topic": "ice cream"}, stream_mode="updates"):
- print(chunk)
- # 流式传输完整状态
- print("\n--- values模式 ---")
- for chunk in graph.stream({"topic": "ice cream"}, stream_mode="values"):
- print(chunk)
复制代码 三、子图流式处理与调试模式
3.1 包含子图输出的流式处理
通过设置subgraphs=True,我们可以在父图的流中包含子图的输出:
python- from langgraph.graph import START, StateGraph
- # 定义子图
- class SubgraphState(TypedDict):
- foo: str
- bar: str
- def subgraph_node_1(state: SubgraphState):
- return {"bar": "bar"}
- def subgraph_node_2(state: SubgraphState):
- return {"foo": state["foo"] + state["bar"]}
- subgraph_builder = StateGraph(SubgraphState)
- subgraph_builder.add_node(subgraph_node_1)
- subgraph_builder.add_node(subgraph_node_2)
- subgraph_builder.add_edge(START, "subgraph_node_1")
- subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
- subgraph = subgraph_builder.compile()
- # 定义父图
- class ParentState(TypedDict):
- foo: str
- def node_1(state: ParentState):
- return {"foo": "hi! " + state["foo"]}
- builder = StateGraph(ParentState)
- builder.add_node("node_1", node_1)
- builder.add_node("node_2", subgraph)
- builder.add_edge(START, "node_1")
- builder.add_edge("node_1", "node_2")
- graph = builder.compile()
- # 流式处理父图和子图的更新
- for chunk in graph.stream({"foo": "foo"}, stream_mode="updates", subgraphs=True):
- print(chunk)
复制代码 输出会包含命名空间元组,明确标识数据来自哪个图或子图,例如:
plaintext- ((), {'node_1': {'foo': 'hi! foo'}})
- (('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
复制代码 3.2 调试模式:获取最详细的执行信息
debug模式会流式传输执行过程中的所有可用信息,是排查问题的利器:
python- for chunk in graph.stream({"topic": "ice cream"}, stream_mode="debug"):
- print(chunk)
复制代码 四、LLM 令牌流与精确过滤
4.1 实时流式传输 LLM 生成内容
通过messages模式,我们可以逐令牌地流式传输 LLM 输出,实现类似 ChatGPT 的实时打字效果:
python- from langchain.chat_models import init_chat_model
- from langgraph.graph import StateGraph, START
- @dataclass
- class MyState:
- topic: str
- joke: str = ""
- llm = init_chat_model(model="openai:gpt-4o-mini")
- def call_model(state: MyState):
- llm_response = llm.invoke(
- [{"role": "user", "content": f"Generate a joke about {state.topic}"}]
- )
- return {"joke": llm_response.content}
- graph = (
- StateGraph(MyState)
- .add_node(call_model)
- .add_edge(START, "call_model")
- .compile()
- )
- # 流式传输LLM令牌
- for message_chunk, metadata in graph.stream({"topic": "ice cream"}, stream_mode="messages"):
- if message_chunk.content:
- print(message_chunk.content, end="|", flush=True)
复制代码 4.2 按标签过滤 LLM 调用
通过为 LLM 添加标签,我们可以精确过滤需要流式传输的令牌:
python- from langchain.chat_models import init_chat_model
- # 初始化带标签的LLM
- llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke'])
- llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem'])
- # 定义使用这些LLM的图
- graph = ...
- # 仅流式传输标签为'joke'的LLM输出
- async for msg, metadata in graph.astream({"topic": "cats"}, stream_mode="messages"):
- if metadata["tags"] == ["joke"]:
- print(msg.content, end="|", flush=True)
复制代码 4.3 按节点过滤 LLM 输出
通过元数据中的langgraph_node字段,我们可以仅流式传输特定节点的 LLM 输出:
python- for msg, metadata in graph.stream(inputs, stream_mode="messages"):
- if msg.content and metadata["langgraph_node"] == "some_node_name":
- # 处理来自指定节点的LLM输出
- ...
复制代码 五、自定义数据流与任意 LLM 集成
5.1 从节点内部发送自定义数据
通过get_stream_writer(),我们可以在节点内部主动发送自定义数据:
python- from langgraph.config import get_stream_writer
- from langgraph.graph import StateGraph, START
- class State(TypedDict):
- query: str
- answer: str
- def node(state: State):
- writer = get_stream_writer()
- writer({"custom_key": "Generating custom data inside node"})
- return {"answer": "some data"}
- graph = (
- StateGraph(State)
- .add_node(node)
- .add_edge(START, "node")
- .compile()
- )
- inputs = {"query": "example"}
- # 流式传输自定义数据
- for chunk in graph.stream(inputs, stream_mode="custom"):
- print(chunk)
复制代码 5.2 集成任意 LLM 的流式输出
即使 LLM 没有集成 LangChain,我们也可以通过custom模式实现流式传输:
python- from langgraph.config import get_stream_writer
- def call_arbitrary_model(state):
- writer = get_stream_writer()
- # 假设存在自定义流式客户端
- for chunk in your_custom_streaming_client(state["topic"]):
- writer({"custom_llm_chunk": chunk})
- return {"result": "completed"}
- graph = (
- StateGraph(State)
- .add_node(call_arbitrary_model)
- .compile()
- )
- # 流式传输自定义LLM的输出
- for chunk in graph.stream({"topic": "cats"}, stream_mode="custom"):
- print(chunk)
复制代码 5.3 禁用特定模型的流式传输
当混合使用支持和不支持流式的模型时,需要显式禁用流式:
python- from langchain.chat_models import init_chat_model
- # 禁用流式传输
- model = init_chat_model(
- "anthropic:claude-3-7-sonnet-latest",
- disable_streaming=True
- )
复制代码 六、Python <3.11 下的异步流式处理
6.1 手动传递 RunnableConfig
在 Python <3.11 中,异步 LLM 调用需要显式传递 RunnableConfig:
python- from langchain.chat_models import init_chat_model
- from langgraph.graph import StateGraph, START
- llm = init_chat_model("openai:gpt-4o-mini")
- async def async_node(state):
- # 显式传递RunnableConfig
- llm_response = await llm.ainvoke(
- [{"role": "user", "content": "Hello"}],
- config=RunnableConfig()
- )
- return {"response": llm_response.content}
- # 定义图并执行
- ...
复制代码 6.2 手动传递流写入器
在异步节点中无法使用get_stream_writer(),需要手动传递 writer 参数:
python- async def async_node_with_writer(state, writer):
- await writer({"custom_data": "async custom data"})
- return {"result": "done"}
- # 定义图时传递writer
- graph = StateGraph(...)
- graph.invoke(..., config={"writer": your_writer})
复制代码 七、实践建议与总结
通过 LangGraph 的流式输出功能,我们能够:
为用户提供更流畅的实时交互体验优化内存使用,处理大规模输出精确监控和调试复杂流程实现 LLM 生成内容的实时展示
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148809652 |