开启左侧

LangGraph 指南篇-基础控制

[复制链接]
AI小编 发表于 3 小时前 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:找了一圈尾巴
前言

       本文聚焦 LangGraph 的基础控制方法,通过详细讲解与代码示例实现,帮助读者快速掌握其基础控制的写法,为后续开发 LangGraph Agent 工作流奠定基础。
状态管理

        在 LangGraph 中,状态管理是构建复杂工作流的核心机制。节点通过返回值更新图状态,状态更新规则由 State 对象的定义决定。
一、状态定义方式

    TypedDict(适合简单场景):轻量型定义,适合结构简单、无需复杂验证的状态。
  1. # TypedDict 示例(简单场景)
  2. from typing import TypedDict
  3. class InputState(TypedDict):
  4.     user_input: str
复制代码
    BaseModel(适合复杂场景):基于 Pydantic,支持类型验证、约束定义和嵌套结构,适合复杂状态。
  1. # BaseModel 示例(复杂场景)
  2. from pydantic import BaseModel
  3. class State(BaseModel):
  4.     # 1. 工具调用相关消息列表(精确类型+长度约束)
  5.     messages: List[BaseMessage] = Field(
  6.         ...,  # 必须提供初始值,不允许为空
  7.         min_length=1,  # 至少包含1条消息(原300可能为笔误,消息数量通常不需要这么大,调整为合理值)
  8.         description="工具调用的完整消息记录,包含用户指令、工具输入/输出、系统提示等,每条消息需为BaseMessage子类(如HumanMessage、ToolMessage)"
  9.     )
  10.    
  11.     # 2. 元数据(结构化键值对,明确包含的信息)
  12.     metadata: Dict[str, Union[str, int, datetime, bool]] = Field(
  13.         default_factory=lambda: {
  14.             "session_id": "",  # 会话唯一标识
  15.             "step": 0,  # 当前流程步骤(从0开始)
  16.             "last_tool": None,  # 上一次调用的工具名称(如"search")
  17.             "is_finished": False,  # 流程是否已结束
  18.             "created_at": datetime.now()  # 状态创建时间
  19.         },
  20.         description="状态元数据,包含会话标识、流程进度、工具调用记录、时间戳等关键信息,值类型支持字符串、数字、时间、布尔值"
  21.     )
复制代码
二、状态更新机制

        节点通过返回值更新状态,LangGraph 使用 Reducer(规约器) 控制更新逻辑。
    默认 Reducer(覆盖更新)未指定 Reducer 时,节点返回的字段直接覆盖原状态对应字段。
  1. from typing_extensions import TypedDict
  2. class State(TypedDict):
  3.     foo: int
  4.     bar: list[str]
  5. # 节点函数:仅更新foo字段
  6. def node_1(state: State) -> dict:
  7.     return {"foo": 2}  # 无需返回bar,原bar值会保留
  8. # 初始状态:{"foo": 1, "bar": ["hi"]}
  9. # 节点执行后状态:{"foo": 2, "bar": ["hi"]}(foo被覆盖,bar不变)
复制代码
    内置 Reducer:通过 Annotated 指定内置 Reducer,实现复杂更新(如列表追加)。
  1. from typing import Annotated
  2. from langgraph.graph import add
  3. class State(TypedDict):
  4.     logs: Annotated[List[str], add]  # 自动追加新元素
  5. def add_log(state: State) -> dict:
  6.     return {"logs": ["step 1 completed"]}  # 追加到现有logs
  7. # 初始:{"logs": ["start"]} → 执行后:{"logs": ["start", "step 1 completed"]}
复制代码
    add_messages:消息列表专用(处理消息去重 / 更新)
  1. from typing import Annotated
  2. from langgraph.graph.message import add_messages
  3. from langchain_core.messages import AnyMessage
  4. class State(TypedDict):
  5.     messages: Annotated[List[AnyMessage], add_messages]  # 智能合并消息
  6. def add_ai_message(state: State) -> dict:
  7.     from langchain_core.messages import AIMessage
  8.     return {"messages": [AIMessage(content="Hello!")]}  # 自动追加新消息
