开启左侧

LangGraph框架

[复制链接]
AI小编 发表于 10 小时前 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:大海里行船
课程地址
课程地址
一、介绍

LangGraph框架-1.png

LangGraph框架-2.png


二、快速体验LangGraph和langchain的差异
  1. pip install langgraph
  2. ## 后面的案例中会用到langchain,所以同时也安装下langchain的依赖
  3. pip install langchain
  4. pip install langchain_community
复制代码
1、比较创建智能体的差异

就是包装了一层Langchain
Langchain构建
  1. # 制定OpenAI的API_KEY。ifnot os.environ.get("OPENAI_API_KEY"):
  2.     os.environ["OPENAI_API_KEY"]= load_key("OPENAI_API_KEY")from langchain.chat_models import init_chat_model
  3. # 创建访问OpenAI的Model。# model = init_chat_model("gpt-4o-mini", model_provider="openai")# openai在国内是无法直接访问的,需要科学上网。这里指定base_url是因为使用的是openai的国内代理,2233.ai。
  4. model = init_chat_model("gpt-4o-mini", model_provider="openai", base_url="https://api.gptsapi.net/v1")
  5. model.invoke("你是谁?能帮我解决什么问题?")
复制代码
LangGraph构建
  1. from langgraph.prebuilt import create_react_agent
  2. agent = create_react_agent(
  3.     model=model,
  4.     tools=[],
  5.     prompt="You are a helpful assistant",)
  6. agent.invoke({"messages":[{"role":"user","content":"你是谁?能帮我解决什么问题?"}]})
复制代码
2、提供一些工具的差异

langchain
  1. import datetime
  2. from langchain.tools import tool
  3. from config.load_key import load_key
  4. from langchain_community.chat_models import ChatTongyi
  5. # 构建阿里云百炼大模型客户端
  6. llm = ChatTongyi(
  7.     model="qwen-plus",
  8.     api_key=load_key("BAILIAN_API_KEY"),)# 定义工具 注意要添加注释@tooldefget_current_date():"""获取今天日期"""return datetime.datetime.today().strftime("%Y-%m-%d")# 大模型绑定工具
  9. llm_with_tools = llm.bind_tools([get_current_date])# 工具容器
  10. all_tools ={"get_current_date": get_current_date}# 把所有消息存到一起
  11. query ="今天是几月几号"
  12. messages =[query]# 询问大模型。大模型会判断需要调用工具,并返回结果
  13. ai_msg = llm_with_tools.invoke(messages)print(ai_msg)
  14. messages.append(ai_msg)# 打印需要调用的工具print(ai_msg.tool_calls)if ai_msg.tool_calls:for tool_call in ai_msg.tool_calls:
  15.         selected_tool = all_tools[tool_call["name"].lower()]
  16.         tool_msg = selected_tool.invoke(tool_call)
  17.         messages.append(tool_msg)
  18.     llm_with_tools.invoke(messages).content
复制代码
LangGraph
  1. from langgraph.prebuilt import create_react_agent
  2. agent = create_react_agent(
  3.     model=llm,
  4.     tools=[get_current_date],
  5.     prompt="You are a helpful assistant",)
  6. agent.invoke({"messages":[{"role":"user","content":"今天是几月几号"}]})
复制代码
3、总结

LangGraph框架-3.png


三、LangGraph注意点

1、流式输出

LangGraph框架-4.png


2、工具的调用

