开启左侧

LangGraph 函数式 API 实战:用极简方式构建强大工作流系统

[复制链接]
AI小编 发表于 昨天 22:48 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:佑瞻
在开发智能应用时,我们常常面临这样的困境:如何在不改变现有代码结构的前提下,为应用添加持久化、并行执行和人机交互等高级功能?传统框架往往要求大规模重构代码,而 LangGraph 的函数式 API(Functional API)却能以最小的改动解决这些问题。今天我们就来探索这个 "代码增强器",看看如何用几行装饰器让你的应用脱胎换骨。
一、函数式 API 核心:用装饰器开启工作流革命

函数式 API 的设计哲学非常贴合实际开发需求 —— 不强迫我们改变编程习惯,而是通过@entrypoint和@task两个装饰器,像插件一样为现有函数注入强大能力。这种 "无感升级" 的体验,就像给普通轿车装上涡轮增压引擎,无需改造车身就能获得澎湃动力。
1.1 工作流的入口:@entrypoint

@entrypoint是工作流的起点,它将普通函数转换为具备持久化和中断处理能力的工作流入口:
python
  1. from langgraph.func import entrypoint
  2. from langgraph.checkpoint.memory import MemorySaver
  3. @entrypoint(checkpointer=MemorySaver())
  4. def data_analysis_workflow(inputs: dict) -> dict:
  5.     """数据分析工作流,处理输入并请求审核"""
  6.     # 调用任务处理数据
  7.     processed = data_processor(inputs).result()
  8.     # 中断等待人工审核
  9.     is_approved = interrupt({
  10.         "data": processed,
  11.         "action": "请审核分析结果"
  12.     })
  13.     return {"processed": processed, "approved": is_approved}
复制代码
这里的checkpointer参数至关重要,它负责将工作流状态保存到检查点,实现断点续传。就像我们在编辑文档时定期保存,避免意外丢失进度。
1.2 离散任务的封装:@task

@task用于定义可异步执行的任务单元,这些任务会自动支持重试、缓存等高级功能:
python
  1. @task
  2. def data_processor(inputs: dict) -> dict:
  3.     """模拟耗时的数据处理任务"""
  4.     import time
  5.     time.sleep(2)  # 模拟API调用或复杂计算
  6.     return {"processed": True, "result": f"分析结果: {inputs['raw_data']}"}
复制代码
任务只能在@entrypoint或其他任务内部调用,返回的 future 对象允许我们异步处理结果,就像点餐后不必守在厨房,而是可以继续做其他事情。
二、高级功能:让工作流如虎添翼

2.1 并行执行:释放 I/O 密集型任务的潜力

对于 API 调用等耗时操作,并行执行能显著提升性能。我们可以同时发起多个任务,然后等待所有结果:
python
  1. @task
  2. def fetch_weather(city: str) -> str:
  3.     """获取城市天气的任务"""
  4.     # 模拟API调用
  5.     import random
  6.     return f"{city}的天气是{random.choice(['晴朗', '多云', '小雨'])}"
  7. @entrypoint(checkpointer=MemorySaver())
  8. def weather_forecast(cities: list[str]) -> list[str]:
  9.     """获取多个城市天气预报的工作流"""
  10.     futures = [fetch_weather(city) for city in cities]
  11.     # 并行执行并收集结果
  12.     return [future.result() for future in futures]
  13. # 调用示例
  14. result = weather_forecast.invoke(["北京", "上海", "广州"])
  15. print(result)  # 输出各城市天气
复制代码
这种方式就像派多个快递员同时取件,比逐个取件效率高得多,尤其适合同时调用多个 LLM API 的场景。
2.2 混合编程:函数式与图形 API 的无缝协作

LangGraph 的函数式 API 和图形 API(StateGraph)可以在同一应用中混用,发挥各自优势:
python
  1. from langgraph.func import entrypoint
  2. from langgraph.graph import StateGraph
  3. # 定义图形API的状态图
  4. builder = StateGraph()
  5. # 添加节点和边...
  6. data_graph = builder.compile()
  7. @entrypoint()
  8. def hybrid_workflow(inputs: dict) -> dict:
  9.     """混合使用两种API的工作流"""
  10.     # 调用图形API的状态图
  11.     graph_result = data_graph.invoke(inputs)
  12.     # 调用函数式API的任务
  13.     task_result = data_processor(inputs).result()
  14.     return {"graph": graph_result, "task": task_result}
复制代码
这种组合就像同时使用螺丝刀和扳手,根据任务特点选择最适合的工具,让开发更灵活。
2.3 流式处理:实时反馈的用户体验