复制代码
    自定义 Reducer:根据业务需求定义更新逻辑(如重要消息置顶)。
  1. from typing import Annotated
  2. # 自定义Reducer:带"important"标签的消息置顶
  3. def priority_adder(current: List[dict], new: List[dict]) -> List[dict]:
  4.     for item in new:
  5.         if "important" in item.get("tags", []):
  6.             current.insert(0, item)  # 置顶
  7.         else:
  8.             current.append(item)  # 普通追加
  9.     return current
  10. class State(TypedDict):
  11.     notifications: Annotated[List[dict], priority_adder]  # 使用自定义Reducer
复制代码
    MessagesState(对话场景专用)预定义状态类,内置 messages 字段和 add_messages Reducer,可直接扩展。
  1. from langgraph.graph import MessagesState
  2. class ChatState(MessagesState):
  3.     user_id: str  # 扩展字段:用户ID
  4.     topic: str = ""  # 扩展字段:对话主题
  5. # 节点:添加消息并更新主题
  6. def update_chat(state: ChatState) -> dict:
  7.     from langchain_core.messages import HumanMessage
  8.     return {
  9.         "messages": [HumanMessage(content="讨论AI应用")],
  10.         "topic": "AI应用"
  11.     }
复制代码
创建步骤序列-add_sequence

        Sequence 是 LangGraph 中最基础的工作流模式,指节点按固定顺序依次执行的线性流程。每个节点完成后,状态会传递给下一个节点,直到所有节点执行完毕。
        适用于步骤固定的场景,如:数据预处理→模型推理→结果格式化、多轮工具调用的依次执行等。
        LangGraph 有两种方式定义节点顺序:
    方式 1:手动添加节点和边(基础方式)
  1. from langgraph.graph import START, StateGraph
  2. # 1. 初始化图(绑定状态类型)
  3. builder = StateGraph(State)
  4. # 2. 添加节点(节点名默认为函数名,如"step_1")
  5. builder.add_node(step_1)  # 等价于 builder.add_node("step_1", step_1)
  6. builder.add_node(step_2)
  7. builder.add_node(step_3)
  8. # 3. 定义执行顺序:START → step_1 → step_2 → step_3
  9. builder.add_edge(START, "step_1")       # 起始点连接第一个节点
  10. builder.add_edge("step_1", "step_2")    # 节点1执行完→节点2
  11. builder.add_edge("step_2", "step_3")    # 节点2执行完→节点3
  12. # 4. 编译图(生成可执行对象)
  13. graph = builder.compile()
复制代码
    方式2:使用add_sequence简化(推荐)
  1. # 一行代码添加序列节点,自动按列表顺序连接
  2. builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
  3. builder.add_edge(START, "step_1")  # 只需指定起始点
  4. graph = builder.compile()
复制代码
分支机制(Branches)

        LangGraph 支持通过分支机制实现节点的并行执行、条件路由等复杂工作流,核心包括并行执行延迟执行条件分支三种模式。
一、并行执行节点(Parallel Execution)

        通过 “扇出(fan-out)→ 扇入(fan-in)” 机制让多个节点同时执行,提升工作流效率,适用于可并行处理的任务(如多源数据获取、并行计算等)。
    实现核心:
      扇出:从一个节点同时连接到多个节点(如 A → B 和 A → C);
      扇入:多个节点的输出汇聚到同一个后续节点(如 B → D 和 C → D);
      聚合规则:通过 reducer 定义并行节点的状态合并方式(如列表拼接)。
    关键代码示例:
  1. # 示例:A → B/C (并行) → D
  2. builder = StateGraph(State)
  3. builder.add_edge("a", "b")  # 并行分支1
  4. builder.add_edge("a", "c")  # 并行分支2
  5. builder.add_edge("b", "d")  # 汇聚点
  6. builder.add_edge("c", "d")  # 汇聚点
