AI创想
标题:
LangGraph 函数式 API 实战:用极简方式构建强大工作流系统
[打印本页]
作者:
AI小编
时间:
昨天 22:48
标题:
LangGraph 函数式 API 实战:用极简方式构建强大工作流系统
作者:佑瞻
在开发智能应用时,我们常常面临这样的困境:如何在不改变现有代码结构的前提下,为应用添加持久化、并行执行和人机交互等高级功能?传统框架往往要求大规模重构代码,而 LangGraph 的函数式 API(Functional API)却能以最小的改动解决这些问题。今天我们就来探索这个 "代码增强器",看看如何用几行装饰器让你的应用脱胎换骨。
一、函数式 API 核心:用装饰器开启工作流革命
函数式 API 的设计哲学非常贴合实际开发需求 —— 不强迫我们改变编程习惯,而是通过@entrypoint和@task两个装饰器,像插件一样为现有函数注入强大能力。这种 "无感升级" 的体验,就像给普通轿车装上涡轮增压引擎,无需改造车身就能获得澎湃动力。
1.1 工作流的入口:@entrypoint
@entrypoint是工作流的起点,它将普通函数转换为具备持久化和中断处理能力的工作流入口:
python
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
@entrypoint(checkpointer=MemorySaver())
def data_analysis_workflow(inputs: dict) -> dict:
"""数据分析工作流,处理输入并请求审核"""
# 调用任务处理数据
processed = data_processor(inputs).result()
# 中断等待人工审核
is_approved = interrupt({
"data": processed,
"action": "请审核分析结果"
})
return {"processed": processed, "approved": is_approved}
复制代码
这里的checkpointer参数至关重要,它负责将工作流状态保存到检查点,实现断点续传。就像我们在编辑文档时定期保存,避免意外丢失进度。
1.2 离散任务的封装:@task
@task用于定义可异步执行的任务单元,这些任务会自动支持重试、缓存等高级功能:
python
@task
def data_processor(inputs: dict) -> dict:
"""模拟耗时的数据处理任务"""
import time
time.sleep(2) # 模拟API调用或复杂计算
return {"processed": True, "result": f"分析结果: {inputs['raw_data']}"}
复制代码
任务只能在@entrypoint或其他任务内部调用,返回的 future 对象允许我们异步处理结果,就像点餐后不必守在厨房,而是可以继续做其他事情。
二、高级功能:让工作流如虎添翼
2.1 并行执行:释放 I/O 密集型任务的潜力
对于 API 调用等耗时操作,并行执行能显著提升性能。我们可以同时发起多个任务,然后等待所有结果:
python
@task
def fetch_weather(city: str) -> str:
"""获取城市天气的任务"""
# 模拟API调用
import random
return f"{city}的天气是{random.choice(['晴朗', '多云', '小雨'])}"
@entrypoint(checkpointer=MemorySaver())
def weather_forecast(cities: list[str]) -> list[str]:
"""获取多个城市天气预报的工作流"""
futures = [fetch_weather(city) for city in cities]
# 并行执行并收集结果
return [future.result() for future in futures]
# 调用示例
result = weather_forecast.invoke(["北京", "上海", "广州"])
print(result) # 输出各城市天气
复制代码
这种方式就像派多个快递员同时取件,比逐个取件效率高得多,尤其适合同时调用多个 LLM API 的场景。
2.2 混合编程:函数式与图形 API 的无缝协作
LangGraph 的函数式 API 和图形 API(StateGraph)可以在同一应用中混用,发挥各自优势:
python
from langgraph.func import entrypoint
from langgraph.graph import StateGraph
# 定义图形API的状态图
builder = StateGraph()
# 添加节点和边...
data_graph = builder.compile()
@entrypoint()
def hybrid_workflow(inputs: dict) -> dict:
"""混合使用两种API的工作流"""
# 调用图形API的状态图
graph_result = data_graph.invoke(inputs)
# 调用函数式API的任务
task_result = data_processor(inputs).result()
return {"graph": graph_result, "task": task_result}
复制代码
这种组合就像同时使用螺丝刀和扳手,根据任务特点选择最适合的工具,让开发更灵活。
2.3 流式处理:实时反馈的用户体验
函数式 API 支持与图形 API 相同的流式处理机制,让长时间运行的任务能实时反馈进度:
python
from langgraph.func import entrypoint
from langgraph.config import get_stream_writer
@entrypoint(checkpointer=MemorySaver())
def file_processor(file_path: str) -> str:
"""文件处理工作流,实时反馈进度"""
writer = get_stream_writer()
writer("开始读取文件...")
with open(file_path, "r") as f:
content = f.read()
writer(f"文件读取完成,共{len(content)}字节")
# 模拟处理过程
processed = content.upper()
writer("文件处理完成")
return processed
# 流式调用
config = {"configurable": {"thread_id": "file_process"}}
for mode, chunk in file_processor.stream("data.txt", config=config):
print(f"{mode}: {chunk}")
复制代码
流式处理就像直播比赛,观众不用等到结束就能看到实时进展,大大改善用户体验。
三、任务控制:重试、缓存与错误恢复
3.1 重试策略:自动处理临时故障
网络波动等临时故障可以通过重试策略自动解决,减少人工干预:
python
from langgraph.types import RetryPolicy
# 配置重试策略:遇到ValueError时重试
retry_policy = RetryPolicy(retry_on=ValueError, max_attempts=3)
@task(retry=retry_policy)
def fragile_api_call(params: dict) -> dict:
"""可能失败的API调用任务"""
import random
if random.random() < 0.5:
raise ValueError("临时网络错误")
return {"success": True, "data": "结果"}
@entrypoint(checkpointer=MemorySaver())
def robust_workflow(inputs: dict) -> dict:
"""包含重试机制的工作流"""
return fragile_api_call(inputs).result()
复制代码
重试策略就像自动重试的购票软件,不必手动刷新,系统会自动尝试直到成功。
3.2 缓存任务:避免重复计算
对于频繁调用的耗时任务,缓存可以显著提升性能:
python
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
@task(cache_policy=CachePolicy(ttl=60)) # 缓存60秒
def expensive_calculation(x: int) -> int:
"""耗时的计算任务"""
import time
time.sleep(2)
return x * x
@entrypoint(cache=InMemoryCache())
def calculation_workflow(inputs: dict) -> dict:
"""使用缓存的计算工作流"""
# 两次调用相同参数,第二次会命中缓存
result1 = expensive_calculation(inputs["x"]).result()
result2 = expensive_calculation(inputs["x"]).result()
return {"result1": result1, "result2": result2, "is_cached": result2 == result1}
复制代码
缓存就像备忘录,记住之前的计算结果,避免重复劳动,尤其适合需要多次处理相同数据的场景。
3.3 错误恢复:断点续传的实现
当工作流因错误中断后,恢复执行时无需重新运行已完成的任务:
python
@task
def slow_task() -> str:
"""模拟耗时任务"""
import time
time.sleep(2)
return "任务完成"
@task
def fragile_task() -> str:
"""第一次调用会失败的任务"""
global attempt
attempt += 1
if attempt < 2:
raise ValueError("模拟错误")
return "成功"
@entrypoint(checkpointer=MemorySaver())
def error_recovery_workflow(inputs: dict) -> str:
"""错误恢复工作流"""
slow_result = slow_task().result()
fragile_result = fragile_task().result()
return f"{slow_result}, {fragile_result}"
# 第一次调用会抛出错误
try:
error_recovery_workflow.invoke({"key": "value"})
except ValueError:
pass # 处理错误
# 恢复执行时,slow_task不会重新运行
result = error_recovery_workflow.invoke(None)
print(result) # 输出:任务完成, 成功
复制代码
错误恢复机制就像视频播放的断点续播,从上次中断的地方继续,节省时间和资源。
四、状态管理:记忆与解耦的艺术
4.1 短期记忆:同一会话的状态延续
短期记忆允许在同一线程 ID 的多次调用间共享状态,实现累加等功能:
python
@entrypoint(checkpointer=MemorySaver())
def accumulator_workflow(n: int, *, previous: int = None) -> int:
"""累加器工作流,记住上次结果"""
previous = previous or 0
return previous + n
# 第一次调用
result1 = accumulator_workflow.invoke(1, config={"configurable": {"thread_id": "counter"}})
print(result1) # 输出: 1
# 第二次调用,previous记住了上次的1
result2 = accumulator_workflow.invoke(2, config={"configurable": {"thread_id": "counter"}})
print(result2) # 输出: 3
复制代码
短期记忆就像计算器的 MR(记忆恢复)按钮,记住之前的计算结果,支持连续运算。
4.2 解耦返回值与保存值:灵活的状态控制
有时我们需要返回一个值给调用者,同时保存另一个值到检查点,这时可以使用entrypoint.final:
python
from langgraph.func import entrypoint, entrypoint_final
@entrypoint(checkpointer=MemorySaver())
def special_counter(n: int, *, previous: int = None) -> entrypoint_final[int, int]:
"""返回当前累加值,但保存双倍值作为下次的previous"""
previous = previous or 0
current = previous + n
# 返回current给调用者,但保存2*current到检查点
return entrypoint_final(value=current, save=2 * current)
# 第一次调用:返回0+3=3,保存6
print(special_counter.invoke(3)) # 输出: 3
# 第二次调用:previous是6,返回6+1=7,保存14
print(special_counter.invoke(1)) # 输出: 7
复制代码
这种解耦就像银行账户,我们可以展示当前余额给用户,同时保存交易记录用于后续对账,提供更灵活的状态管理。
五、人在回路:让工作流支持人工干预
函数式 API 通过interrupt函数和Command原语支持人机交互,实现需要人工审核的场景:
python
from langgraph.types import interrupt, Command
@entrypoint(checkpointer=MemorySaver())
def content_review_workflow(content: str) -> dict:
"""内容审核工作流"""
# 生成内容摘要
summary = generate_summary(content).result()
# 中断工作流,等待人工审核
review_result = interrupt({
"content": content,
"summary": summary,
"action": "请审核内容是否合规"
})
if review_result["approved"]:
return {"status": "approved", "content": content}
else:
return {"status": "rejected", "reason": review_result.get("reason")}
# 第一次调用,工作流中断并返回等待审核
initial_result = content_review_workflow.invoke("待审核内容")
print(initial_result["status"]) # 输出: waiting_for_approval
# 人工审核后,传递审核结果恢复工作流
approved_result = {"approved": True, "reason": "内容合规"}
final_result = content_review_workflow.invoke(
Command(resume=approved_result),
config={"configurable": {"thread_id": "review_thread"}}
)
print(final_result["status"]) # 输出: approved
复制代码
人在回路机制就像流水线的质量检测环节,机器处理后由人工把关,确保结果符合预期。
六、实战案例:构建有记忆的聊天机器人
下面我们通过一个完整案例,演示如何使用函数式 API 构建一个能记住对话历史的聊天机器人:
python
from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
from langchain_anthropic import ChatAnthropic
# 初始化LLM模型
model = ChatAnthropic(model="claude-3-5-sonnet-latest")
@task
def call_llm(messages: list[BaseMessage]) -> BaseMessage:
"""调用LLM模型的任务"""
return model.invoke(messages)
@entrypoint(checkpointer=MemorySaver())
def chatbot_workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage] = None) -> BaseMessage:
"""聊天机器人工作流,记住对话历史"""
# 合并历史消息和新输入
all_messages = add_messages(previous or [], inputs)
# 调用LLM生成回复
response = call_llm(all_messages).result()
# 返回回复给用户,但保存完整对话历史到检查点
return entrypoint_final(value=response, save=add_messages(all_messages, response))
# 配置工作流执行
config = {"configurable": {"thread_id": "chat_thread"}}
# 第一次对话
first_message = {"role": "user", "content": "你好!我是小明"}
for chunk in chatbot_workflow.stream([first_message], config=config):
print("机器人回复:", chunk.content) # 输出机器人的回复
# 第二次对话,机器人会记住之前的交流
second_message = {"role": "user", "content": "我刚才说过什么?"}
for chunk in chatbot_workflow.stream([second_message], config=config):
print("机器人回复:", chunk.content) # 输出包含历史信息的回复
复制代码
这个聊天机器人通过短期记忆保存对话历史,实现多轮交互,就像人类聊天时能记住之前的话题,让对话更自然流畅。
七、总结与拓展方向
函数式 API 为我们提供了一种轻量级的工作流解决方案,其核心优势在于:
非侵入性
:无需重构现有代码,几行装饰器即可添加高级功能
灵活性
:支持标准 Python 控制流,兼容函数式与图形 API
完备性
:涵盖并行执行、持久化、人机交互等复杂场景需求
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148831057
欢迎光临 AI创想 (https://www.llms-ai.com/)
Powered by Discuz! X3.4