函数式 API 支持与图形 API 相同的流式处理机制,让长时间运行的任务能实时反馈进度:
python
  1. from langgraph.func import entrypoint
  2. from langgraph.config import get_stream_writer
  3. @entrypoint(checkpointer=MemorySaver())
  4. def file_processor(file_path: str) -> str:
  5.     """文件处理工作流,实时反馈进度"""
  6.     writer = get_stream_writer()
  7.     writer("开始读取文件...")
  8.    
  9.     with open(file_path, "r") as f:
  10.         content = f.read()
  11.         writer(f"文件读取完成,共{len(content)}字节")
  12.    
  13.     # 模拟处理过程
  14.     processed = content.upper()
  15.     writer("文件处理完成")
  16.    
  17.     return processed
  18. # 流式调用
  19. config = {"configurable": {"thread_id": "file_process"}}
  20. for mode, chunk in file_processor.stream("data.txt", config=config):
  21.     print(f"{mode}: {chunk}")
复制代码
流式处理就像直播比赛,观众不用等到结束就能看到实时进展,大大改善用户体验。
三、任务控制:重试、缓存与错误恢复

3.1 重试策略:自动处理临时故障

网络波动等临时故障可以通过重试策略自动解决,减少人工干预:
python
  1. from langgraph.types import RetryPolicy
  2. # 配置重试策略:遇到ValueError时重试
  3. retry_policy = RetryPolicy(retry_on=ValueError, max_attempts=3)
  4. @task(retry=retry_policy)
  5. def fragile_api_call(params: dict) -> dict:
  6.     """可能失败的API调用任务"""
  7.     import random
  8.     if random.random() < 0.5:
  9.         raise ValueError("临时网络错误")
  10.     return {"success": True, "data": "结果"}
  11. @entrypoint(checkpointer=MemorySaver())
  12. def robust_workflow(inputs: dict) -> dict:
  13.     """包含重试机制的工作流"""
  14.     return fragile_api_call(inputs).result()
复制代码
重试策略就像自动重试的购票软件,不必手动刷新,系统会自动尝试直到成功。
3.2 缓存任务:避免重复计算

对于频繁调用的耗时任务,缓存可以显著提升性能:
python
  1. from langgraph.cache.memory import InMemoryCache
  2. from langgraph.types import CachePolicy
  3. @task(cache_policy=CachePolicy(ttl=60))  # 缓存60秒
  4. def expensive_calculation(x: int) -> int:
  5.     """耗时的计算任务"""
  6.     import time
  7.     time.sleep(2)
  8.     return x * x
  9. @entrypoint(cache=InMemoryCache())
  10. def calculation_workflow(inputs: dict) -> dict:
  11.     """使用缓存的计算工作流"""
  12.     # 两次调用相同参数,第二次会命中缓存
  13.     result1 = expensive_calculation(inputs["x"]).result()
  14.     result2 = expensive_calculation(inputs["x"]).result()
  15.     return {"result1": result1, "result2": result2, "is_cached": result2 == result1}
复制代码
缓存就像备忘录,记住之前的计算结果,避免重复劳动,尤其适合需要多次处理相同数据的场景。
3.3 错误恢复:断点续传的实现

当工作流因错误中断后,恢复执行时无需重新运行已完成的任务:
python
  1. @task
  2. def slow_task() -> str:
  3.     """模拟耗时任务"""
  4.     import time
  5.     time.sleep(2)
  6.     return "任务完成"
  7. @task
  8. def fragile_task() -> str:
  9.     """第一次调用会失败的任务"""
  10.     global attempt
  11.     attempt += 1
  12.     if attempt < 2:
  13.         raise ValueError("模拟错误")
  14.     return "成功"
  15. @entrypoint(checkpointer=MemorySaver())
  16. def error_recovery_workflow(inputs: dict) -> str:
  17.     """错误恢复工作流"""
  18.     slow_result = slow_task().result()
  19.     fragile_result = fragile_task().result()
  20.     return f"{slow_result}, {fragile_result}"
  21. # 第一次调用会抛出错误
  22. try:
  23.     error_recovery_workflow.invoke({"key": "value"})
  24. except ValueError:
  25.     pass  # 处理错误
  26. # 恢复执行时,slow_task不会重新运行
  27. result = error_recovery_workflow.invoke(None)
  28. print(result)  # 输出:任务完成, 成功
复制代码
错误恢复机制就像视频播放的断点续播,从上次中断的地方继续,节省时间和资源。
四、状态管理:记忆与解耦的艺术

4.1 短期记忆:同一会话的状态延续

短期记忆允许在同一线程 ID 的多次调用间共享状态,实现累加等功能:
python
  1. @entrypoint(checkpointer=MemorySaver())
  2. def accumulator_workflow(n: int, *, previous: int = None) -> int:
  3.     """累加器工作流,记住上次结果"""
  4.     previous = previous or 0
  5.     return previous + n
  6. # 第一次调用
  7. result1 = accumulator_workflow.invoke(1, config={"configurable": {"thread_id": "counter"}})
  8. print(result1)  # 输出: 1
  9. # 第二次调用,previous记住了上次的1
  10. result2 = accumulator_workflow.invoke(2, config={"configurable": {"thread_id": "counter"}})
  11. print(result2)  # 输出: 3
复制代码
短期记忆就像计算器的 MR(记忆恢复)按钮,记住之前的计算结果,支持连续运算。
4.2 解耦返回值与保存值:灵活的状态控制