复制代码
二、延迟执行节点(Defer Execution)

        当分支长度不同时(如一条分支多一个节点),让后续节点等待所有前置分支完成后再执行,避免因分支进度不一致导致的状态不完整。
    实现核心:对需要等待的节点设置 defer=True,使其延迟执行,直到所有指向它的前置节点(包括长分支)都完成。
    关键代码示例:
  1. # 在并行执行示例基础上,给B添加一个子节点B_2
  2. def b_2(state: State): return {"aggregate": ["B_2"]}
  3. builder = StateGraph(State)
  4. builder.add_node(a)
  5. builder.add_node(b)
  6. builder.add_node(b_2)  # B的后续节点
  7. builder.add_node(c)
  8. builder.add_node(d, defer=True)  # D延迟执行,等待所有前置完成
  9. # 分支结构:A→B→B_2→D,A→C→D
  10. builder.add_edge(START, "a")
  11. builder.add_edge("a", "b")
  12. builder.add_edge("a", "c")
  13. builder.add_edge("b", "b_2")
  14. builder.add_edge("b_2", "d")
  15. builder.add_edge("c", "d")
  16. builder.add_edge("d", END)
  17. graph = builder.compile()
  18. graph.invoke({"aggregate": []})
  19. # 输出:['A', 'B', 'C', 'B_2', 'D'](D等待B_2完成后执行)
复制代码
三、条件分支(Conditional Branching)

        根据运行时的状态动态选择执行路径,适用于需要根据输入或中间结果决策的场景(如根据用户问题类型路由到不同处理节点)。
    实现核心:使用 add_conditional_edges 定义条件路由函数,根据状态返回下一步节点名称(可返回单个或多个节点)。
    动态路由:
  1. class State(TypedDict):
  2.     which: str  # 决策字段
  3. def conditional_edge(state: State) -> Literal["b", "c"]:
  4.     return state["which"]  # 动态路由
  5. builder.add_conditional_edges("a", conditional_edge)
复制代码
    单/多分支路由:
  1. def route_nodes(state: State) -> Sequence[str]:
  2.     if state["type"] == "complex":
  3.         return ["c", "d"]  # 多分支
  4.     return ["b"]  # 单分支
