开启左侧

LangGraph 源码学习总结 1-Graph结构

[复制链接]
创想小编 发表于 4 小时前 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:CSDN博客
PregelProtocol、Pregel、CompiledGraph 与 CompiledStateGraph 的架构解析


  • PregelProtocol
    角色:最底层“通信协议”
    功能:
      定义了 invoke/stream/astream/get_state/update_state 等抽象接口,相当于“图必须长什么样”。本身不带任何实现,只规定“图必须能跑、能流式输出、能读写状态”。

  • Pregel
    角色:通用图执行引擎
    功能:
      完整实现了 PregelProtocol,用“channel + node”模型跑任意 DAG。节点就是普通 Runnable,边通过 Channel 隐式连接,数据以消息形式在 Channel 中流动。
    • 无状态概念,输入/输出都是 dict,内部状态只在一次 invoke 内有效。
      差异:最灵活,但写起来像“拼乐高”,需手动声明 channel 和节点。

  • CompiledGraph
    角色:把“声明式图”编译成可执行 Pregel 的中间层
    功能:
      由 Graph.compile() 返回,内部把“边+节点”翻译成 Pregel 所需的 channel 与 PregelNode。
    • 对用户仍暴露 PregelProtocol 接口(invoke/stream…),但隐藏了 channel 细节。
      差异:你不再手写 Channel,只需 .add_node / .add_edge,编译器帮你生成底层 Pregel。

  • CompiledStateGraph
    角色:带“状态模式”的 CompiledGraph
    功能:
      在 CompiledGraph 基础上额外实现“状态读写”与“状态模式校验”。要求节点函数签名 NodeFunc(State) -> State|UpdateDict,自动做状态合并/写回。
    • 支持 get_state/update_state 回溯、断点续跑、人机交互。
      差异:比 CompiledGraph 多一个“状态模式(schema)”,所有节点共享同一份结构化状态,适合多步对话、Agent 循环等场景。

一句话总结
    PregelProtocol:接口合同Pregel:最通用、无状态、手动连 channelCompiledGraph:帮你把“图”翻译成 Pregel,省掉 channel 细节CompiledStateGraph:在 CompiledGraph 之上再加“共享状态模式”,支持状态持久化与断点续跑
Channel

Channel 是“带类型的、可订阅的、可写的、可缓存的”数据槽。
    类型:声明它只能存什么(str/int/list…)。可订阅:节点用 Channel.subscribe_to("槽名") 表示“我要读这个槽”。可写:节点用 Channel.write_to("槽名") 表示“我把结果写进这个槽”。可缓存:一次 [invoke](file://src/pregel/p_1.py#19#4) 里,槽里的值会一直留着,供后续节点反复读。
传统 DAG 要显式画边:
  1. A ──edge──> B
复制代码
在 Pregel 里,边被“槽”取代:
    节点 A 把结果 write_to("x")
  • 节点 B 用 subscribe_to("x")
    于是 A→B 的依赖就“隐式”成立了——不需要在代码里写 add_edge("A","B"),只要它们读写同一个槽名即可。
Channel类

Channel 是 Pregel 框架中实现 数据流驱动执行 的核心工具类,通过声明式订阅(subscribe_to)和写入(write_to)机制,隐式连接节点(PregelNode)与数据流。
通过订阅和写入机制实现节点间松耦合通信,开发者无需手动管理数据流,只需声明依赖关系即可构建动态 DAG
  1. classChannel:@overload@classmethoddefsubscribe_to(
  2.         cls,
  3.         channels:str,*,
  4.         key:str|None=None,
  5.         tags:list[str]|None=None,)-> PregelNode:...@overload@classmethoddefsubscribe_to(
  6.         cls,
  7.         channels: Sequence[str],*,
  8.         key:None=None,
  9.         tags:list[str]|None=None,)-> PregelNode:...@classmethoddefsubscribe_to(
  10.         cls,
  11.         channels:str| Sequence[str],*,
  12.         key:str|None=None,
  13.         tags:list[str]|None=None,)-> PregelNode:"""Runs process.invoke() each time channels are updated,
  14.         with a dict of the channel values as input."""ifnotisinstance(channels,str)and key isnotNone:raise ValueError("Can't specify a key when subscribing to multiple channels")return PregelNode(
  15.             channels=cast(
  16.                 Union[list[str], Mapping[str,str]],({key: channels}ifisinstance(channels,str)and key isnotNoneelse([channels]ifisinstance(channels,str)else{chan: chan for chan in channels})),),
  17.             triggers=[channels]ifisinstance(channels,str)else channels,
  18.             tags=tags,)@classmethoddefwrite_to(
  19.         cls,*channels:str,**kwargs: WriteValue,)-> ChannelWrite:"""Writes to channels the result of the lambda, or None to skip writing."""return ChannelWrite([ChannelWriteEntry(c)for c in channels]+[(
  20.                     ChannelWriteEntry(k, mapper=v)ifcallable(v)else ChannelWriteEntry(k, value=v))for k, v in kwargs.items()])
复制代码
Channel.subscribe_to(“a”) 返回的确实是一个 PregelNode 实例,而不是“Channel”本身。
它只是用 “槽名” 去构造了一个 可运行节点(PregelNode),该节点在运行时会:

  • 从名为 “a” 的 真实 Channel 实例 里读当前值;
    把读到的值喂给后面的 Runnable(这里是 lambda x: x + x);
    最终通过 Channel.write_to(“b”) 把结果写到名为 “b” 的 Channel 里。
    所以“Channel”在这里只是 工厂类,负责生成“能读写 Channel 的节点”,真正的数据槽仍是你在 Pregel(…, channels={…}) 里声明的那些 BaseChannel 对象。
使用Pregel创建单结点图
  1. from langgraph.channels import EphemeralValue
  2. from langgraph.pregel import Pregel, Channel, ChannelWriteEntry
  3. node1 =(
  4.     Channel.subscribe_to("a")|(lambda x: x + x)| Channel.write_to("b"))
  5. app = Pregel(
  6.     nodes={"node1": node1},
  7.     channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},
  8.     input_channels=["a"],
  9.     output_channels=["b"],)
  10. result = app.invoke({"a":"foo"})print(result)
复制代码
代码解释
  1. # 初始槽状态:a="foo", b=None
  2. node1 订阅 a,读到 "foo" → 计算得 "foofoo" → 写回 b  
  3. # 槽状态:a="foo", b="foofoo"  
  4. 没有节点再订阅 b,流程结束,输出 b 的值 "foofoo"
复制代码
整个过程中,数据就像“消息”一样从槽 a 流出,经过节点加工,再流入槽 b——这就是“在 Channel 中流动”。

原文地址:https://blog.csdn.net/Revivedsun/article/details/153478122
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )