AI创想

标题: LangGraph认知篇-Send机制 [打印本页]

作者: AI小编    时间: 昨天 22:43
标题: LangGraph认知篇-Send机制
作者:找了一圈尾巴
Send 机制简介

        在LangGraph 中,节点(Nodes)和边(Edges)默认是预先定义的,且基于同一个共享状态(state)进行操作。但在某些场景下,例如实现映射-规约(map-reduce)等设计模式时,可能存在以下需求:
        为满足上述需求,LangGraph 提供了 Send 机制,其核心功能是:通过条件边(conditional edges)动态生成下游节点的调用指令,实现状态的按需分发和节点的动态触发
Send机制与静态图结构的区别

特性静态图Send机制
分支数量预先定义运行时动态生成
状态传递所有节点共享同一状态各分支拥有独立状态
执行模式顺序/固定路径执行并行/动态路径执行
适用场景固定拓扑结构数据驱动的动态拓扑
Send 对象的核心属性

工作机制图解

(, 下载次数: 0)


官方代码示例
  1. from typing import Annotated, TypedDict
  2. import operator
  3. from langgraph.types import Send
  4. from langgraph.graph import StateGraph, END, START
  5. # 1. 定义整体状态结构
  6. class OverallState(TypedDict):
  7.     subjects: list[str]  # 存储需要生成笑话的主题列表
  8.     # 存储生成的笑话列表,使用operator.add指定合并方式(列表元素累加)
  9.     jokes: Annotated[list[str], operator.add]
  10. # 2. 定义条件边处理函数:动态生成Send对象
  11. def continue_to_jokes(state: OverallState):
  12.     # 为每个主题创建一个Send对象,指定调用"generate_joke"节点并传递该主题
  13.     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
  14. # 3. 构建状态图工作流
  15. builder = StateGraph(OverallState)
  16. # 4. 添加"generate_joke"节点:根据传入的主题生成笑话
  17. # 该节点接收包含"subject"的状态,返回包含笑话的字典(会合并到整体状态的jokes列表中)
  18. builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
  19. # 5. 配置起始节点的条件边:通过continue_to_jokes函数动态决定下一步
  20. builder.add_conditional_edges(START, continue_to_jokes)
  21. # 6. 配置"generate_joke"节点的后续节点:生成笑话后到达结束节点
  22. builder.add_edge("generate_joke", END)
  23. # 7. 编译工作流图
  24. graph = builder.compile()
  25. # 8. 执行工作流:输入包含两个主题的状态
  26. result = graph.invoke({"subjects": ["cats", "dogs"]})
  27. print(result)
  28. # 输出:{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
复制代码
Map-reduce 设计模式简介

        映射 - 归约(Map-Reduce)是一种用于大规模数据处理的分布式计算设计模式,由 Google 提出,旨在通过拆分任务、并行处理和结果聚合,高效处理海量数据。其核心思想是将复杂任务分解为两个主要阶段:映射(Map) 和归约(Reduce),从而实现数据的分布式处理和结果的高效汇总。
核心思想与流程

(, 下载次数: 0)


        Map-Reduce 的工作流程可概括为 “拆分 - 处理 - 合并”,具体分为三个关键步骤:
Map-reduce 与 LangGraph 中 Send 机制的结合

        在 LangGraph 工作流框架中,Map-Reduce 模式可通过 Send 机制实现:
        这种结合既保留了 Map-Reduce 的并行处理能力,又通过动态节点调用适配了更灵活的工作流场景。
LangGraph 实现Map-Reduce 示例
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph, Send, END
  3. # 状态定义
  4. class MapReduceState(TypedDict):
  5.     documents: list[str]        # 初始文档
  6.     chunks: list[str]           # 文档分块
  7.     summaries: list[str]        # 分块摘要
  8.     final_summary: str          # 最终摘要
  9. # 1. 文档分块(初始节点)
  10. def split_documents(state: MapReduceState):
  11.     documents = state["documents"]
  12.     chunks = [chunk for doc in documents for chunk in split_text(doc)]
  13.     return {"chunks": chunks}
  14. # 2. 动态路由(Send核心)
  15. def map_router(state: MapReduceState):
  16.     return [
  17.         Send("summarize_chunk", {"chunk": c})
  18.         for c in state["chunks"]
  19.     ]
  20. # 3. 摘要生成(并行执行)
  21. def summarize_chunk(state: MapReduceState):
  22.     return {"summary": llm(f"Summarize: {state['chunk']}")}
  23. # 4. 结果聚合
  24. def reduce_summaries(state: MapReduceState):
  25.     all_summaries = [s for branch in state.values() if "summary" in s]
  26.     return {"final_summary": combine_summaries(all_summaries)}
  27. # 构建工作流
  28. graph = StateGraph(MapReduceState)
  29. graph.add_node("split_docs", split_documents)
  30. graph.add_node("summarize_chunk", summarize_chunk)
  31. graph.add_node("reduce", reduce_summaries)
  32. # 动态分支配置
  33. graph.add_conditional_edges("split_docs", map_router)
  34. graph.add_conditional_edges("summarize_chunk", lambda _: "reduce")
  35. graph.add_edge("reduce", END)
  36. # 执行工作流
  37. result = graph.invoke({
  38.     "documents": ["long_text1", "long_text2"],
  39.     "chunks": [],
  40.     "summaries": [],
  41.     "final_summary": ""
  42. })
  43. print(result["final_summary"])
复制代码
执行流程:
其核心机制包括:
  1. 动态路由(Send核心)
  2. def map_router(state: MapReduceState):
  3.     return [
  4.         Send("summarize_chunk", {"chunk": c})
  5.         for c in state["chunks"]
  6.     ]
复制代码
  1. # 主状态(分割后)
  2. {
  3.     "documents": [doc1, doc2],
  4.     "chunks": [chunk1, chunk2, chunk3, ...],
  5.     "chunk_summaries": [],
  6.     "final_summary": ""
  7. }
  8. # 分支独立状态示例
  9. [
  10.     {"chunk": "chunk1内容", "chunk_id": 0},  # 分支1
  11.     {"chunk": "chunk2内容", "chunk_id": 1},  # 分支2
  12.     {"chunk": "chunk3内容", "chunk_id": 2}   # 分支3
  13. ]
复制代码
  1. graph.add_conditional_edges("summarize_chunk", lambda _: "reduce")
  2. # 4. 结果聚合
  3. def reduce_summaries(state: MapReduceState):
  4.     all_summaries = [s for branch in state.values() if "summary" in s]
  5.     return {"final_summary": combine_summaries(all_summaries)}
复制代码
Send 机制典型应用场景

  1. Send("process_chunk", {"chunk": c}) for c in doc_chunks
复制代码
  1. Send("call_api", {"request": r}) for r in requests
复制代码
  1. Send("process_item", {"item": i}) for i in batch
复制代码
  1. Send(next_step, config) for config in dynamic_configs
复制代码
注意事项

参考文献

https://github.com/langchain-ai/langgraph
Overview

原文地址:https://blog.csdn.net/weixin_41645817/article/details/149813487




欢迎光临 AI创想 (https://www.llms-ai.com/) Powered by Discuz! X3.4