@tool(“devide_tool”, return_direct=True)中的return_direct=True是工具执行后的结果将直接作为最终响应返回给用户
  1. from langchain_core.tools import tool
  2. from langgraph.prebuilt import ToolNode
  3. # 定义工具 return_direct=True 表示直接返回工具的结果@tool("devide_tool", return_direct=True)defdevide(a:int, b:int)->float:"""计算两个整数的除法。
  4.    
  5.     Args:
  6.         a (int): 除数
  7.         b (int): 被除数"""# 自定义错误if b ==1:raise ValueError("除数不能为1")return a / b
  8. print(devide.name)print(devide.description)print(devide.args)# 定义工具调用错误处理函数defhandle_tool_error(error: Exception)->str:"""处理工具调用错误。
  9.    
  10.     Args:
  11.         error (Exception): 工具调用错误"""ifisinstance(error, ValueError):return"除数为1没有意义,请重新输入一个除数和被除数。"elifisinstance(error, ZeroDivisionError):return"除数不能为0,请重新输入一个除数和被除数。"returnf"工具调用错误: {error}"
  12. tool_node = ToolNode([devide],
  13.     handle_tool_errors=handle_tool_error
  14. )
  15. agent_with_error_handler = create_react_agent(
  16.     model=llm,
  17.     tools=tool_node
  18. )
  19. result = agent_with_error_handler.invoke({"messages":[{"role":"user","content":"10除以1等于多少"}]})# 打印最后的返回结果print(result["messages"][-1].content)
复制代码
3、消息记忆

LangGraph框架-5.png


1. 短期记忆

LangGraph框架-6.png

  1. from langgraph.checkpoint.memory import InMemorySaver
  2. from langgraph.prebuilt import create_react_agent
  3. checkpoint = InMemorySaver()defget_weather(city:str)->str:"""获取某个城市的天气"""returnf"城市:{city},天气一直都是晴天!"
  4. agent = create_react_agent(
  5.     model=llm,
  6.     tools=[get_weather],
  7.     checkpointer=checkpoint
  8. )# Run the agent
  9. config ={"configurable":{"thread_id":"1"}}
  10. cs_response = agent.invoke({"messages":[{"role":"user","content":"长沙天气怎么样?"}]},
  11.     config
  12. )print(cs_response)# Continue the conversation using the same thread_id
  13. bj_response = agent.invoke({"messages":[{"role":"user","content":"北京呢?"}]},
  14.     config
  15. )print(bj_response)
复制代码
2. 短期记忆内存管理

消息可能出现存多少条内存不够了,需要清除一部分消息,缓解内存压力
LangGraph框架-7.png

  1. from langmem.short_term import SummarizationNode
  2. from langchain_core.messages.utils import count_tokens_approximately
  3. from langgraph.prebuilt import create_react_agent
  4. from langgraph.prebuilt.chat_agent_executor import AgentState
  5. from langgraph.checkpoint.memory import InMemorySaver
  6. from typing import Any
  7. # 使用大模型对历史信息进行总结
  8. summarization_node = SummarizationNode(
  9.     token_counter=count_tokens_approximately,
  10.     model=llm,
  11.     max_tokens=384,# 允许保留在上下文中的原始消息的最大 token 数(超过则触发总结)
  12.     max_summary_tokens=128,# 总结后生成的摘要最多占用的 token 数
  13.     output_messages_key="llm_input_messages",)classState(AgentState):# 注意:这个状态管理的作用是为了能够保存上一次总结的结果。这样就可以防止每次调用大模型时,都要重新总结历史信息。# 这是一个比较常见的优化方式,因为大模型的调用是比较耗时的。
  14.     context:dict[str, Any]
  15. checkpoint = InMemorySaver()
  16. agent = create_react_agent(
  17.     model=llm,
  18.     tools=tools,
  19.     pre_model_hook=summarization_node,
  20.     state_schema=State,
  21.     checkpointer=checkpoint,)
复制代码
LangGraph框架-8.png

  1. # 可以删除某些不重要from langchain_core.messages.utils import(
  2.     trim_messages,
  3.     count_tokens_approximately
  4. )from langgraph.prebuilt import create_react_agent
  5. # This function will be called every time before the node that calls LLMdefpre_model_hook(state):
  6.     trimmed_messages = trim_messages(
  7.         state["messages"],
  8.         strategy="last",
  9.         token_counter=count_tokens_approximately,
  10.         max_tokens=384,
  11.         start_on="human",
  12.         end_on=("human","tool"),)return{"llm_input_messages": trimmed_messages}
  13. checkpoint = InMemorySaver()
  14. agent = create_react_agent(
  15.     model=llm,
  16.     tools=[],
  17.     pre_model_hook=pre_model_hook,
  18.     checkpointer=checkpoint,)
复制代码
3. 短期状态管理机制

可以在消息会话中拿到额外的信息比如用户id
  1. from typing import Annotated
  2. from langgraph.prebuilt import InjectedState, create_react_agent
  3. from langgraph.prebuilt.chat_agent_executor import AgentState
  4. from langchain_core.tools import tool
  5. classCustomState(AgentState):
  6.     user_id:str@tool(return_direct=True)defget_user_info(state: Annotated[CustomState, InjectedState])->str:"""查询用户信息。"""
  7.     user_id = state["user_id"]return"user_123用户的姓名是楼兰。"if user_id =="user_123"else"未知用户"
  8. agent = create_react_agent(
  9.     model=llm,
  10.     tools=[get_user_info],
  11.     state_schema=CustomState,)
  12. agent.invoke({"messages":"查询用户信息","user_id":"user_123"})
复制代码
4. 长期消息记忆

LangGraph框架-9.png

  1. from langchain_core.runnables import RunnableConfig
  2. from langgraph.config import get_store
  3. from langgraph.prebuilt import create_react_agent
  4. from langgraph.store.memory import InMemoryStore
  5. from langchain_core.tools import tool
  6. # 定义长期存储
  7. store = InMemoryStore()# 添加一些测试数据。users是命名空间,user_123是key,后面的JSON数据是value
  8. store.put(("users",),"user_123",{"name":"楼兰","age":"33",})# 定义工具@tool(return_direct=True)defget_user_info(config: RunnableConfig)->str:"""查找用户信息"""# 获取长期存储。获取到了后,这个存储组件可读也可写
  9.     store = get_store()# 获取配置中的用户ID
  10.     user_id = config["configurable"].get("user_id")
  11.     user_info = store.get(("users",), user_id)returnstr(user_info.value)if user_info else"Unknown user"
  12. agent = create_react_agent(
  13.     model=llm,
  14.     tools=[get_user_info],
  15.     store=store,)# Run the agent
  16. agent.invoke({"messages":[{"role":"user","content":"查找用户信息"}]},
  17.     config={"configurable":{"user_id":"user_123"}})
复制代码
4、Human-in-the-loop人类监督(经过人类干预判断是否执行)

LangGraph框架-10.png

  1. from langgraph.checkpoint.memory import InMemorySaver
  2. from langgraph.types import interrupt
  3. from langgraph.prebuilt import create_react_agent
  4. from langchain_core.tools import tool
  5. from langchain_community.chat_models import ChatTongyi
  6. from langgraph.types import Command
  7. llm = ChatTongyi(
  8.     model="qwen-plus",
  9.     api_key="你的key",)# An example of a sensitive tool that requires human review / approval@tool(return_direct=True)defbook_hotel(hotel_name:str):"""预定宾馆"""
  10.     response = interrupt(f"正准备执行'book_hotel'工具预定宾馆,相关参数名:{{'hotel_name': {hotel_name}}}。""请选择OK表示同意,或者选择edit提出补充意见。")if response["type"]=="OK":print("OK")passelif response["type"]=="edit":print("edit")
  11.         hotel_name = response["args"]["hotel_name"]else:raise ValueError(f"Unknown response type: {response['type']}")returnf"成功在 {hotel_name} 预定了一个房间。"
  12. checkpoint = InMemorySaver()
  13. agent = create_react_agent(
  14.     model=llm,
  15.     tools=[book_hotel],
  16.     checkpointer=checkpoint,)
  17. config ={"configurable":{"thread_id":"1"}}for chunk in agent.stream({"messages":[{"role":"user","content":"帮我在图灵宾馆预定一个房间"}]},
  18.     config
  19. ):print(chunk)print("\n")# 只要中间记忆没有被线程刷新就还可以恢复 必须回复后才能问别的print("发送edit吧")for chunk in agent.stream(# Command(resume={"type": "OK"}),
  20.     Command(resume={"type":"edit","args":{"hotel_name":"三号宾馆"}}),
  21.     config
  22. ):print(chunk)print(chunk['tools']['messages'][-1].content)print("\n")
复制代码
同意指令
LangGraph框架-11.png


5、总结

LangGraph框架-12.png


四、Graph图

LangGraph框架-13.png


1、案例一demo
  1. from typing import TypedDict
  2. from langgraph.constants import END, START
  3. from langgraph.graph import StateGraph
  4. classInputState(TypedDict):
  5.     user_input:strclassOutputState(TypedDict):
  6.     graph_output:strclassOverallState(TypedDict):
  7.     foo:str
  8.     user_input:str
  9.     graph_output:strclassPrivateState(TypedDict):
  10.     bar:strdefnode_1(state: InputState)-> OverallState:# Write to OverallStatereturn{"foo": state["user_input"]+"->学院"}defnode_2(state: OverallState)-> PrivateState:# Read from OverallState, write to PrivateStatereturn{"bar": state["foo"]+"->非常"}defnode_3(state: PrivateState)-> OutputState:# Read from PrivateState, write to OutputStatereturn{"graph_output": state["bar"]+"->靠谱"}# 构建图
  11. builder = StateGraph(OverallState,input=InputState, output=OutputState)# 添加Node
  12. builder.add_node("node_1", node_1)
  13. builder.add_node("node_2", node_2)
  14. builder.add_node("node_3", node_3)# 添加Edge
  15. builder.add_edge(START,"node_1")
  16. builder.add_edge("node_1","node_2")
  17. builder.add_edge("node_2","node_3")
  18. builder.add_edge("node_3", END)# 编译图
  19. graph = builder.compile()# 调用图
  20. graph.invoke({"user_input":"图灵"})
复制代码
2、显示案例一流程图

LangGraph框架-14.png

LangGraph框架-15.png


3、三个主要组件

1. State状态

State是所有节点共享的状态,它是一个字典,包含了所有节点的状态。有几个需要注意的地方:
    State形式上,可以是TypedDict字典,也可以是Pydantic中的一个BaseModel。例如:
  1. from pydantic import BaseModel
  2. # The overall state of the graph (this is the public state shared across nodes)classOverallState(BaseModel):
  3.     a:str
复制代码
这两种实现,本质上没有太多的区别。
    State中定义的属性,通常不需要指定默认值。如果需要默认值,可以通过在START节点后,定义一个node来指定默认值。
  1. defnode(state: OverallState):return{"a":"goodbye"}
复制代码
    State中的属性,除了可以修改值之外,也可以定义一些操作。来指定如何更新State中的值。例如
  1. from langgraph.graph.message import add_messages
  2. """
  3. messages 和 list_field 使用特定操作实现累加更新;
  4. extra_field 使用默认的覆盖更新。
  5. """classState(TypedDict):
  6.     messages: Annotated[List[AnyMessage], add_messages]
  7.     list_field: Annotated[list[int], add]
  8.     extra_field:int
复制代码
此时,如果有一个node,返回了State中更新的值,那么messages和list_field的值就会添加到原有的旧集合中,而extra_field的值则会被替换。
  1. from langchain_core.messages import AnyMessage, AIMessage
  2. from langgraph.graph import StateGraph
  3. from langgraph.graph.message import add_messages
  4. from typing import Annotated, TypedDict
  5. from operator import add
  6. classState(TypedDict):
  7.     messages: Annotated[List[AnyMessage], add_messages]
  8.     list_field: Annotated[list[int], add]
  9.     extra_field:intdefnode1(state: State):
  10.     new_message = AIMessage("Hello!")return{"messages":[new_message],"list_field":[10],"extra_field":10}defnode2(state: State):
  11.     new_message = AIMessage("LangGraph!")return{"messages":[new_message],"list_field":[20],"extra_field":20}
  12. graph =(
  13.     StateGraph(State).add_node("node1", node1).add_node("node2", node2).set_entry_point("node1").add_edge("node1","node2").compile())
  14. input_message ={"role":"user","content":"Hi"}
  15. result = graph.invoke({"messages":[input_message],"list_field":[1,2,3]})print(result)# for message in result["messages"]:#     message.pretty_print()# print(result["extra_field"])
复制代码
在LangGraph的应用当中,State通常都会保存聊天消息。为此,LangGraph中还提供了一个langgraph.graph.MessagesState,可以用来快速保存消息。他的声明方式就是这样的:
  1. classMessagesState(TypedDict):
  2.     messages: Annotated[list[AnyMessage], add_messages]
复制代码
然后,对于Messages,也可以用序列化的方式来声明,例如下面两种方式都是可以的
  1. # 一{"messages":[HumanMessage(content="message")]}# 二{"messages":[{"type":"user","content":"message"}]}
复制代码
2. Node节点

Node是图中的一个处理数据的节点。也有以下几个需要注意的地方:
    在LangGraph中,Node通常是一个Python的函数,它接受一个State对象作为输入,返回一个State对象作为输出。每个Node都有一个唯一的名称,通常是一个字符串。如果没有提供名称,LangGraph会自动生成一个和函数名一样的名称。在具体实现时,通常包含两个具体的参数,第一个是State,这个是必选的。第二个是一个可选的配置项config。这里面包含了一些节点运行的配置参数。LangGraph对每个Node提供了缓存机制。只要Node的传入参数相同,LangGraph就会优先从缓存当中获取Node的执行结果。从而提升Node的运行速度。
  1. from typing import TypedDict
  2. from langchain_core.runnables import RunnableConfig
  3. from langgraph.constants import START, END
  4. from langgraph.graph import StateGraph
  5. from langgraph.types import CachePolicy
  6. from langgraph.cache.memory import InMemoryCache  # 是langgraph中的,而不是langchain中的。# 配置状态classState(TypedDict):
  7.     number:int
  8.     user_id:str# 配置信息classConfigSchema(TypedDict):
  9.     user_id:strdefnode_1(state: State, config: RunnableConfig):
  10.     time.sleep(3)
  11.     user_id = config["configurable"]["user_id"]return{"number": state["number"]+1,"user_id": user_id}
  12. builder = StateGraph(State, config_schema=ConfigSchema)# Node缓存5秒
  13. builder.add_node("node1", node_1, cache_policy=CachePolicy(ttl=5))
  14. builder.add_edge(START,"node1")
  15. builder.add_edge("node1", END)# graph = builder.compile(cache=InMemoryCache())缓存
  16. graph = builder.compile(cache=InMemoryCache())print(graph.invoke({"number":5}, config={"configurable":{"user_id":"123"}}, stream_mode='updates'))# {'node1': {'number': 6, 'user_id': '123'}}# node入参相同===也就是{"number": 5},就会走缓存print(graph.invoke({"number":5}, config={"configurable":{"user_id":"456"}}, stream_mode='updates'))# [{'node1': {'number': 6, 'user_id': '123'}, '_metadata_': {'cached': 'True'}}]
复制代码
    对于Node,LangGraph除了提供缓存机制,还提供了重试机制。
    可以针对单个节点指定,例如:
  1. from langgraph.types import RetryPolicy
  2. builder.add_node("node1", node_1, retry=RetryPolicy(max_attempts=4))
复制代码
    另外,也可以针对某一次任务调用指定,例如
  1. # 图最多执行 25 步如果 25 步内没走到 END,就会抛出异常,防止无限运行。print(graph.invoke(xxxxx, config={"recursion_limit":25}))
复制代码
3. Edge边

在Graph图中,通过Edge(边)把Node(节点)连接起来,从而决定State应该如何在Graph中传递。LangGraph中也提供了非常灵活的构建方式。
    普通Edge和EntryPoint
    Edge通常是用来把两个Node连接起来,形成逻辑处理路线。例如 graph.add_edge("node_1","node_2") 。
    LangGraph中提供了两个默认的Node,START和END,用来作为Graph的入口和出口。
    同时,也可以自行指定EntryPoint。例如
  1. builder = StateGraph(State)
  2. builder.set_entry_point("node1")
  3. builder.set_finish_point("node2")
复制代码
    条件Edge和EntryPoint
    我们也可以添加带有条件判断的Edge和EntryPoint,用来动态构建更复杂的工作流程。
    具体实现时,可以指定一个函数,函数的返回值就可以是下一个Node的名称。
  1. from typing import TypedDict
  2. from langchain_core.runnables import RunnableConfig
  3. from langgraph.constants import START, END
  4. from langgraph.graph import StateGraph
  5. # 配置状态classState(TypedDict):
  6.     number:intdefnode_1(state: State, config: RunnableConfig):return{"number": state["number"]+1}
  7. builder = StateGraph(State)# Node缓存5秒
  8. builder.add_node("node1", node_1)defrouting_func(state: State)->str:if state["number"]>5:return"node1"else:return END
  9. builder.add_edge("node1", END)# 添加条件边
  10. builder.add_conditional_edges(START, routing_func)
  11. graph = builder.compile()print(graph.invoke({"number":4}))
复制代码
LangGraph框架-16.png


    另外,如果不想在路由函数中写出过多具体的节点名称,也可以在函数中返回一个自定义的结果,然后将这个结果解析到某一个具体的Node上。例如
  1. defrouting_func(state: State)->bool:if state["number"]>5:returnTrueelse:returnFalse
  2. builder.add_conditional_edges(START, routing_func,{True:"node_a",False:"node_b"})
复制代码
    Send动态路由
    在条件边中,如果希望一个Node后同时路由到多个Node,就可以返回Send动态路由的方式实现。
Send对象可传入两个参数,第一个是下一个Node的名称,第二个是Node的输入。
  1. from operator import add
  2. from typing import TypedDict, Annotated
  3. from langgraph.constants import START, END
  4. from langgraph.graph import StateGraph
  5. from langgraph.types import Send
  6. # 配置状态classState(TypedDict):
  7.     messages: Annotated[list[str], add]classPrivateState(TypedDict):
  8.     msg:strdefnode_1(state: PrivateState)-> State:
  9.     res = state["msg"]+"!"return{"messages":[res]}
  10. builder = StateGraph(State)# Node缓存5秒
  11. builder.add_node("node1", node_1)defrouting_func(state: State):
  12.     result =[]for message in state["messages"]:
  13.         result.append(Send("node1",{"msg": message}))return result
  14. # 通过路由函数,将消息中每个字符串分别传入node1处理。
  15. builder.add_conditional_edges(START, routing_func,["node1"])
  16. builder.add_edge("node1", END)
  17. graph = builder.compile()print(graph.invoke({"messages":["hello","world","hello","graph"]}))
复制代码
LangGraph框架-17.png


    Command命令
    通常,Graph中一个典型的业务步骤是State进入一个Node处理。在Node中先更新State状态,然后再通过Edges传递给下一个Node。如果希望将这两个步骤合并为一个命令,那么还可以使用Command命令
  1. from operator import add
  2. from typing import TypedDict, Annotated
  3. from langgraph.constants import START, END
  4. from langgraph.graph import StateGraph
  5. from langgraph.types import Command
  6. # 配置状态classState(TypedDict):
  7.     messages: Annotated[list[str], add]defnode_1(state: State):
  8.     new_message =[]for message in state["messages"]:
  9.         new_message.append(message +"!")return Command(
  10.         goto=END,
  11.         update={"messages": new_message})
  12. builder = StateGraph(State)
  13. builder.add_node("node1", node_1)# node1中通过Command同时集成了更新State和指定下一个Node
  14. builder.add_edge(START,"node1")
  15. graph = builder.compile()print(graph.invoke({"messages":["hello","world","hello","graph"]}))# {'messages': ['hello', 'world', 'hello', 'graph', 'hello!', 'world!', 'hello!', 'graph!']}
复制代码
4、子图

一个Graph除了可以单独使用,还可以作为一个Node,嵌入到另外一个Graph中,这种用法就叫做子图。
使用子图时,基本和使用Node没有太多的区别
唯一需要注意的是,当触发了SubGraph代表Node后,实际上是相当于重新调用依次subgraph.invoke(state)方法
  1. # subgraph与graph使用相同Statefrom operator import add
  2. from typing import TypedDict, Annotated
  3. from langgraph.constants import END
  4. from langgraph.graph import StateGraph, MessagesState, START
  5. classState(TypedDict):
  6.     messages: Annotated[list[str], add]# Subgraphdefsub_node_1(state: State)-> MessagesState:return{"messages":["response from subgraph"]}
  7. subgraph_builder = StateGraph(State)
  8. subgraph_builder.add_node("sub_node_1", sub_node_1)
  9. subgraph_builder.add_edge(START,"sub_node_1")
  10. subgraph_builder.add_edge("sub_node_1", END)
  11. subgraph = subgraph_builder.compile()# Parent graph
  12. builder = StateGraph(State)
  13. builder.add_node("subgraph_node", subgraph)
  14. builder.add_edge(START,"subgraph_node")
  15. builder.add_edge("subgraph_node", END)
  16. graph = builder.compile()print(graph.invoke({"messages":["hello subgraph"]}))# 结果hello subgraph会出现两次。这是因为在subgraph_node中默认调用了一次subgraph.invoke(state)方法。主图里也调用了一次invoke。这就会往state中添加两次语句# {'messages': ['hello subgraph', 'hello subgraph', 'response from subgraph']}# 解决方案就是主图不要和子图不使用同一个状态管理State 也就是四、1。 案例一demo中的方式
复制代码
5、图的Stream支持

和调用大模型相似,Graph除了可以通过invoke方法进行直接调用外,也支持通过stream()方法进行流式调用。不过大模型的流式调用是依次返回大模型响应的Token。而Graph的流式输出则是依次返回数据处理步骤。
graph提供了stream()方法进行同步的流式调用,也提供了astream()方法进行异步的流式调用。
  1. for chunk in graph.stream({"messages":["hello subgraph"]}, stream_mode="debug"):print(chunk)# {'subgraph_node': {'messages': ['hello subgraph', 'response from subgraph']}}
复制代码
LangGraph支持几种不同的stream mode:
    values:在图的每一步之后流式传输状态的完整值。updates:在图的每一步之后,将更新内容流式传输到状态。如果在同一步骤中进行了多次更新(例如,运行了多个节点),这些更新将分别进行流式传输。custom:从图节点内部流式传输自定义数据。通常用于调试。messages:从任何调用大语言模型(LLM)的图节点中,流式传输二元组(LLM的Token,元数据)。debug:在图的执行过程中尽可能多地传输信息。用得比较少。
values、updates、debug输出模式,使用之前案例验证,就能很快感受到其中的区别。
messages输出模式,由于在之前案例中并没有调用大模型,所以不会有输出结果。
而custom输出模式,可以自定义输出内容。在Node节点内或者Tools工具内,通过get_stream_writer()方法获取一个StreamWriter对象,然后使用write方法将自定义数据写入流中。
  1. from typing import TypedDict
  2. from langgraph.config import get_stream_writer
  3. from langgraph.graph import StateGraph, START
  4. classState(TypedDict):
  5.     query:str
  6.     answer:strdefnode(state: State):
  7.     writer = get_stream_writer()
  8.     writer({"自定义key":"在节点内返回自定义信息"})return{"answer":"some data"}
  9. graph =(
  10.     StateGraph(State).add_node(node).add_edge(START,"node").compile())
  11. inputs ={"query":"example"}# Usagefor chunk in graph.stream(inputs, stream_mode="custom"):print(chunk)
复制代码
最后,在langChain中,构建LLM对象时,大都支持desable_streaming属性,禁止流式输出,那么就无法使用上面的流式输出。例如:
  1. llm = ChatOpenAI(model="", disable_streaming=True)
复制代码
五、构建多智能体工作流

1、流式输出大模型调用结果

在介绍Graph的流式输出时,我们提到LangGraph的Graph流式输出有几种不同的模式,其中有一种messages模式,是用来监控大语言模型的Token记录的。这里我们就可以来测试一下。
  1. from langchain_community.chat_models import ChatTongyi
  2. # 构建阿里云百炼大模型客户端
  3. llm = ChatTongyi(
  4.     model="qwen-plus",
  5.     api_key="",)from langgraph.graph import StateGraph, MessagesState, START
  6. defcall_model(state: MessagesState):
  7.     response = llm.invoke(state["messages"])return{"messages": response}
  8. builder = StateGraph(MessagesState)
  9. builder.add_node(call_model)
  10. builder.add_edge(START,"call_model")
  11. graph = builder.compile()for chunk in graph.stream({"messages":[{"role":"user","content":"湖南的省会是哪里?"}]},
  12.     stream_mode="messages",):print(chunk)
复制代码
通常,如果要对大模型调用成本进行统计时,这种messages就是比较好的一种方式。
2、大模型消息持久化

和之前介绍LangGraph的Agent相似,Graph图也支持构建消息的持久化功能。并且也通常支持通过checkpoint构建短期记忆,以store构建长期记忆。这里短期记忆和长期记忆,都是可以通过内存或者数据库进行持久化保存的。不过短期记忆更倾向于通过对消息的短期存储,实现多轮对话的效果。而长期记忆则倾向于对消息长期存储后支持语义检索。
  1. from langchain_community.chat_models import ChatTongyi
  2. # 构建阿里云百炼大模型客户端
  3. llm = ChatTongyi(
  4.     model="qwen-plus",
  5.     api_key="",)from langgraph.graph import StateGraph, MessagesState, START
  6. from langgraph.checkpoint.memory import InMemorySaver
  7. defcall_model(state: MessagesState):
  8.     response = llm.invoke(state["messages"])return{"messages": response}
  9. builder = StateGraph(MessagesState)
  10. builder.add_node(call_model)
  11. builder.add_edge(START,"call_model")
  12. checkpointer = InMemorySaver()
  13. graph = builder.compile(checkpointer=checkpointer)
  14. config ={"configurable":{"thread_id":"1"}}for chunk in graph.stream({"messages":[{"role":"user","content":"湖南的省会是哪里?"}]},
  15.     config,
  16.     stream_mode="values",):
  17.     chunk["messages"][-1].pretty_print()for chunk in graph.stream({"messages":[{"role":"user","content":"湖北呢?"}]},
  18.     config,
  19.     stream_mode="values",):
  20.     chunk["messages"][-1].pretty_print()
复制代码
3、Human-In-Loop人类干预

在LangGraph中也可以通过中断任务,等待确认的方式,来实现过程干预,这样能够更好的减少大语言模型的结果不稳定给任务带来的影响。
在具体实现人类干预时,需要注意一下几点:
    必须指定一个checkpointers短期记忆,否则无法保存任务状态。在执行Graph任务时,必须指定一个带有thread_id的配置项,指定线程ID。之后才能通过线程ID,指定恢复线程。在任务执行过程中,通过interrupt()方法,中断任务,等待确认。在人类确认之后,使用Graph提交一个resume=True的Command指令,恢复任务,并继续进行。
这种实现方式,在之前介绍LangGraph构建单Agent时已经介绍过,不过,结合Graph的State,在多个Node之间进行复杂控制,这样更能体现出人类监督的价值。
例如,下面的案例可以实现这样一种典型的人类确认:

LangGraph框架-18.png

  1. # 构建一个带有Human-In-Loop的图from operator import add
  2. from langchain_core.messages import AnyMessage
  3. from langgraph.checkpoint.memory import InMemorySaver
  4. from langgraph.constants import START, END
  5. from langgraph.graph import StateGraph
  6. from langchain_community.chat_models import ChatTongyi
  7. # 构建阿里云百炼大模型客户端
  8. llm = ChatTongyi(
  9.     model="qwen-plus",
  10.     api_key="",)from typing import Literal, TypedDict, Annotated
  11. from langgraph.types import interrupt, Command
  12. classState(TypedDict):
  13.     messages: Annotated[list[AnyMessage], add]defhuman_approval(state: State)-> Command[Literal["call_llm", END]]:
  14.     is_approved = interrupt({"question":"是否同意调用大语言模型?"})if is_approved:return Command(goto="call_llm")else:return Command(goto=END)defcall_llm(state: State):
  15.     response = llm.invoke(state["messages"])return{"messages":[response]}
  16. builder = StateGraph(State)# Add the node to the graph in an appropriate location# and connect it to the relevant nodes.
  17. builder.add_node("human_approval", human_approval)
  18. builder.add_node("call_llm", call_llm)
  19. builder.add_edge(START,"human_approval")
  20. checkpointers = InMemorySaver()
  21. graph = builder.compile(checkpointer=checkpointers)from langchain_core.messages import HumanMessage
  22. # 提交任务,等待确认
  23. thread_config ={"configurable":{"thread_id":1}}
  24. graph.invoke({"messages":[HumanMessage("湖南的省会是哪里?")]}, config=thread_config)# 执行后会中断任务,等待确认# 确认同意,继续执行任务
  25. final_result = graph.invoke(Command(resume=True), config=thread_config)print(final_result)# 不同意,终止任务# final_result = graph.invoke(Command(resume=False), config=thread_config)# print(final_result)
复制代码
ps:
任务中断和恢复,需要保持相同的thread_id。通常应用当中都会单独生成一个随机的thread_id,保证唯一的同时,防止其他任务干扰。
interrupt()方法中中断任务的时间不能过长,过长了之后就无法恢复任务了。
任务确认时,Command中传递的resume可以是简单的True或False,也可以是一个字典。通过字典可以进行更多的判断。
4、Time Travel时间回溯

由于大语言模型回答问题的不确定性,基于大语言模型构建的应用,也是充满不确定性的。而对于这种不确定性的系统,就有必要进行更精确的检查。当某一个步骤出现问题时,才能及时发现问题,并对出问题的那个步骤进行重演。为此,LangGraph提供了Time Travel时间回溯功能,可以保存Graph的运行过程,并可以手动指定从Graph的某一个Node开始进行重演。
具体实现时,需要注意以下几点:
    在运行Graph时,需要提供初始的输入消息。运行时,指定thread_id线程ID。并且要基于这个线程ID,再指定一个checkpoint检查点。执行后将在每一个Node执行后,生成一个check_point_id。指定thread_id和check_point_id,进行任务重演。重演前,可以选择更新state,当然,如果没问题,也可以不指定。
构建图
LangGraph框架-19.png


查看检查点
LangGraph框架-20.png

  1. # 构建一个图。图中两个步骤:第一步让大模型推荐一个有名的作家,第二步,让大模型用推荐的作家的风格写一个100字以内的笑话。from typing import TypedDict
  2. from typing_extensions import NotRequired
  3. from langgraph.checkpoint.memory import InMemorySaver
  4. from langgraph.constants import START, END
  5. from langgraph.graph import StateGraph
  6. import uuid
  7. from langchain_community.chat_models import ChatTongyi
  8. # 构建阿里云百炼大模型客户端
  9. llm = ChatTongyi(
  10.     model="qwen-plus",
  11.     api_key="",)classState(TypedDict):
  12.     author: NotRequired[str]
  13.     joke: NotRequired[str]defauthor_node(state: State):
  14.     prompt ="帮我推荐一位受人们欢迎的作家。只需要给出作家的名字即可。"
  15.     author = llm.invoke(prompt)return{"author": author}defjoke_node(state: State):
  16.     prompt =f"用作家:{state['author']} 的风格,写一个100字以内的笑话"
  17.     joke = llm.invoke(prompt)return{"joke": joke}
  18. builder = StateGraph(State)
  19. builder.add_node(author_node)
  20. builder.add_node(joke_node)
  21. builder.add_edge(START,"author_node")
  22. builder.add_edge("author_node","joke_node")
  23. builder.add_edge("joke_node", END)
  24. checkpointers = InMemorySaver()
  25. graph = builder.compile(checkpointer=checkpointers)
  26. graph
  27. # 正常执行一个图
  28. config ={"configurable":{"thread_id": uuid.uuid4(),}}
  29. state = graph.invoke({}, config)print(state["author"])print()print(state["joke"])# 查看所有checkpoint检查点
  30. states =list(graph.get_state_history(config))for state in states:print(state.next)print(state.config["configurable"]["checkpoint_id"])print()# 选定某一个检查点。这里选择author_node,让大模型重新推荐作家# 1也就是auth那个节点开始
  31. selected_state = states[1]print(selected_state.next)print(selected_state.values)# 为了后面的重演,更新state。可选步骤:
  32. new_config = graph.update_state(selected_state.config, values={"author":"郭德纲"})print(new_config)# 接下来,指定thread_id和checkpoint_id,进行重演
  33. graph.invoke(None, new_config)
复制代码
5、多智能体架构

可以看到,在LangChain体系中,LangChain主要集成了和大语言模型交互的能力,而LangGraph主要实现了复杂的流程调度。将这两个能力结合起来,一个强大的多智能体构建框架就已经成型了。
接下来,我们就用LangGraph来实现一个非常典型的多智能体架构,作为一个完整的案例。
    这个机器人可以通过一个supervisor节点,对用户的输入进行分类,然后根据分类结果,选择不同的agent节点进行处理。接下来每个Agent节点,都可以选择不同的工具进行处理,最后将处理结果汇总,返回给supervisor节点。supervisor节点再将结果返回给用户。

    LangGraph框架-21.png

Director.py 比较懒,自己去加其他的,这里只演示笑话的这个
  1. from operator import add
  2. from typing import TypedDict, Annotated
  3. from langchain_community.chat_models import ChatTongyi
  4. from langchain_core.messages import AnyMessage, HumanMessage, AIMessage
  5. from langgraph.checkpoint.memory import InMemorySaver
  6. from langgraph.config import get_stream_writer
  7. from langgraph.constants import START, END
  8. from langgraph.graph import StateGraph
  9. llm = ChatTongyi(
  10.     model="qwen-plus",
  11.     api_key="")
  12. nodes =["supervisor","travel","couplet","joke","other"]classState(TypedDict):
  13.     messages: Annotated[list[AnyMessage], add]type:strdefother_node(state: State):print(">>> other_node")
  14.     writer = get_stream_writer()
  15.     writer({"node":">>>> other_node"})return{"messages":[HumanMessage(content="我暂时无法回答这个问题")],"type":"other"}deftravel_node(state: State):print(">>> travel_node")
  16.     writer = get_stream_writer()
  17.     writer({"node":">>>> travel_node"})return{"messages":[HumanMessage(content="travel_node")],"type":"travel"}defsupervisor_node(state: State):print(">>> supervisor_node")
  18.     writer = get_stream_writer()
  19.     writer({"node":">>>> supervisor_node"})# 跟据用户问题,对问题分类。分类结果保存在type中# 根据用户的问题,对问题进行分类。分类结果保存到type当中
  20.     prompt ="""你是一个专业的客服助手,负责对用户的问题进行分类,并将任务分给其他Agent执行。
  21.     如果用户的问题是和旅游路线规划相关的,那就返回 travel 。
  22.     如果用户的问题是希望讲一个笑话,那就返回 joke 。
  23.     如果用户的问题是希望对一个对联,那就返回 couplet 。
  24.     如果是其他的问题,返回 other 。
  25.     除了这几个选项外,不要返回任何其他的内容。"""
  26.     prompts =[{"role":"system","content": prompt},{"role":"user","content": state["messages"][0]}]# 如果有type的话那么就是已经交给对应的路由处理完成,可以直接返回if"type"in state:
  27.         writer({"supervisor_step":f"已获取{state['type']} 智能处理结果"})return{"type": END}else:
  28.         resource = llm.invoke(prompts)
  29.         typeRes = resource.content
  30.         writer({"supervisor_step":f"问题分类结果{typeRes}"})if typeRes in nodes:return{"type": typeRes}else:raise ValueError("type is not type in (supervisor, travel, couplet, joke, other)")defjoke_node(state: State):print(">>> joke_node")
  31.     writer = get_stream_writer()
  32.     writer({"node":">>>> joke_node"})
  33.     system_prompt ="你是一个笑话大师,根据用户的问题,写一个不超过100个字的笑话。"
  34.     prompts =[{"role":"system","content": system_prompt},{"role":"user","content": state["messages"][0]}]
  35.     response = llm.invoke(prompts)return{"messages":[AIMessage(content=response.content)],"type":"joke"}defcouplet_node(state: State):print(">>> couplet_node")
  36.     writer = get_stream_writer()
  37.     writer({"node":">>>> couplet_node"})return{"messages":[HumanMessage(content="couplet_node")],"type":"couplet"}#  条件路由defrouting_func(state: State):if state["type"]=="travel":return"travel_node"elif state["type"]=="joke":return"joke_node"elif state["type"]=="couplet":return"couplet_node"elif state["type"]== END:return END
  38.     else:return"other_node"# 构件图
  39. builder = StateGraph(State)# 添加节点
  40. builder.add_node("supervisor_node", supervisor_node)
  41. builder.add_node("travel_node", travel_node)
  42. builder.add_node("joke_node", joke_node)
  43. builder.add_node("couplet_node", couplet_node)
  44. builder.add_node("other_node", other_node)# 添加Edge
  45. builder.add_edge(START,"supervisor_node")
  46. builder.add_conditional_edges("supervisor_node", routing_func,["travel_node","joke_node","couplet_node","other_node", END])
  47. builder.add_edge("travel_node","supervisor_node")
  48. builder.add_edge("joke_node","supervisor_node")
  49. builder.add_edge("couplet_node","supervisor_node")
  50. builder.add_edge("other_node","supervisor_node")# 构建Graph
  51. checkpointer = InMemorySaver()
  52. graph = builder.compile(checkpointer=checkpointer)# 执行任务测试代码if __name__ =="__main__":
  53.     config ={"configurable":{"thread_id":"1"}}# 选中路由回答# for chunk in graph.stream(#         input={"messages": ["给我讲一个郭德纲的笑话"]},#         config=config,#         stream_mode="custom"# ):#     print(chunk)# 无法回答# for chunk in graph.stream(#         input={"messages": ["今天天气怎么样"]},#         config=config,#         stream_mode="values"# ):#     print(chunk)# 最终获取的答案就是# res = graph.invoke(input={"messages": ["今天天气怎么样"]}, config=config)# print(res["messages"][-1].content)
复制代码
调用
  1. import uuid
  2. from Director import graph
  3. query ="讲一个郭德纲的笑话"
  4. config ={"configurable":{"thread_id": uuid.uuid4()}}
  5. res = graph.invoke(input={"messages":[query]}, config=config)print(res["messages"][-1].content)
复制代码
原文地址:https://blog.csdn.net/weixin_49390750/article/details/152042999
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )