作者:佑瞻
在开发智能应用时,我们常常面临这样的困境:如何在不改变现有代码结构的前提下,为应用添加持久化、并行执行和人机交互等高级功能?传统框架往往要求大规模重构代码,而 LangGraph 的函数式 API(Functional API)却能以最小的改动解决这些问题。今天我们就来探索这个 "代码增强器",看看如何用几行装饰器让你的应用脱胎换骨。
一、函数式 API 核心:用装饰器开启工作流革命
函数式 API 的设计哲学非常贴合实际开发需求 —— 不强迫我们改变编程习惯,而是通过@entrypoint和@task两个装饰器,像插件一样为现有函数注入强大能力。这种 "无感升级" 的体验,就像给普通轿车装上涡轮增压引擎,无需改造车身就能获得澎湃动力。
1.1 工作流的入口:@entrypoint
@entrypoint是工作流的起点,它将普通函数转换为具备持久化和中断处理能力的工作流入口:
python- from langgraph.func import entrypoint
- from langgraph.checkpoint.memory import MemorySaver
- @entrypoint(checkpointer=MemorySaver())
- def data_analysis_workflow(inputs: dict) -> dict:
- """数据分析工作流,处理输入并请求审核"""
- # 调用任务处理数据
- processed = data_processor(inputs).result()
- # 中断等待人工审核
- is_approved = interrupt({
- "data": processed,
- "action": "请审核分析结果"
- })
- return {"processed": processed, "approved": is_approved}
复制代码 这里的checkpointer参数至关重要,它负责将工作流状态保存到检查点,实现断点续传。就像我们在编辑文档时定期保存,避免意外丢失进度。
1.2 离散任务的封装:@task
@task用于定义可异步执行的任务单元,这些任务会自动支持重试、缓存等高级功能:
python- @task
- def data_processor(inputs: dict) -> dict:
- """模拟耗时的数据处理任务"""
- import time
- time.sleep(2) # 模拟API调用或复杂计算
- return {"processed": True, "result": f"分析结果: {inputs['raw_data']}"}
复制代码 任务只能在@entrypoint或其他任务内部调用,返回的 future 对象允许我们异步处理结果,就像点餐后不必守在厨房,而是可以继续做其他事情。
二、高级功能:让工作流如虎添翼
2.1 并行执行:释放 I/O 密集型任务的潜力
对于 API 调用等耗时操作,并行执行能显著提升性能。我们可以同时发起多个任务,然后等待所有结果:
python- @task
- def fetch_weather(city: str) -> str:
- """获取城市天气的任务"""
- # 模拟API调用
- import random
- return f"{city}的天气是{random.choice(['晴朗', '多云', '小雨'])}"
- @entrypoint(checkpointer=MemorySaver())
- def weather_forecast(cities: list[str]) -> list[str]:
- """获取多个城市天气预报的工作流"""
- futures = [fetch_weather(city) for city in cities]
- # 并行执行并收集结果
- return [future.result() for future in futures]
- # 调用示例
- result = weather_forecast.invoke(["北京", "上海", "广州"])
- print(result) # 输出各城市天气
复制代码 这种方式就像派多个快递员同时取件,比逐个取件效率高得多,尤其适合同时调用多个 LLM API 的场景。
2.2 混合编程:函数式与图形 API 的无缝协作
LangGraph 的函数式 API 和图形 API(StateGraph)可以在同一应用中混用,发挥各自优势:
python- from langgraph.func import entrypoint
- from langgraph.graph import StateGraph
- # 定义图形API的状态图
- builder = StateGraph()
- # 添加节点和边...
- data_graph = builder.compile()
- @entrypoint()
- def hybrid_workflow(inputs: dict) -> dict:
- """混合使用两种API的工作流"""
- # 调用图形API的状态图
- graph_result = data_graph.invoke(inputs)
- # 调用函数式API的任务
- task_result = data_processor(inputs).result()
- return {"graph": graph_result, "task": task_result}
复制代码 这种组合就像同时使用螺丝刀和扳手,根据任务特点选择最适合的工具,让开发更灵活。
2.3 流式处理:实时反馈的用户体验
函数式 API 支持与图形 API 相同的流式处理机制,让长时间运行的任务能实时反馈进度:
python- from langgraph.func import entrypoint
- from langgraph.config import get_stream_writer
- @entrypoint(checkpointer=MemorySaver())
- def file_processor(file_path: str) -> str:
- """文件处理工作流,实时反馈进度"""
- writer = get_stream_writer()
- writer("开始读取文件...")
-
- with open(file_path, "r") as f:
- content = f.read()
- writer(f"文件读取完成,共{len(content)}字节")
-
- # 模拟处理过程
- processed = content.upper()
- writer("文件处理完成")
-
- return processed
- # 流式调用
- config = {"configurable": {"thread_id": "file_process"}}
- for mode, chunk in file_processor.stream("data.txt", config=config):
- print(f"{mode}: {chunk}")
复制代码 流式处理就像直播比赛,观众不用等到结束就能看到实时进展,大大改善用户体验。
三、任务控制:重试、缓存与错误恢复
3.1 重试策略:自动处理临时故障
网络波动等临时故障可以通过重试策略自动解决,减少人工干预:
python- from langgraph.types import RetryPolicy
- # 配置重试策略:遇到ValueError时重试
- retry_policy = RetryPolicy(retry_on=ValueError, max_attempts=3)
- @task(retry=retry_policy)
- def fragile_api_call(params: dict) -> dict:
- """可能失败的API调用任务"""
- import random
- if random.random() < 0.5:
- raise ValueError("临时网络错误")
- return {"success": True, "data": "结果"}
- @entrypoint(checkpointer=MemorySaver())
- def robust_workflow(inputs: dict) -> dict:
- """包含重试机制的工作流"""
- return fragile_api_call(inputs).result()
复制代码 重试策略就像自动重试的购票软件,不必手动刷新,系统会自动尝试直到成功。
3.2 缓存任务:避免重复计算
对于频繁调用的耗时任务,缓存可以显著提升性能:
python- from langgraph.cache.memory import InMemoryCache
- from langgraph.types import CachePolicy
- @task(cache_policy=CachePolicy(ttl=60)) # 缓存60秒
- def expensive_calculation(x: int) -> int:
- """耗时的计算任务"""
- import time
- time.sleep(2)
- return x * x
- @entrypoint(cache=InMemoryCache())
- def calculation_workflow(inputs: dict) -> dict:
- """使用缓存的计算工作流"""
- # 两次调用相同参数,第二次会命中缓存
- result1 = expensive_calculation(inputs["x"]).result()
- result2 = expensive_calculation(inputs["x"]).result()
- return {"result1": result1, "result2": result2, "is_cached": result2 == result1}
复制代码 缓存就像备忘录,记住之前的计算结果,避免重复劳动,尤其适合需要多次处理相同数据的场景。
3.3 错误恢复:断点续传的实现
当工作流因错误中断后,恢复执行时无需重新运行已完成的任务:
python- @task
- def slow_task() -> str:
- """模拟耗时任务"""
- import time
- time.sleep(2)
- return "任务完成"
- @task
- def fragile_task() -> str:
- """第一次调用会失败的任务"""
- global attempt
- attempt += 1
- if attempt < 2:
- raise ValueError("模拟错误")
- return "成功"
- @entrypoint(checkpointer=MemorySaver())
- def error_recovery_workflow(inputs: dict) -> str:
- """错误恢复工作流"""
- slow_result = slow_task().result()
- fragile_result = fragile_task().result()
- return f"{slow_result}, {fragile_result}"
- # 第一次调用会抛出错误
- try:
- error_recovery_workflow.invoke({"key": "value"})
- except ValueError:
- pass # 处理错误
- # 恢复执行时,slow_task不会重新运行
- result = error_recovery_workflow.invoke(None)
- print(result) # 输出:任务完成, 成功
复制代码 错误恢复机制就像视频播放的断点续播,从上次中断的地方继续,节省时间和资源。
四、状态管理:记忆与解耦的艺术
4.1 短期记忆:同一会话的状态延续
短期记忆允许在同一线程 ID 的多次调用间共享状态,实现累加等功能:
python- @entrypoint(checkpointer=MemorySaver())
- def accumulator_workflow(n: int, *, previous: int = None) -> int:
- """累加器工作流,记住上次结果"""
- previous = previous or 0
- return previous + n
- # 第一次调用
- result1 = accumulator_workflow.invoke(1, config={"configurable": {"thread_id": "counter"}})
- print(result1) # 输出: 1
- # 第二次调用,previous记住了上次的1
- result2 = accumulator_workflow.invoke(2, config={"configurable": {"thread_id": "counter"}})
- print(result2) # 输出: 3
复制代码 短期记忆就像计算器的 MR(记忆恢复)按钮,记住之前的计算结果,支持连续运算。
4.2 解耦返回值与保存值:灵活的状态控制
有时我们需要返回一个值给调用者,同时保存另一个值到检查点,这时可以使用entrypoint.final:
python- from langgraph.func import entrypoint, entrypoint_final
- @entrypoint(checkpointer=MemorySaver())
- def special_counter(n: int, *, previous: int = None) -> entrypoint_final[int, int]:
- """返回当前累加值,但保存双倍值作为下次的previous"""
- previous = previous or 0
- current = previous + n
- # 返回current给调用者,但保存2*current到检查点
- return entrypoint_final(value=current, save=2 * current)
- # 第一次调用:返回0+3=3,保存6
- print(special_counter.invoke(3)) # 输出: 3
- # 第二次调用:previous是6,返回6+1=7,保存14
- print(special_counter.invoke(1)) # 输出: 7
复制代码 这种解耦就像银行账户,我们可以展示当前余额给用户,同时保存交易记录用于后续对账,提供更灵活的状态管理。
五、人在回路:让工作流支持人工干预
函数式 API 通过interrupt函数和Command原语支持人机交互,实现需要人工审核的场景:
python- from langgraph.types import interrupt, Command
- @entrypoint(checkpointer=MemorySaver())
- def content_review_workflow(content: str) -> dict:
- """内容审核工作流"""
- # 生成内容摘要
- summary = generate_summary(content).result()
-
- # 中断工作流,等待人工审核
- review_result = interrupt({
- "content": content,
- "summary": summary,
- "action": "请审核内容是否合规"
- })
-
- if review_result["approved"]:
- return {"status": "approved", "content": content}
- else:
- return {"status": "rejected", "reason": review_result.get("reason")}
- # 第一次调用,工作流中断并返回等待审核
- initial_result = content_review_workflow.invoke("待审核内容")
- print(initial_result["status"]) # 输出: waiting_for_approval
- # 人工审核后,传递审核结果恢复工作流
- approved_result = {"approved": True, "reason": "内容合规"}
- final_result = content_review_workflow.invoke(
- Command(resume=approved_result),
- config={"configurable": {"thread_id": "review_thread"}}
- )
- print(final_result["status"]) # 输出: approved
复制代码 人在回路机制就像流水线的质量检测环节,机器处理后由人工把关,确保结果符合预期。
六、实战案例:构建有记忆的聊天机器人
下面我们通过一个完整案例,演示如何使用函数式 API 构建一个能记住对话历史的聊天机器人:
python- from langchain_core.messages import BaseMessage
- from langgraph.graph import add_messages
- from langgraph.func import entrypoint, task
- from langgraph.checkpoint.memory import MemorySaver
- from langchain_anthropic import ChatAnthropic
- # 初始化LLM模型
- model = ChatAnthropic(model="claude-3-5-sonnet-latest")
- @task
- def call_llm(messages: list[BaseMessage]) -> BaseMessage:
- """调用LLM模型的任务"""
- return model.invoke(messages)
- @entrypoint(checkpointer=MemorySaver())
- def chatbot_workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage] = None) -> BaseMessage:
- """聊天机器人工作流,记住对话历史"""
- # 合并历史消息和新输入
- all_messages = add_messages(previous or [], inputs)
-
- # 调用LLM生成回复
- response = call_llm(all_messages).result()
-
- # 返回回复给用户,但保存完整对话历史到检查点
- return entrypoint_final(value=response, save=add_messages(all_messages, response))
- # 配置工作流执行
- config = {"configurable": {"thread_id": "chat_thread"}}
- # 第一次对话
- first_message = {"role": "user", "content": "你好!我是小明"}
- for chunk in chatbot_workflow.stream([first_message], config=config):
- print("机器人回复:", chunk.content) # 输出机器人的回复
- # 第二次对话,机器人会记住之前的交流
- second_message = {"role": "user", "content": "我刚才说过什么?"}
- for chunk in chatbot_workflow.stream([second_message], config=config):
- print("机器人回复:", chunk.content) # 输出包含历史信息的回复
复制代码 这个聊天机器人通过短期记忆保存对话历史,实现多轮交互,就像人类聊天时能记住之前的话题,让对话更自然流畅。
七、总结与拓展方向
函数式 API 为我们提供了一种轻量级的工作流解决方案,其核心优势在于:
非侵入性:无需重构现有代码,几行装饰器即可添加高级功能灵活性:支持标准 Python 控制流,兼容函数式与图形 API完备性:涵盖并行执行、持久化、人机交互等复杂场景需求
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~
原文地址:https://blog.csdn.net/The_Thieves/article/details/148831057 |