复制代码
MapReduce 模式

        在 LangGraph 中,Map-Reduce 是一种通过并行处理子任务(Map 阶段) 再聚合结果(Reduce 阶段) 的工作流模式,适用于需要批量处理多个相似任务并汇总结果的场景(如多文档摘要、多源数据整合等)。其核心通过 Send API 实现动态扇出(Fan-out)并行任务,再通过汇聚节点完成结果合并。
    代码示例
  1. import operator
  2. from typing import Annotated
  3. from langchain_openai import ChatOpenAI
  4. from typing_extensions import TypedDict
  5. from langgraph.types import Send
  6. from langgraph.graph import END, StateGraph, START
  7. from pydantic import BaseModel, Field
  8. """
  9. 生成与{topic}相关的1到3个示例的逗号分隔列表。
  10. 生成一个关于{subject}的笑话
  11. 下面是一些关于{topic}的笑话。选择最好的一个!返回最佳的ID。
  12. {jokes}
  13. """
  14. subjects_prompt = """Generate a comma separated list of between 1 and 3 examples related to: {topic}."""
  15. joke_prompt = """Generate a joke about {subject}"""
  16. best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.
  17. {jokes}"""
  18. class Subjects(BaseModel):
  19.     subjects: list[str]
  20. class Joke(BaseModel):
  21.     joke: str
  22. class BestJoke(BaseModel):
  23.     id: int = Field(description="Index of the best joke, starting with 0")
  24. from dotenv import load_dotenv
  25. load_dotenv()
  26. model = ChatOpenAI(model="gpt-4o-mini")
  27. class OverallState(TypedDict):
  28.     topic: str
  29.     subjects: list
  30.     jokes: Annotated[list, operator.add]
  31.     best_selected_joke: str
  32. class JokeState(TypedDict):
  33.     subject: str
  34. def generate_topics(state: OverallState):
  35.     prompt = subjects_prompt.format(topic=state["topic"])
  36.     response = model.with_structured_output(Subjects).invoke(prompt)
  37.     return {"subjects": response.subjects}
  38. def generate_joke(state: JokeState):
  39.     prompt = joke_prompt.format(subject=state["subject"])
  40.     response = model.with_structured_output(Joke).invoke(prompt)
  41.     return {"jokes": [response.joke]}
  42. def continue_to_jokes(state: OverallState):
  43.     return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
  44. """for循环与列表推导式 以时间换空间的方式   python没有真正意义上的多线程"""
  45. def best_joke(state: OverallState):
  46.     jokes = "\n\n".join(state["jokes"])
  47.     prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
  48.     response = model.with_structured_output(BestJoke).invoke(prompt)
  49.     return {"best_selected_joke": state["jokes"][response.id]}
  50. graph = StateGraph(OverallState)
  51. graph.add_node("generate_topics", generate_topics)
  52. graph.add_node("generate_joke", generate_joke)
  53. graph.add_node("best_joke", best_joke)
  54. graph.add_edge(START, "generate_topics")
  55. graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
  56. graph.add_edge("generate_joke", "best_joke")
  57. graph.add_edge("best_joke", END)
  58. app = graph.compile()
  59. from IPython.display import Image
  60. Image(app.get_graph().draw_mermaid_png(output_file_path='./img/示例4.png'))
  61. # Call the graph: here we call it to generate a list of jokes
  62. for s in app.stream({"topic": "animals"}):
  63.     print(s)
复制代码
两个关键阶段:
  1. def generate_joke(state: JokeState):
  2.     prompt = joke_prompt.format(subject=state["subject"])
  3.     response = model.with_structured_output(Joke).invoke(prompt)
  4.     return {"jokes": [response.joke]}
  5. def continue_to_jokes(state: OverallState):
  6.     return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
复制代码
    Map 阶段:将一个总任务拆分为多个子任务,并行执行(如为每个子项生成结果);
      generate_joke:将主任务分解为多个子任务
      continue_to_jokes:使用 Send API 动态创建并行任务

  1. class OverallState(TypedDict):
  2.     topic: str
  3.     subjects: list
  4.     jokes: Annotated[list, operator.add]
  5.     best_selected_joke: str
  6. def best_joke(state: OverallState):
  7.     jokes = "\n\n".join(state["jokes"])
  8.     prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
  9.     response = model.with_structured_output(BestJoke).invoke(prompt)
  10.     return {"best_selected_joke": state["jokes"][response.id]}
复制代码
    Reduce 阶段:收集所有子任务的结果,进行汇总、筛选或加工(如从多个结果中选最优)。
      jokes 字段:通过 operator.add 规约器实现并行结果的自动拼接,确保多个子任务的输出被合并。
      best_joke:从聚合的jokes中筛选最优结果。

循环控制机制

循环结构的三要素
  1. builder.add_edge(START, "a")           # 入口节点
  2. builder.add_conditional_edges("a", route)  # 条件分支
  3. builder.add_edge("b", "a")             # 循环回边
复制代码
    入口节点:工作流起点
    条件分支:决定继续循环或退出
    循环回边:实现节点间循环
终止条件实现
  1. def route(state: State) -> Literal["b", END]:
  2.     if len(state["aggregate"]) < 7:  # 终止条件判断
  3.         return "b"   # 继续循环
  4.     else:
  5.         return END   # 退出循环
复制代码
递归深度控制

        若终止条件可能无法满足(如逻辑漏洞),需设置递归限制(recursion limit),规定最大执行步数(超级步骤),超过则抛出异常。
  1. from langgraph.errors import GraphRecursionError
  2. try:
  3.     # 设置递归限制为4(最多执行4个超级步骤)
  4.     graph.invoke({"aggregate": []}, {"recursion_limit": 4})
  5. except GraphRecursionError:
  6.     print("Recursion Error: 超过最大执行步数")
复制代码
可视化图

        LangGraph 提供了多种方式可视化工作流图,帮助开发者调试、展示节点关系和执行流程。核心支持 Mermaid 语法生成 和 PNG 图像渲染,适用于不同场景(如文档嵌入、调试分析)。
Mermaid 语法生成

        Mermaid 是一种文本驱动的图表描述语言,可生成流程图、时序图等。LangGraph 可直接将图转换为 Mermaid 语法,便于嵌入文档或通过 Mermaid 工具渲染。
    代码示例:
  1. # 生成 Mermaid 流程图语法
  2. mermaid_code = graph.get_graph().draw_mermaid()
  3. print(mermaid_code)
复制代码
    生成示例:
  1. %%{init: {'flowchart': {'curve': 'linear'}}}%%
  2. graph TD;
  3.     __start__([<p>__start__</p>]):::first
  4.     entry_node(entry_node)
  5.     node_entry_node_A(node_entry_node_A)
  6.     __end__([<p>__end__</p>]):::last
  7.     __start__ --> entry_node;
  8.     entry_node --> __end__;
  9.     entry_node --> node_entry_node_A;
  10.     node_entry_node_A -.-> entry_node;  # 虚线表示条件边
  11.     classDef default fill:#f2f0ff,line-height:1.2
复制代码
PNG 图像渲染

        若需要图片格式(如用于报告、演示),LangGraph 支持通过多种工具生成 PNG 图像,以下是三种常用方式:
1. 基于 Mermaid.ink API

        Mermaid.ink 是一个免费的在线 Mermaid 渲染服务,LangGraph 可直接调用该 API 生成 PNG,无需安装本地工具。
    代码示例:
  1. from IPython.display import Image, display
  2. # 生成 PNG 并显示(默认使用 Mermaid.ink)
  3. png_data = graph.get_graph().draw_mermaid_png()
  4. display(Image(png_data))
复制代码
    生成示例:
LangGraph 指南篇-基础控制-1.png


2. 基于 Mermaid + Pyppeteer

        通过 Pyppeteer(Headless Chrome 工具)在本地渲染 Mermaid 语法为 PNG,支持更多自定义选项(如节点颜色、背景色)。
    安装依赖
  1. pip install pyppeteer nest-asyncio  # nest-asyncio 用于 Jupyter 环境
复制代码
    代码示例:
  1. import nest_asyncio
  2. from langchain_core.runnables.graph import CurveStyle, NodeStyles, MermaidDrawMethod
  3. from IPython.display import Image, display
  4. nest_asyncio.apply()  # Jupyter 环境需启用异步支持
  5. # 自定义样式:线条类型、节点颜色、背景等
  6. png_data = graph.get_graph().draw_mermaid_png(
  7.     curve_style=CurveStyle.LINEAR,  # 线条样式:线性
  8.     node_colors=NodeStyles(
  9.         first="#ffdfba",  # 起始节点颜色
  10.         last="#baffc9",   # 终止节点颜色
  11.         default="#fad7de" # 默认节点颜色
  12.     ),
  13.     background_color="white",  # 背景色
  14.     padding=10,  # 边距
  15.     draw_method=MermaidDrawMethod.PYPPETEER  # 使用 Pyppeteer 本地渲染
  16. )
  17. display(Image(png_data))
复制代码
    生成示例:
LangGraph 指南篇-基础控制-2.png


3.基于 Graphviz

        Graphviz 是一款强大的开源绘图工具,LangGraph 可通过 pygraphviz 库调用其生成 PNG,适合复杂图的精细化展示。
    安装依赖
      安装 Graphviz 软件(官网);
      安装 Python 依赖:

  1. pip install pygraphviz
复制代码
    代码示例:
  1. from IPython.display import Image, display
  2. try:
  3.     png_data = graph.get_graph().draw_png()  # 直接生成 PNG
  4.     display(Image(png_data))
  5. except ImportError:
  6.     print("请安装 pygraphviz 和 Graphviz 工具")
复制代码
适用场景总结

可视化方式
优点
缺点
适用场景
Mermaid 语法
轻量、跨平台、易存储
需工具渲染才能可视化
文档嵌入、版本控制、快速分享
Mermaid.ink API
零依赖、快速生成
依赖网络、样式有限
临时调试、简单展示
Mermaid + Pyppeteer
本地渲染、样式丰富
需安装依赖
自定义需求高的本地展示
Graphviz
复杂图布局优、格式多样
安装复杂
专业报告、复杂工作流展示
参考文献

Use the Graph API

原文地址:https://blog.csdn.net/weixin_41645817/article/details/149915750
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )