AI创想

标题: LangGraph(八)——LangGraph运行时 [打印本页]

作者: AI小编    时间: 昨天 23:00
标题: LangGraph(八)——LangGraph运行时
作者:CSDN博客
目录


1. 引言

  Pregel实现了LangGraph的运行时,管理LangGraph应用程序的执行。编译一个StateGraph或创建entrypoint都会生成一个Pregel实例,该实例可以接受输入后被调用。
2. Pregel(了解

2.1 简介

  在LangGraph中,Pregel将actors和channels合并为一个应用程序。Actors从channels读取数据并将其写入channels。Pregel将应用程序的执行组织成多个步骤,遵循Pregel算法/批量同步并行模型。
  每个步骤包含三个阶段:
  1. 计划:确定本步骤中需要执行的actors。例如,在第一步中选择订阅特定输入channel的actors,在后续步骤中选择订阅上一步更新channels的actors。
  2. 执行:并行执行所有选定的actors,直到所有actors完成、一个actor失败或达到超时为止。在该阶段,channels更新对actors是不可见的,直到下一步。
  3. 更新:使用actors在本步骤写入的值更新channels。
  重复执行,直到没有actors被选中执行,或者达到最大步骤数。
2.1.1 Actors

  一个actor是一个PregelNode。它订阅channels,从channels中读取数据,并将数据写入到channels。可以将其视为Pregel算法中的actor。PregelNode实现了LangChain的Runnable接口。
2.1.2 Channels

  channels用于在actors之间进行通信。每个channel都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。channels可用于从一个链向另一个链发送数据,或从一个链在未来步骤中向自身发送数据。LangGraph提供了多种内置channels:
  1. LastValue:默认channel,存储发送到channel的最后一个值,适用于输入和输出值,或用于从一步向下一步发送数据。
  2. Topic:一个可配置的PubSub主题,适用于在actors之间发送多个值,或用于累积输出。可以配置为去重值或在多个步骤期间累积值。
  3. BinaryOperatorAggregate:存储一个持久值,通过将二进制运算符应用于当前值和发送到channel的每个更新来更新值,适用于在多个步骤中计算聚合。例如:total = BinaryOperatorAggregate(int, operator.add)
2.2 Pregel API示例(这里是源代码里的示例)

2.2.1 单个节点
  1. from langgraph.channels import EphemeralValue
  2. from langgraph.pregel import Pregel, Channel
  3. node1 =(
  4.         Channel.subscribe_to("a")|(lambda x: x + x)| Channel.write_to("b"))
  5. app = Pregel(
  6.     nodes={"node1": node1},
  7.     channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},
  8.     input_channels=["a"],
  9.     output_channels=["b"],)print(app.invoke({"a":"foo"}))
复制代码
  运行结果为:
  1. {'b': 'foofoo'}
复制代码
2.2.2 多个节点
  1. from langgraph.channels import LastValue, EphemeralValue
  2. from langgraph.pregel import Pregel, Channel
  3. node1 =(
  4.         Channel.subscribe_to("a")|(lambda x: x + x)| Channel.write_to("b"))
  5. node2 =(
  6.         Channel.subscribe_to("b")|(lambda x: x + x)| Channel.write_to("c"))
  7. app = Pregel(
  8.     nodes={"node1": node1,"node2": node2},
  9.     channels={"a": EphemeralValue(str),"b": LastValue(str),"c": EphemeralValue(str),},
  10.     input_channels=["a"],
  11.     output_channels=["b","c"],)
  12. app.invoke({"a":"foo"})
复制代码
  运行结果为:
  1. {'b': 'foofoo', 'c': 'foofoofoofoo'}
复制代码
2.2.3 Topic Channel
  1. from langgraph.channels import EphemeralValue, Topic
  2. from langgraph.pregel import Pregel, Channel
  3. node1 =(
  4.         Channel.subscribe_to("a")|(lambda x: x + x)|{"b": Channel.write_to("b"),"c": Channel.write_to("c")})
  5. node2 =(
  6.         Channel.subscribe_to("b")|(lambda x: x + x)|{"c": Channel.write_to("c"),})
  7. app = Pregel(
  8.     nodes={"node1": node1,"node2": node2},
  9.     channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": Topic(str, accumulate=True),},
  10.     input_channels=["a"],
  11.     output_channels=["c"],)
  12. app.invoke({"a":"foo"})