有时我们需要返回一个值给调用者,同时保存另一个值到检查点,这时可以使用entrypoint.final:
python
  1. from langgraph.func import entrypoint, entrypoint_final
  2. @entrypoint(checkpointer=MemorySaver())
  3. def special_counter(n: int, *, previous: int = None) -> entrypoint_final[int, int]:
  4.     """返回当前累加值,但保存双倍值作为下次的previous"""
  5.     previous = previous or 0
  6.     current = previous + n
  7.     # 返回current给调用者,但保存2*current到检查点
  8.     return entrypoint_final(value=current, save=2 * current)
  9. # 第一次调用:返回0+3=3,保存6
  10. print(special_counter.invoke(3))  # 输出: 3
  11. # 第二次调用:previous是6,返回6+1=7,保存14
  12. print(special_counter.invoke(1))  # 输出: 7
复制代码
这种解耦就像银行账户,我们可以展示当前余额给用户,同时保存交易记录用于后续对账,提供更灵活的状态管理。
五、人在回路:让工作流支持人工干预

函数式 API 通过interrupt函数和Command原语支持人机交互,实现需要人工审核的场景:
python
  1. from langgraph.types import interrupt, Command
  2. @entrypoint(checkpointer=MemorySaver())
  3. def content_review_workflow(content: str) -> dict:
  4.     """内容审核工作流"""
  5.     # 生成内容摘要
  6.     summary = generate_summary(content).result()
  7.    
  8.     # 中断工作流,等待人工审核
  9.     review_result = interrupt({
  10.         "content": content,
  11.         "summary": summary,
  12.         "action": "请审核内容是否合规"
  13.     })
  14.    
  15.     if review_result["approved"]:
  16.         return {"status": "approved", "content": content}
  17.     else:
  18.         return {"status": "rejected", "reason": review_result.get("reason")}
  19. # 第一次调用,工作流中断并返回等待审核
  20. initial_result = content_review_workflow.invoke("待审核内容")
  21. print(initial_result["status"])  # 输出: waiting_for_approval
  22. # 人工审核后,传递审核结果恢复工作流
  23. approved_result = {"approved": True, "reason": "内容合规"}
  24. final_result = content_review_workflow.invoke(
  25.     Command(resume=approved_result),
  26.     config={"configurable": {"thread_id": "review_thread"}}
  27. )
  28. print(final_result["status"])  # 输出: approved
复制代码
人在回路机制就像流水线的质量检测环节,机器处理后由人工把关,确保结果符合预期。
六、实战案例:构建有记忆的聊天机器人

下面我们通过一个完整案例,演示如何使用函数式 API 构建一个能记住对话历史的聊天机器人:
python
  1. from langchain_core.messages import BaseMessage
  2. from langgraph.graph import add_messages
  3. from langgraph.func import entrypoint, task
  4. from langgraph.checkpoint.memory import MemorySaver
  5. from langchain_anthropic import ChatAnthropic
  6. # 初始化LLM模型
  7. model = ChatAnthropic(model="claude-3-5-sonnet-latest")
  8. @task
  9. def call_llm(messages: list[BaseMessage]) -> BaseMessage:
  10.     """调用LLM模型的任务"""
  11.     return model.invoke(messages)
  12. @entrypoint(checkpointer=MemorySaver())
  13. def chatbot_workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage] = None) -> BaseMessage:
  14.     """聊天机器人工作流,记住对话历史"""
  15.     # 合并历史消息和新输入
  16.     all_messages = add_messages(previous or [], inputs)
  17.    
  18.     # 调用LLM生成回复
  19.     response = call_llm(all_messages).result()
  20.    
  21.     # 返回回复给用户,但保存完整对话历史到检查点
  22.     return entrypoint_final(value=response, save=add_messages(all_messages, response))
  23. # 配置工作流执行
  24. config = {"configurable": {"thread_id": "chat_thread"}}
  25. # 第一次对话
  26. first_message = {"role": "user", "content": "你好!我是小明"}
  27. for chunk in chatbot_workflow.stream([first_message], config=config):
  28.     print("机器人回复:", chunk.content)  # 输出机器人的回复
  29. # 第二次对话,机器人会记住之前的交流
  30. second_message = {"role": "user", "content": "我刚才说过什么?"}
  31. for chunk in chatbot_workflow.stream([second_message], config=config):
  32.     print("机器人回复:", chunk.content)  # 输出包含历史信息的回复
复制代码
这个聊天机器人通过短期记忆保存对话历史,实现多轮交互,就像人类聊天时能记住之前的话题,让对话更自然流畅。
七、总结与拓展方向

函数式 API 为我们提供了一种轻量级的工作流解决方案,其核心优势在于:
    非侵入性:无需重构现有代码,几行装饰器即可添加高级功能灵活性:支持标准 Python 控制流,兼容函数式与图形 API完备性:涵盖并行执行、持久化、人机交互等复杂场景需求
如果本文对你有帮助,别忘了点赞收藏,关注我,一起探索更高效的开发方式~

原文地址:https://blog.csdn.net/The_Thieves/article/details/148831057
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )