AI创想
标题:
LangGraph 流式输出全攻略:从基础用法到高级特性的深度解析
[打印本页]
作者:
创想小编
时间:
5 小时前
标题:
LangGraph 流式输出全攻略:从基础用法到高级特性的深度解析
作者:佑瞻
在开发智能应用时,我们常常面临这样的场景:需要实时获取流程的中间结果,或者逐步展示大语言模型的生成过程。这时候,LangGraph 的流式输出功能就成了关键利器。今天,我们就来深入聊聊 LangGraph 的流式 API,从基础用法到高级特性,帮助你在项目中实现更流畅的实时交互体验。
一、流式 API 的核心概念与基本用法
1.1 流式处理的优势
相比传统的一次性返回结果,流式输出有两大核心优势:
实时反馈
:用户能立即看到部分结果,提升交互体验
内存优化
:无需一次性加载全部数据,适合处理大量输出
1.2 基本用法示例
LangGraph 提供了.stream()(同步)和.astream()(异步)两种流式方法,以迭代器形式 yield 输出结果。来看一个基础示例:
python
# 同步流式处理
for chunk in graph.stream(inputs, stream_mode="updates"):
print(chunk)
# 异步流式处理(需配合async/await使用)
async for chunk in graph.astream(inputs, stream_mode="updates"):
print(chunk)
复制代码
1.3 支持的流模式
LangGraph 支持五种核心流模式,每种模式解决不同的场景需求:
模式
描述
values
流式传输每一步后的完整状态值
updates
流式传输每一步的状态更新内容,同一步的多次更新会分开流式传输
custom
从节点内部流式传输自定义数据
messages
流式传输 LLM 调用的令牌及元数据,适用于实时展示模型生成过程
debug
流式传输执行过程中的详细调试信息,包含节点名称和完整状态
二、多模式流式处理与状态流
2.1 同时流式传输多种模式
通过传递模式列表,我们可以同时获取多种类型的流数据:
python
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
print(f"模式: {mode}, 内容: {chunk}")
复制代码
输出会是(mode, chunk)元组,让我们能清晰区分不同模式的数据。
2.2 状态流的两种形态
updates 模式
:仅流式传输节点返回的状态更新,包含节点名称和更新内容
values 模式
:流式传输每一步后的完整状态,适合跟踪整个状态的变化
python
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
# 仅流式传输更新内容
print("--- updates模式 ---")
for chunk in graph.stream({"topic": "ice cream"}, stream_mode="updates"):
print(chunk)
# 流式传输完整状态
print("\n--- values模式 ---")
for chunk in graph.stream({"topic": "ice cream"}, stream_mode="values"):
print(chunk)
复制代码
三、子图流式处理与调试模式
3.1 包含子图输出的流式处理
通过设置subgraphs=True,我们可以在父图的流中包含子图的输出:
python
from langgraph.graph import START, StateGraph
# 定义子图
class SubgraphState(TypedDict):
foo: str
bar: str
def subgraph_node_1(state: SubgraphState):
return {"bar": "bar"}
def subgraph_node_2(state: SubgraphState):
return {"foo": state["foo"] + state["bar"]}
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()
# 定义父图
class ParentState(TypedDict):
foo: str
def node_1(state: ParentState):
return {"foo": "hi! " + state["foo"]}
builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
# 流式处理父图和子图的更新
for chunk in graph.stream({"foo": "foo"}, stream_mode="updates", subgraphs=True):
print(chunk)
复制代码
输出会包含命名空间元组,明确标识数据来自哪个图或子图,例如:
plaintext
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
复制代码
3.2 调试模式:获取最详细的执行信息
debug模式会流式传输执行过程中的所有可用信息,是排查问题的利器:
python
for chunk in graph.stream({"topic": "ice cream"}, stream_mode="debug"):
print(chunk)
复制代码
四、LLM 令牌流与精确过滤
4.1 实时流式传输 LLM 生成内容
通过messages模式,我们可以逐令牌地流式传输 LLM 输出,实现类似 ChatGPT 的实时打字效果:
python
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState:
topic: str
joke: str = ""
llm = init_chat_model(model="openai:gpt-4o-mini")
def call_model(state: MyState):
llm_response = llm.invoke(
[{"role": "user", "content": f"Generate a joke about {state.topic}"}]
)
return {"joke": llm_response.content}
graph = (
StateGraph(MyState)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
# 流式传输LLM令牌
for message_chunk, metadata in graph.stream({"topic": "ice cream"}, stream_mode="messages"):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
复制代码
4.2 按标签过滤 LLM 调用
通过为 LLM 添加标签,我们可以精确过滤需要流式传输的令牌:
python
from langchain.chat_models import init_chat_model
# 初始化带标签的LLM
llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke'])
llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem'])
# 定义使用这些LLM的图
graph = ...
# 仅流式传输标签为'joke'的LLM输出
async for msg, metadata in graph.astream({"topic": "cats"}, stream_mode="messages"):
if metadata["tags"] == ["joke"]:
print(msg.content, end="|", flush=True)
复制代码
4.3 按节点过滤 LLM 输出
通过元数据中的langgraph_node字段,我们可以仅流式传输特定节点的 LLM 输出:
python
for msg, metadata in graph.stream(inputs, stream_mode="messages"):
if msg.content and metadata["langgraph_node"] == "some_node_name":
# 处理来自指定节点的LLM输出
...
复制代码
五、自定义数据流与任意 LLM 集成
5.1 从节点内部发送自定义数据
通过get_stream_writer(),我们可以在节点内部主动发送自定义数据:
python
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
class State(TypedDict):
query: str
answer: str
def node(state: State):
writer = get_stream_writer()
writer({"custom_key": "Generating custom data inside node"})
return {"answer": "some data"}
graph = (
StateGraph(State)
.add_node(node)
.add_edge(START, "node")
.compile()
)
inputs = {"query": "example"}
# 流式传输自定义数据
for chunk in graph.stream(inputs, stream_mode="custom"):
print(chunk)
复制代码
5.2 集成任意 LLM 的流式输出
即使 LLM 没有集成 LangChain,我们也可以通过custom模式实现流式传输:
python
from langgraph.config import get_stream_writer
def call_arbitrary_model(state):
writer = get_stream_writer()
# 假设存在自定义流式客户端
for chunk in your_custom_streaming_client(state["topic"]):
writer({"custom_llm_chunk": chunk})
return {"result": "completed"}
graph = (
StateGraph(State)
.add_node(call_arbitrary_model)
.compile()
)
# 流式传输自定义LLM的输出
for chunk in graph.stream({"topic": "cats"}, stream_mode="custom"):
print(chunk)
复制代码
5.3 禁用特定模型的流式传输
当混合使用支持和不支持流式的模型时,需要显式禁用流式:
python
from langchain.chat_models import init_chat_model
# 禁用流式传输
model = init_chat_model(
"anthropic:claude-3-7-sonnet-latest",
disable_streaming=True
)
复制代码
六、Python <3.11 下的异步流式处理
6.1 手动传递 RunnableConfig
在 Python <3.11 中,异步 LLM 调用需要显式传递 RunnableConfig:
python
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
llm = init_chat_model("openai:gpt-4o-mini")
async def async_node(state):
# 显式传递RunnableConfig
llm_response = await llm.ainvoke(
[{"role": "user", "content": "Hello"}],
config=RunnableConfig()
)
return {"response": llm_response.content}
# 定义图并执行
...
复制代码
6.2 手动传递流写入器
在异步节点中无法使用get_stream_writer(),需要手动传递 writer 参数:
python
async def async_node_with_writer(state, writer):
await writer({"custom_data": "async custom data"})
return {"result": "done"}
# 定义图时传递writer
graph = StateGraph(...)
graph.invoke(..., config={"writer": your_writer})
复制代码
七、实践建议与总结
通过 LangGraph 的流式输出功能,我们能够:
为用户提供更流畅的实时交互体验优化内存使用,处理大规模输出精确监控和调试复杂流程实现 LLM 生成内容的实时展示
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148809652
欢迎光临 AI创想 (https://www.llms-ai.com/)
Powered by Discuz! X3.4