复制代码
  运行结果为:
  1. {'c': ['foofoo', 'foofoofoofoo']}
复制代码
2.2.4 BinaryOperatorAggregate Channel
  1. from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
  2. from langgraph.pregel import Pregel, Channel
  3. node1 =(
  4.         Channel.subscribe_to("a")|(lambda x: x + x)|{"b": Channel.write_to("b"),"c": Channel.write_to("c")})
  5. node2 =(
  6.         Channel.subscribe_to("b")|(lambda x: x + x)|{"c": Channel.write_to("c"),})defreducer(current, update):if current:return current +" | "+ update
  7.     else:return update
  8. app = Pregel(
  9.     nodes={"node1": node1,"node2": node2},
  10.     channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": BinaryOperatorAggregate(str, operator=reducer),},
  11.     input_channels=["a"],
  12.     output_channels=["c"])
  13. app.invoke({"a":"foo"})
复制代码
  运行结果为
  1. {'c': 'foofoo | foofoofoofoo'}
复制代码
2.2.5 循环
  1. from langgraph.channels import EphemeralValue
  2. from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntry
  3. example_node =(
  4.         Channel.subscribe_to("value")|(lambda x: x + x iflen(x)<10elseNone)| ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)]))
  5. app = Pregel(
  6.     nodes={"example_node": example_node},
  7.     channels={"value": EphemeralValue(str),},
  8.     input_channels=["value"],
  9.     output_channels=["value"])
  10. app.invoke({"value":"a"})
复制代码
  运行结果为:
  1. {'value': 'aaaaaaaaaaaaaaaa'}
复制代码
3. 高级API

3.1 Graph API

  StateGraph是一种高级抽象,简化了Pregel应用程序的创建。它允许你定义一个由节点和边组成的图。当你编译图时,StateGraph API会自动为你创建Pregel应用程序。
  1. from typing import TypedDict, Optional
  2. from langgraph.constants import START
  3. from langgraph.graph import StateGraph
  4. classEssay(TypedDict):
  5.     topic:str
  6.     content: Optional[str]
  7.     score: Optional[float]defwrite_essay(essay: Essay):return{"content":f"Essay about {essay['topic']}",}defscore_essay(essay: Essay):return{"score":10}
  8. builder = StateGraph(Essay)
  9. builder.add_node(write_essay)
  10. builder.add_node(score_essay)
  11. builder.add_edge(START,"write_essay")
  12. graph = builder.compile()print("Nodes:")print(graph.nodes)print("Edges:")print(graph.channels)
复制代码
  运行结果:
  1. Nodes:
  2. {'__start__': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA000>, 'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA1E0>, 'score_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA360>}
  3. Edges:
  4. {'topic': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E0D40>, 'content': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3EC0>, 'score': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3B80>, '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE4D0D40>, 'branch:to:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50ECC0>, 'branch:to:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50C7C0>}
复制代码
3.2 Functional API

  在Functional API中,你可以使用entrypoint来创建一个Pregel应用程序。entrypoint装饰器允许你定义一个接受输入并返回输出的函数。
  1. from typing import TypedDict, Optional
  2. from langgraph.checkpoint.memory import InMemorySaver
  3. from langgraph.func import entrypoint
  4. classEssay(TypedDict):
  5.     topic:str
  6.     content: Optional[str]
  7.     score: Optional[float]
  8. checkpointer = InMemorySaver()@entrypoint(checkpointer=checkpointer)defwrite_essay(essay: Essay):return{"content":f"Essay about {essay['topic']}",}print("Nodes: ")print(write_essay.nodes)print("Channels: ")print(write_essay.channels)
复制代码
  运行结果为:
  1. Nodes:
  2. {'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA5A0>}
  3. Channels:
  4. {'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE532140>, '__end__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE310680>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE5B31C0>}
复制代码
参考

https://langchain-ai.github.io/langgraph/concepts/low_level/

原文地址:https://blog.csdn.net/qq_51180928/article/details/148366919




欢迎光临 AI创想 (https://www.llms-ai.com/) Powered by Discuz! X3.4