作者:CSDN博客
目录
1. 引言2. Pregel(了解)
2.1 简介
2.1.1 Actors2.1.2 Channels
2.2 Pregel API示例(这里是源代码里的示例)
2.2.1 单个节点2.2.2 多个节点2.2.3 Topic Channel2.2.4 BinaryOperatorAggregate Channel2.2.5 循环
3. 高级API
3.1 Graph API3.2 Functional API
参考
1. 引言
Pregel实现了LangGraph的运行时,管理LangGraph应用程序的执行。编译一个StateGraph或创建entrypoint都会生成一个Pregel实例,该实例可以接受输入后被调用。
2. Pregel(了解)
2.1 简介
在LangGraph中,Pregel将actors和channels合并为一个应用程序。Actors从channels读取数据并将其写入channels。Pregel将应用程序的执行组织成多个步骤,遵循Pregel算法/批量同步并行模型。
每个步骤包含三个阶段:
1. 计划:确定本步骤中需要执行的actors。例如,在第一步中选择订阅特定输入channel的actors,在后续步骤中选择订阅上一步更新channels的actors。
2. 执行:并行执行所有选定的actors,直到所有actors完成、一个actor失败或达到超时为止。在该阶段,channels更新对actors是不可见的,直到下一步。
3. 更新:使用actors在本步骤写入的值更新channels。
重复执行,直到没有actors被选中执行,或者达到最大步骤数。
2.1.1 Actors
一个actor是一个PregelNode。它订阅channels,从channels中读取数据,并将数据写入到channels。可以将其视为Pregel算法中的actor。PregelNode实现了LangChain的Runnable接口。
2.1.2 Channels
channels用于在actors之间进行通信。每个channel都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。channels可用于从一个链向另一个链发送数据,或从一个链在未来步骤中向自身发送数据。LangGraph提供了多种内置channels:
1. LastValue:默认channel,存储发送到channel的最后一个值,适用于输入和输出值,或用于从一步向下一步发送数据。
2. Topic:一个可配置的PubSub主题,适用于在actors之间发送多个值,或用于累积输出。可以配置为去重值或在多个步骤期间累积值。
3. BinaryOperatorAggregate:存储一个持久值,通过将二进制运算符应用于当前值和发送到channel的每个更新来更新值,适用于在多个步骤中计算聚合。例如:total = BinaryOperatorAggregate(int, operator.add)
2.2 Pregel API示例(这里是源代码里的示例)
2.2.1 单个节点
- from langgraph.channels import EphemeralValue
- from langgraph.pregel import Pregel, Channel
- node1 =(
- Channel.subscribe_to("a")|(lambda x: x + x)| Channel.write_to("b"))
- app = Pregel(
- nodes={"node1": node1},
- channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},
- input_channels=["a"],
- output_channels=["b"],)print(app.invoke({"a":"foo"}))
复制代码 运行结果为:2.2.2 多个节点
- from langgraph.channels import LastValue, EphemeralValue
- from langgraph.pregel import Pregel, Channel
- node1 =(
- Channel.subscribe_to("a")|(lambda x: x + x)| Channel.write_to("b"))
- node2 =(
- Channel.subscribe_to("b")|(lambda x: x + x)| Channel.write_to("c"))
- app = Pregel(
- nodes={"node1": node1,"node2": node2},
- channels={"a": EphemeralValue(str),"b": LastValue(str),"c": EphemeralValue(str),},
- input_channels=["a"],
- output_channels=["b","c"],)
- app.invoke({"a":"foo"})
复制代码 运行结果为:- {'b': 'foofoo', 'c': 'foofoofoofoo'}
复制代码 2.2.3 Topic Channel
- from langgraph.channels import EphemeralValue, Topic
- from langgraph.pregel import Pregel, Channel
- node1 =(
- Channel.subscribe_to("a")|(lambda x: x + x)|{"b": Channel.write_to("b"),"c": Channel.write_to("c")})
- node2 =(
- Channel.subscribe_to("b")|(lambda x: x + x)|{"c": Channel.write_to("c"),})
- app = Pregel(
- nodes={"node1": node1,"node2": node2},
- channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": Topic(str, accumulate=True),},
- input_channels=["a"],
- output_channels=["c"],)
- app.invoke({"a":"foo"})
复制代码 运行结果为:- {'c': ['foofoo', 'foofoofoofoo']}
复制代码 2.2.4 BinaryOperatorAggregate Channel
- from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
- from langgraph.pregel import Pregel, Channel
- node1 =(
- Channel.subscribe_to("a")|(lambda x: x + x)|{"b": Channel.write_to("b"),"c": Channel.write_to("c")})
- node2 =(
- Channel.subscribe_to("b")|(lambda x: x + x)|{"c": Channel.write_to("c"),})defreducer(current, update):if current:return current +" | "+ update
- else:return update
- app = Pregel(
- nodes={"node1": node1,"node2": node2},
- channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": BinaryOperatorAggregate(str, operator=reducer),},
- input_channels=["a"],
- output_channels=["c"])
- app.invoke({"a":"foo"})
复制代码 运行结果为- {'c': 'foofoo | foofoofoofoo'}
复制代码 2.2.5 循环
- from langgraph.channels import EphemeralValue
- from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntry
- example_node =(
- Channel.subscribe_to("value")|(lambda x: x + x iflen(x)<10elseNone)| ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)]))
- app = Pregel(
- nodes={"example_node": example_node},
- channels={"value": EphemeralValue(str),},
- input_channels=["value"],
- output_channels=["value"])
- app.invoke({"value":"a"})
复制代码 运行结果为:- {'value': 'aaaaaaaaaaaaaaaa'}
复制代码 3. 高级API
3.1 Graph API
StateGraph是一种高级抽象,简化了Pregel应用程序的创建。它允许你定义一个由节点和边组成的图。当你编译图时,StateGraph API会自动为你创建Pregel应用程序。- from typing import TypedDict, Optional
- from langgraph.constants import START
- from langgraph.graph import StateGraph
- classEssay(TypedDict):
- topic:str
- content: Optional[str]
- score: Optional[float]defwrite_essay(essay: Essay):return{"content":f"Essay about {essay['topic']}",}defscore_essay(essay: Essay):return{"score":10}
- builder = StateGraph(Essay)
- builder.add_node(write_essay)
- builder.add_node(score_essay)
- builder.add_edge(START,"write_essay")
- graph = builder.compile()print("Nodes:")print(graph.nodes)print("Edges:")print(graph.channels)
复制代码 运行结果:- Nodes:
- {'__start__': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA000>, 'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA1E0>, 'score_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA360>}
- Edges:
- {'topic': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E0D40>, 'content': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3EC0>, 'score': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3B80>, '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE4D0D40>, 'branch:to:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50ECC0>, 'branch:to:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50C7C0>}
复制代码 3.2 Functional API
在Functional API中,你可以使用entrypoint来创建一个Pregel应用程序。entrypoint装饰器允许你定义一个接受输入并返回输出的函数。- from typing import TypedDict, Optional
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.func import entrypoint
- classEssay(TypedDict):
- topic:str
- content: Optional[str]
- score: Optional[float]
- checkpointer = InMemorySaver()@entrypoint(checkpointer=checkpointer)defwrite_essay(essay: Essay):return{"content":f"Essay about {essay['topic']}",}print("Nodes: ")print(write_essay.nodes)print("Channels: ")print(write_essay.channels)
复制代码 运行结果为:- Nodes:
- {'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA5A0>}
- Channels:
- {'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE532140>, '__end__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE310680>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE5B31C0>}
复制代码 参考
https://langchain-ai.github.io/langgraph/concepts/low_level/
原文地址:https://blog.csdn.net/qq_51180928/article/details/148366919 |