作者:大海里行船
课程地址
课程地址
一、介绍
二、快速体验LangGraph和langchain的差异
- pip install langgraph
- ## 后面的案例中会用到langchain,所以同时也安装下langchain的依赖
- pip install langchain
- pip install langchain_community
复制代码 1、比较创建智能体的差异
就是包装了一层Langchain
Langchain构建
- # 制定OpenAI的API_KEY。ifnot os.environ.get("OPENAI_API_KEY"):
- os.environ["OPENAI_API_KEY"]= load_key("OPENAI_API_KEY")from langchain.chat_models import init_chat_model
- # 创建访问OpenAI的Model。# model = init_chat_model("gpt-4o-mini", model_provider="openai")# openai在国内是无法直接访问的,需要科学上网。这里指定base_url是因为使用的是openai的国内代理,2233.ai。
- model = init_chat_model("gpt-4o-mini", model_provider="openai", base_url="https://api.gptsapi.net/v1")
- model.invoke("你是谁?能帮我解决什么问题?")
复制代码LangGraph构建
- from langgraph.prebuilt import create_react_agent
- agent = create_react_agent(
- model=model,
- tools=[],
- prompt="You are a helpful assistant",)
- agent.invoke({"messages":[{"role":"user","content":"你是谁?能帮我解决什么问题?"}]})
复制代码 2、提供一些工具的差异
langchain
- import datetime
- from langchain.tools import tool
- from config.load_key import load_key
- from langchain_community.chat_models import ChatTongyi
- # 构建阿里云百炼大模型客户端
- llm = ChatTongyi(
- model="qwen-plus",
- api_key=load_key("BAILIAN_API_KEY"),)# 定义工具 注意要添加注释@tooldefget_current_date():"""获取今天日期"""return datetime.datetime.today().strftime("%Y-%m-%d")# 大模型绑定工具
- llm_with_tools = llm.bind_tools([get_current_date])# 工具容器
- all_tools ={"get_current_date": get_current_date}# 把所有消息存到一起
- query ="今天是几月几号"
- messages =[query]# 询问大模型。大模型会判断需要调用工具,并返回结果
- ai_msg = llm_with_tools.invoke(messages)print(ai_msg)
- messages.append(ai_msg)# 打印需要调用的工具print(ai_msg.tool_calls)if ai_msg.tool_calls:for tool_call in ai_msg.tool_calls:
- selected_tool = all_tools[tool_call["name"].lower()]
- tool_msg = selected_tool.invoke(tool_call)
- messages.append(tool_msg)
- llm_with_tools.invoke(messages).content
复制代码LangGraph
- from langgraph.prebuilt import create_react_agent
- agent = create_react_agent(
- model=llm,
- tools=[get_current_date],
- prompt="You are a helpful assistant",)
- agent.invoke({"messages":[{"role":"user","content":"今天是几月几号"}]})
复制代码 3、总结
三、LangGraph注意点
1、流式输出
2、工具的调用
@tool(“devide_tool”, return_direct=True)中的return_direct=True是工具执行后的结果将直接作为最终响应返回给用户- from langchain_core.tools import tool
- from langgraph.prebuilt import ToolNode
- # 定义工具 return_direct=True 表示直接返回工具的结果@tool("devide_tool", return_direct=True)defdevide(a:int, b:int)->float:"""计算两个整数的除法。
-
- Args:
- a (int): 除数
- b (int): 被除数"""# 自定义错误if b ==1:raise ValueError("除数不能为1")return a / b
- print(devide.name)print(devide.description)print(devide.args)# 定义工具调用错误处理函数defhandle_tool_error(error: Exception)->str:"""处理工具调用错误。
-
- Args:
- error (Exception): 工具调用错误"""ifisinstance(error, ValueError):return"除数为1没有意义,请重新输入一个除数和被除数。"elifisinstance(error, ZeroDivisionError):return"除数不能为0,请重新输入一个除数和被除数。"returnf"工具调用错误: {error}"
- tool_node = ToolNode([devide],
- handle_tool_errors=handle_tool_error
- )
- agent_with_error_handler = create_react_agent(
- model=llm,
- tools=tool_node
- )
- result = agent_with_error_handler.invoke({"messages":[{"role":"user","content":"10除以1等于多少"}]})# 打印最后的返回结果print(result["messages"][-1].content)
复制代码 3、消息记忆
1. 短期记忆
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.prebuilt import create_react_agent
- checkpoint = InMemorySaver()defget_weather(city:str)->str:"""获取某个城市的天气"""returnf"城市:{city},天气一直都是晴天!"
- agent = create_react_agent(
- model=llm,
- tools=[get_weather],
- checkpointer=checkpoint
- )# Run the agent
- config ={"configurable":{"thread_id":"1"}}
- cs_response = agent.invoke({"messages":[{"role":"user","content":"长沙天气怎么样?"}]},
- config
- )print(cs_response)# Continue the conversation using the same thread_id
- bj_response = agent.invoke({"messages":[{"role":"user","content":"北京呢?"}]},
- config
- )print(bj_response)
复制代码 2. 短期记忆内存管理
消息可能出现存多少条内存不够了,需要清除一部分消息,缓解内存压力
- from langmem.short_term import SummarizationNode
- from langchain_core.messages.utils import count_tokens_approximately
- from langgraph.prebuilt import create_react_agent
- from langgraph.prebuilt.chat_agent_executor import AgentState
- from langgraph.checkpoint.memory import InMemorySaver
- from typing import Any
- # 使用大模型对历史信息进行总结
- summarization_node = SummarizationNode(
- token_counter=count_tokens_approximately,
- model=llm,
- max_tokens=384,# 允许保留在上下文中的原始消息的最大 token 数(超过则触发总结)
- max_summary_tokens=128,# 总结后生成的摘要最多占用的 token 数
- output_messages_key="llm_input_messages",)classState(AgentState):# 注意:这个状态管理的作用是为了能够保存上一次总结的结果。这样就可以防止每次调用大模型时,都要重新总结历史信息。# 这是一个比较常见的优化方式,因为大模型的调用是比较耗时的。
- context:dict[str, Any]
- checkpoint = InMemorySaver()
- agent = create_react_agent(
- model=llm,
- tools=tools,
- pre_model_hook=summarization_node,
- state_schema=State,
- checkpointer=checkpoint,)
复制代码
- # 可以删除某些不重要from langchain_core.messages.utils import(
- trim_messages,
- count_tokens_approximately
- )from langgraph.prebuilt import create_react_agent
- # This function will be called every time before the node that calls LLMdefpre_model_hook(state):
- trimmed_messages = trim_messages(
- state["messages"],
- strategy="last",
- token_counter=count_tokens_approximately,
- max_tokens=384,
- start_on="human",
- end_on=("human","tool"),)return{"llm_input_messages": trimmed_messages}
- checkpoint = InMemorySaver()
- agent = create_react_agent(
- model=llm,
- tools=[],
- pre_model_hook=pre_model_hook,
- checkpointer=checkpoint,)
复制代码 3. 短期状态管理机制
可以在消息会话中拿到额外的信息比如用户id
- from typing import Annotated
- from langgraph.prebuilt import InjectedState, create_react_agent
- from langgraph.prebuilt.chat_agent_executor import AgentState
- from langchain_core.tools import tool
- classCustomState(AgentState):
- user_id:str@tool(return_direct=True)defget_user_info(state: Annotated[CustomState, InjectedState])->str:"""查询用户信息。"""
- user_id = state["user_id"]return"user_123用户的姓名是楼兰。"if user_id =="user_123"else"未知用户"
- agent = create_react_agent(
- model=llm,
- tools=[get_user_info],
- state_schema=CustomState,)
- agent.invoke({"messages":"查询用户信息","user_id":"user_123"})
复制代码 4. 长期消息记忆
- from langchain_core.runnables import RunnableConfig
- from langgraph.config import get_store
- from langgraph.prebuilt import create_react_agent
- from langgraph.store.memory import InMemoryStore
- from langchain_core.tools import tool
- # 定义长期存储
- store = InMemoryStore()# 添加一些测试数据。users是命名空间,user_123是key,后面的JSON数据是value
- store.put(("users",),"user_123",{"name":"楼兰","age":"33",})# 定义工具@tool(return_direct=True)defget_user_info(config: RunnableConfig)->str:"""查找用户信息"""# 获取长期存储。获取到了后,这个存储组件可读也可写
- store = get_store()# 获取配置中的用户ID
- user_id = config["configurable"].get("user_id")
- user_info = store.get(("users",), user_id)returnstr(user_info.value)if user_info else"Unknown user"
- agent = create_react_agent(
- model=llm,
- tools=[get_user_info],
- store=store,)# Run the agent
- agent.invoke({"messages":[{"role":"user","content":"查找用户信息"}]},
- config={"configurable":{"user_id":"user_123"}})
复制代码 4、Human-in-the-loop人类监督(经过人类干预判断是否执行)
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.types import interrupt
- from langgraph.prebuilt import create_react_agent
- from langchain_core.tools import tool
- from langchain_community.chat_models import ChatTongyi
- from langgraph.types import Command
- llm = ChatTongyi(
- model="qwen-plus",
- api_key="你的key",)# An example of a sensitive tool that requires human review / approval@tool(return_direct=True)defbook_hotel(hotel_name:str):"""预定宾馆"""
- response = interrupt(f"正准备执行'book_hotel'工具预定宾馆,相关参数名:{{'hotel_name': {hotel_name}}}。""请选择OK表示同意,或者选择edit提出补充意见。")if response["type"]=="OK":print("OK")passelif response["type"]=="edit":print("edit")
- hotel_name = response["args"]["hotel_name"]else:raise ValueError(f"Unknown response type: {response['type']}")returnf"成功在 {hotel_name} 预定了一个房间。"
- checkpoint = InMemorySaver()
- agent = create_react_agent(
- model=llm,
- tools=[book_hotel],
- checkpointer=checkpoint,)
- config ={"configurable":{"thread_id":"1"}}for chunk in agent.stream({"messages":[{"role":"user","content":"帮我在图灵宾馆预定一个房间"}]},
- config
- ):print(chunk)print("\n")# 只要中间记忆没有被线程刷新就还可以恢复 必须回复后才能问别的print("发送edit吧")for chunk in agent.stream(# Command(resume={"type": "OK"}),
- Command(resume={"type":"edit","args":{"hotel_name":"三号宾馆"}}),
- config
- ):print(chunk)print(chunk['tools']['messages'][-1].content)print("\n")
复制代码同意指令
5、总结
四、Graph图
1、案例一demo
- from typing import TypedDict
- from langgraph.constants import END, START
- from langgraph.graph import StateGraph
- classInputState(TypedDict):
- user_input:strclassOutputState(TypedDict):
- graph_output:strclassOverallState(TypedDict):
- foo:str
- user_input:str
- graph_output:strclassPrivateState(TypedDict):
- bar:strdefnode_1(state: InputState)-> OverallState:# Write to OverallStatereturn{"foo": state["user_input"]+"->学院"}defnode_2(state: OverallState)-> PrivateState:# Read from OverallState, write to PrivateStatereturn{"bar": state["foo"]+"->非常"}defnode_3(state: PrivateState)-> OutputState:# Read from PrivateState, write to OutputStatereturn{"graph_output": state["bar"]+"->靠谱"}# 构建图
- builder = StateGraph(OverallState,input=InputState, output=OutputState)# 添加Node
- builder.add_node("node_1", node_1)
- builder.add_node("node_2", node_2)
- builder.add_node("node_3", node_3)# 添加Edge
- builder.add_edge(START,"node_1")
- builder.add_edge("node_1","node_2")
- builder.add_edge("node_2","node_3")
- builder.add_edge("node_3", END)# 编译图
- graph = builder.compile()# 调用图
- graph.invoke({"user_input":"图灵"})
复制代码 2、显示案例一流程图
3、三个主要组件
1. State状态
State是所有节点共享的状态,它是一个字典,包含了所有节点的状态。有几个需要注意的地方:
State形式上,可以是TypedDict字典,也可以是Pydantic中的一个BaseModel。例如:
- from pydantic import BaseModel
- # The overall state of the graph (this is the public state shared across nodes)classOverallState(BaseModel):
- a:str
复制代码 这两种实现,本质上没有太多的区别。
State中定义的属性,通常不需要指定默认值。如果需要默认值,可以通过在START节点后,定义一个node来指定默认值。
- defnode(state: OverallState):return{"a":"goodbye"}
复制代码State中的属性,除了可以修改值之外,也可以定义一些操作。来指定如何更新State中的值。例如
- from langgraph.graph.message import add_messages
- """
- messages 和 list_field 使用特定操作实现累加更新;
- extra_field 使用默认的覆盖更新。
- """classState(TypedDict):
- messages: Annotated[List[AnyMessage], add_messages]
- list_field: Annotated[list[int], add]
- extra_field:int
复制代码 此时,如果有一个node,返回了State中更新的值,那么messages和list_field的值就会添加到原有的旧集合中,而extra_field的值则会被替换。- from langchain_core.messages import AnyMessage, AIMessage
- from langgraph.graph import StateGraph
- from langgraph.graph.message import add_messages
- from typing import Annotated, TypedDict
- from operator import add
- classState(TypedDict):
- messages: Annotated[List[AnyMessage], add_messages]
- list_field: Annotated[list[int], add]
- extra_field:intdefnode1(state: State):
- new_message = AIMessage("Hello!")return{"messages":[new_message],"list_field":[10],"extra_field":10}defnode2(state: State):
- new_message = AIMessage("LangGraph!")return{"messages":[new_message],"list_field":[20],"extra_field":20}
- graph =(
- StateGraph(State).add_node("node1", node1).add_node("node2", node2).set_entry_point("node1").add_edge("node1","node2").compile())
- input_message ={"role":"user","content":"Hi"}
- result = graph.invoke({"messages":[input_message],"list_field":[1,2,3]})print(result)# for message in result["messages"]:# message.pretty_print()# print(result["extra_field"])
复制代码 在LangGraph的应用当中,State通常都会保存聊天消息。为此,LangGraph中还提供了一个langgraph.graph.MessagesState,可以用来快速保存消息。他的声明方式就是这样的:- classMessagesState(TypedDict):
- messages: Annotated[list[AnyMessage], add_messages]
复制代码 然后,对于Messages,也可以用序列化的方式来声明,例如下面两种方式都是可以的- # 一{"messages":[HumanMessage(content="message")]}# 二{"messages":[{"type":"user","content":"message"}]}
复制代码 2. Node节点
Node是图中的一个处理数据的节点。也有以下几个需要注意的地方:
在LangGraph中,Node通常是一个Python的函数,它接受一个State对象作为输入,返回一个State对象作为输出。每个Node都有一个唯一的名称,通常是一个字符串。如果没有提供名称,LangGraph会自动生成一个和函数名一样的名称。在具体实现时,通常包含两个具体的参数,第一个是State,这个是必选的。第二个是一个可选的配置项config。这里面包含了一些节点运行的配置参数。LangGraph对每个Node提供了缓存机制。只要Node的传入参数相同,LangGraph就会优先从缓存当中获取Node的执行结果。从而提升Node的运行速度。
- from typing import TypedDict
- from langchain_core.runnables import RunnableConfig
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- from langgraph.types import CachePolicy
- from langgraph.cache.memory import InMemoryCache # 是langgraph中的,而不是langchain中的。# 配置状态classState(TypedDict):
- number:int
- user_id:str# 配置信息classConfigSchema(TypedDict):
- user_id:strdefnode_1(state: State, config: RunnableConfig):
- time.sleep(3)
- user_id = config["configurable"]["user_id"]return{"number": state["number"]+1,"user_id": user_id}
- builder = StateGraph(State, config_schema=ConfigSchema)# Node缓存5秒
- builder.add_node("node1", node_1, cache_policy=CachePolicy(ttl=5))
- builder.add_edge(START,"node1")
- builder.add_edge("node1", END)# graph = builder.compile(cache=InMemoryCache())缓存
- graph = builder.compile(cache=InMemoryCache())print(graph.invoke({"number":5}, config={"configurable":{"user_id":"123"}}, stream_mode='updates'))# {'node1': {'number': 6, 'user_id': '123'}}# node入参相同===也就是{"number": 5},就会走缓存print(graph.invoke({"number":5}, config={"configurable":{"user_id":"456"}}, stream_mode='updates'))# [{'node1': {'number': 6, 'user_id': '123'}, '_metadata_': {'cached': 'True'}}]
复制代码对于Node,LangGraph除了提供缓存机制,还提供了重试机制。
可以针对单个节点指定,例如:
- from langgraph.types import RetryPolicy
- builder.add_node("node1", node_1, retry=RetryPolicy(max_attempts=4))
复制代码- # 图最多执行 25 步如果 25 步内没走到 END,就会抛出异常,防止无限运行。print(graph.invoke(xxxxx, config={"recursion_limit":25}))
复制代码 3. Edge边
在Graph图中,通过Edge(边)把Node(节点)连接起来,从而决定State应该如何在Graph中传递。LangGraph中也提供了非常灵活的构建方式。
普通Edge和EntryPoint
Edge通常是用来把两个Node连接起来,形成逻辑处理路线。例如 graph.add_edge("node_1","node_2") 。
LangGraph中提供了两个默认的Node,START和END,用来作为Graph的入口和出口。
同时,也可以自行指定EntryPoint。例如
- builder = StateGraph(State)
- builder.set_entry_point("node1")
- builder.set_finish_point("node2")
复制代码条件Edge和EntryPoint
我们也可以添加带有条件判断的Edge和EntryPoint,用来动态构建更复杂的工作流程。
具体实现时,可以指定一个函数,函数的返回值就可以是下一个Node的名称。
- from typing import TypedDict
- from langchain_core.runnables import RunnableConfig
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- # 配置状态classState(TypedDict):
- number:intdefnode_1(state: State, config: RunnableConfig):return{"number": state["number"]+1}
- builder = StateGraph(State)# Node缓存5秒
- builder.add_node("node1", node_1)defrouting_func(state: State)->str:if state["number"]>5:return"node1"else:return END
- builder.add_edge("node1", END)# 添加条件边
- builder.add_conditional_edges(START, routing_func)
- graph = builder.compile()print(graph.invoke({"number":4}))
复制代码
另外,如果不想在路由函数中写出过多具体的节点名称,也可以在函数中返回一个自定义的结果,然后将这个结果解析到某一个具体的Node上。例如
- defrouting_func(state: State)->bool:if state["number"]>5:returnTrueelse:returnFalse
- builder.add_conditional_edges(START, routing_func,{True:"node_a",False:"node_b"})
复制代码Send动态路由
在条件边中,如果希望一个Node后同时路由到多个Node,就可以返回Send动态路由的方式实现。
Send对象可传入两个参数,第一个是下一个Node的名称,第二个是Node的输入。- from operator import add
- from typing import TypedDict, Annotated
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- from langgraph.types import Send
- # 配置状态classState(TypedDict):
- messages: Annotated[list[str], add]classPrivateState(TypedDict):
- msg:strdefnode_1(state: PrivateState)-> State:
- res = state["msg"]+"!"return{"messages":[res]}
- builder = StateGraph(State)# Node缓存5秒
- builder.add_node("node1", node_1)defrouting_func(state: State):
- result =[]for message in state["messages"]:
- result.append(Send("node1",{"msg": message}))return result
- # 通过路由函数,将消息中每个字符串分别传入node1处理。
- builder.add_conditional_edges(START, routing_func,["node1"])
- builder.add_edge("node1", END)
- graph = builder.compile()print(graph.invoke({"messages":["hello","world","hello","graph"]}))
复制代码
Command命令
通常,Graph中一个典型的业务步骤是State进入一个Node处理。在Node中先更新State状态,然后再通过Edges传递给下一个Node。如果希望将这两个步骤合并为一个命令,那么还可以使用Command命令
- from operator import add
- from typing import TypedDict, Annotated
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- from langgraph.types import Command
- # 配置状态classState(TypedDict):
- messages: Annotated[list[str], add]defnode_1(state: State):
- new_message =[]for message in state["messages"]:
- new_message.append(message +"!")return Command(
- goto=END,
- update={"messages": new_message})
- builder = StateGraph(State)
- builder.add_node("node1", node_1)# node1中通过Command同时集成了更新State和指定下一个Node
- builder.add_edge(START,"node1")
- graph = builder.compile()print(graph.invoke({"messages":["hello","world","hello","graph"]}))# {'messages': ['hello', 'world', 'hello', 'graph', 'hello!', 'world!', 'hello!', 'graph!']}
复制代码 4、子图
一个Graph除了可以单独使用,还可以作为一个Node,嵌入到另外一个Graph中,这种用法就叫做子图。
使用子图时,基本和使用Node没有太多的区别
唯一需要注意的是,当触发了SubGraph代表Node后,实际上是相当于重新调用依次subgraph.invoke(state)方法- # subgraph与graph使用相同Statefrom operator import add
- from typing import TypedDict, Annotated
- from langgraph.constants import END
- from langgraph.graph import StateGraph, MessagesState, START
- classState(TypedDict):
- messages: Annotated[list[str], add]# Subgraphdefsub_node_1(state: State)-> MessagesState:return{"messages":["response from subgraph"]}
- subgraph_builder = StateGraph(State)
- subgraph_builder.add_node("sub_node_1", sub_node_1)
- subgraph_builder.add_edge(START,"sub_node_1")
- subgraph_builder.add_edge("sub_node_1", END)
- subgraph = subgraph_builder.compile()# Parent graph
- builder = StateGraph(State)
- builder.add_node("subgraph_node", subgraph)
- builder.add_edge(START,"subgraph_node")
- builder.add_edge("subgraph_node", END)
- graph = builder.compile()print(graph.invoke({"messages":["hello subgraph"]}))# 结果hello subgraph会出现两次。这是因为在subgraph_node中默认调用了一次subgraph.invoke(state)方法。主图里也调用了一次invoke。这就会往state中添加两次语句# {'messages': ['hello subgraph', 'hello subgraph', 'response from subgraph']}# 解决方案就是主图不要和子图不使用同一个状态管理State 也就是四、1。 案例一demo中的方式
复制代码 5、图的Stream支持
和调用大模型相似,Graph除了可以通过invoke方法进行直接调用外,也支持通过stream()方法进行流式调用。不过大模型的流式调用是依次返回大模型响应的Token。而Graph的流式输出则是依次返回数据处理步骤。
graph提供了stream()方法进行同步的流式调用,也提供了astream()方法进行异步的流式调用。- for chunk in graph.stream({"messages":["hello subgraph"]}, stream_mode="debug"):print(chunk)# {'subgraph_node': {'messages': ['hello subgraph', 'response from subgraph']}}
复制代码 LangGraph支持几种不同的stream mode:
values:在图的每一步之后流式传输状态的完整值。updates:在图的每一步之后,将更新内容流式传输到状态。如果在同一步骤中进行了多次更新(例如,运行了多个节点),这些更新将分别进行流式传输。custom:从图节点内部流式传输自定义数据。通常用于调试。messages:从任何调用大语言模型(LLM)的图节点中,流式传输二元组(LLM的Token,元数据)。debug:在图的执行过程中尽可能多地传输信息。用得比较少。
values、updates、debug输出模式,使用之前案例验证,就能很快感受到其中的区别。
messages输出模式,由于在之前案例中并没有调用大模型,所以不会有输出结果。
而custom输出模式,可以自定义输出内容。在Node节点内或者Tools工具内,通过get_stream_writer()方法获取一个StreamWriter对象,然后使用write方法将自定义数据写入流中。- from typing import TypedDict
- from langgraph.config import get_stream_writer
- from langgraph.graph import StateGraph, START
- classState(TypedDict):
- query:str
- answer:strdefnode(state: State):
- writer = get_stream_writer()
- writer({"自定义key":"在节点内返回自定义信息"})return{"answer":"some data"}
- graph =(
- StateGraph(State).add_node(node).add_edge(START,"node").compile())
- inputs ={"query":"example"}# Usagefor chunk in graph.stream(inputs, stream_mode="custom"):print(chunk)
复制代码 最后,在langChain中,构建LLM对象时,大都支持desable_streaming属性,禁止流式输出,那么就无法使用上面的流式输出。例如:- llm = ChatOpenAI(model="", disable_streaming=True)
复制代码 五、构建多智能体工作流
1、流式输出大模型调用结果
在介绍Graph的流式输出时,我们提到LangGraph的Graph流式输出有几种不同的模式,其中有一种messages模式,是用来监控大语言模型的Token记录的。这里我们就可以来测试一下。
- from langchain_community.chat_models import ChatTongyi
- # 构建阿里云百炼大模型客户端
- llm = ChatTongyi(
- model="qwen-plus",
- api_key="",)from langgraph.graph import StateGraph, MessagesState, START
- defcall_model(state: MessagesState):
- response = llm.invoke(state["messages"])return{"messages": response}
- builder = StateGraph(MessagesState)
- builder.add_node(call_model)
- builder.add_edge(START,"call_model")
- graph = builder.compile()for chunk in graph.stream({"messages":[{"role":"user","content":"湖南的省会是哪里?"}]},
- stream_mode="messages",):print(chunk)
复制代码 通常,如果要对大模型调用成本进行统计时,这种messages就是比较好的一种方式。
2、大模型消息持久化
和之前介绍LangGraph的Agent相似,Graph图也支持构建消息的持久化功能。并且也通常支持通过checkpoint构建短期记忆,以store构建长期记忆。这里短期记忆和长期记忆,都是可以通过内存或者数据库进行持久化保存的。不过短期记忆更倾向于通过对消息的短期存储,实现多轮对话的效果。而长期记忆则倾向于对消息长期存储后支持语义检索。
- from langchain_community.chat_models import ChatTongyi
- # 构建阿里云百炼大模型客户端
- llm = ChatTongyi(
- model="qwen-plus",
- api_key="",)from langgraph.graph import StateGraph, MessagesState, START
- from langgraph.checkpoint.memory import InMemorySaver
- defcall_model(state: MessagesState):
- response = llm.invoke(state["messages"])return{"messages": response}
- builder = StateGraph(MessagesState)
- builder.add_node(call_model)
- builder.add_edge(START,"call_model")
- checkpointer = InMemorySaver()
- graph = builder.compile(checkpointer=checkpointer)
- config ={"configurable":{"thread_id":"1"}}for chunk in graph.stream({"messages":[{"role":"user","content":"湖南的省会是哪里?"}]},
- config,
- stream_mode="values",):
- chunk["messages"][-1].pretty_print()for chunk in graph.stream({"messages":[{"role":"user","content":"湖北呢?"}]},
- config,
- stream_mode="values",):
- chunk["messages"][-1].pretty_print()
复制代码 3、Human-In-Loop人类干预
在LangGraph中也可以通过中断任务,等待确认的方式,来实现过程干预,这样能够更好的减少大语言模型的结果不稳定给任务带来的影响。
在具体实现人类干预时,需要注意一下几点:
必须指定一个checkpointers短期记忆,否则无法保存任务状态。在执行Graph任务时,必须指定一个带有thread_id的配置项,指定线程ID。之后才能通过线程ID,指定恢复线程。在任务执行过程中,通过interrupt()方法,中断任务,等待确认。在人类确认之后,使用Graph提交一个resume=True的Command指令,恢复任务,并继续进行。
这种实现方式,在之前介绍LangGraph构建单Agent时已经介绍过,不过,结合Graph的State,在多个Node之间进行复杂控制,这样更能体现出人类监督的价值。
例如,下面的案例可以实现这样一种典型的人类确认:
- # 构建一个带有Human-In-Loop的图from operator import add
- from langchain_core.messages import AnyMessage
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- from langchain_community.chat_models import ChatTongyi
- # 构建阿里云百炼大模型客户端
- llm = ChatTongyi(
- model="qwen-plus",
- api_key="",)from typing import Literal, TypedDict, Annotated
- from langgraph.types import interrupt, Command
- classState(TypedDict):
- messages: Annotated[list[AnyMessage], add]defhuman_approval(state: State)-> Command[Literal["call_llm", END]]:
- is_approved = interrupt({"question":"是否同意调用大语言模型?"})if is_approved:return Command(goto="call_llm")else:return Command(goto=END)defcall_llm(state: State):
- response = llm.invoke(state["messages"])return{"messages":[response]}
- builder = StateGraph(State)# Add the node to the graph in an appropriate location# and connect it to the relevant nodes.
- builder.add_node("human_approval", human_approval)
- builder.add_node("call_llm", call_llm)
- builder.add_edge(START,"human_approval")
- checkpointers = InMemorySaver()
- graph = builder.compile(checkpointer=checkpointers)from langchain_core.messages import HumanMessage
- # 提交任务,等待确认
- thread_config ={"configurable":{"thread_id":1}}
- graph.invoke({"messages":[HumanMessage("湖南的省会是哪里?")]}, config=thread_config)# 执行后会中断任务,等待确认# 确认同意,继续执行任务
- final_result = graph.invoke(Command(resume=True), config=thread_config)print(final_result)# 不同意,终止任务# final_result = graph.invoke(Command(resume=False), config=thread_config)# print(final_result)
复制代码 ps:
任务中断和恢复,需要保持相同的thread_id。通常应用当中都会单独生成一个随机的thread_id,保证唯一的同时,防止其他任务干扰。
interrupt()方法中中断任务的时间不能过长,过长了之后就无法恢复任务了。
任务确认时,Command中传递的resume可以是简单的True或False,也可以是一个字典。通过字典可以进行更多的判断。
4、Time Travel时间回溯
由于大语言模型回答问题的不确定性,基于大语言模型构建的应用,也是充满不确定性的。而对于这种不确定性的系统,就有必要进行更精确的检查。当某一个步骤出现问题时,才能及时发现问题,并对出问题的那个步骤进行重演。为此,LangGraph提供了Time Travel时间回溯功能,可以保存Graph的运行过程,并可以手动指定从Graph的某一个Node开始进行重演。
具体实现时,需要注意以下几点:
在运行Graph时,需要提供初始的输入消息。运行时,指定thread_id线程ID。并且要基于这个线程ID,再指定一个checkpoint检查点。执行后将在每一个Node执行后,生成一个check_point_id。指定thread_id和check_point_id,进行任务重演。重演前,可以选择更新state,当然,如果没问题,也可以不指定。
构建图
查看检查点
- # 构建一个图。图中两个步骤:第一步让大模型推荐一个有名的作家,第二步,让大模型用推荐的作家的风格写一个100字以内的笑话。from typing import TypedDict
- from typing_extensions import NotRequired
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- import uuid
- from langchain_community.chat_models import ChatTongyi
- # 构建阿里云百炼大模型客户端
- llm = ChatTongyi(
- model="qwen-plus",
- api_key="",)classState(TypedDict):
- author: NotRequired[str]
- joke: NotRequired[str]defauthor_node(state: State):
- prompt ="帮我推荐一位受人们欢迎的作家。只需要给出作家的名字即可。"
- author = llm.invoke(prompt)return{"author": author}defjoke_node(state: State):
- prompt =f"用作家:{state['author']} 的风格,写一个100字以内的笑话"
- joke = llm.invoke(prompt)return{"joke": joke}
- builder = StateGraph(State)
- builder.add_node(author_node)
- builder.add_node(joke_node)
- builder.add_edge(START,"author_node")
- builder.add_edge("author_node","joke_node")
- builder.add_edge("joke_node", END)
- checkpointers = InMemorySaver()
- graph = builder.compile(checkpointer=checkpointers)
- graph
- # 正常执行一个图
- config ={"configurable":{"thread_id": uuid.uuid4(),}}
- state = graph.invoke({}, config)print(state["author"])print()print(state["joke"])# 查看所有checkpoint检查点
- states =list(graph.get_state_history(config))for state in states:print(state.next)print(state.config["configurable"]["checkpoint_id"])print()# 选定某一个检查点。这里选择author_node,让大模型重新推荐作家# 1也就是auth那个节点开始
- selected_state = states[1]print(selected_state.next)print(selected_state.values)# 为了后面的重演,更新state。可选步骤:
- new_config = graph.update_state(selected_state.config, values={"author":"郭德纲"})print(new_config)# 接下来,指定thread_id和checkpoint_id,进行重演
- graph.invoke(None, new_config)
复制代码 5、多智能体架构
可以看到,在LangChain体系中,LangChain主要集成了和大语言模型交互的能力,而LangGraph主要实现了复杂的流程调度。将这两个能力结合起来,一个强大的多智能体构建框架就已经成型了。
接下来,我们就用LangGraph来实现一个非常典型的多智能体架构,作为一个完整的案例。
这个机器人可以通过一个supervisor节点,对用户的输入进行分类,然后根据分类结果,选择不同的agent节点进行处理。接下来每个Agent节点,都可以选择不同的工具进行处理,最后将处理结果汇总,返回给supervisor节点。supervisor节点再将结果返回给用户。
Director.py 比较懒,自己去加其他的,这里只演示笑话的这个
- from operator import add
- from typing import TypedDict, Annotated
- from langchain_community.chat_models import ChatTongyi
- from langchain_core.messages import AnyMessage, HumanMessage, AIMessage
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.config import get_stream_writer
- from langgraph.constants import START, END
- from langgraph.graph import StateGraph
- llm = ChatTongyi(
- model="qwen-plus",
- api_key="")
- nodes =["supervisor","travel","couplet","joke","other"]classState(TypedDict):
- messages: Annotated[list[AnyMessage], add]type:strdefother_node(state: State):print(">>> other_node")
- writer = get_stream_writer()
- writer({"node":">>>> other_node"})return{"messages":[HumanMessage(content="我暂时无法回答这个问题")],"type":"other"}deftravel_node(state: State):print(">>> travel_node")
- writer = get_stream_writer()
- writer({"node":">>>> travel_node"})return{"messages":[HumanMessage(content="travel_node")],"type":"travel"}defsupervisor_node(state: State):print(">>> supervisor_node")
- writer = get_stream_writer()
- writer({"node":">>>> supervisor_node"})# 跟据用户问题,对问题分类。分类结果保存在type中# 根据用户的问题,对问题进行分类。分类结果保存到type当中
- prompt ="""你是一个专业的客服助手,负责对用户的问题进行分类,并将任务分给其他Agent执行。
- 如果用户的问题是和旅游路线规划相关的,那就返回 travel 。
- 如果用户的问题是希望讲一个笑话,那就返回 joke 。
- 如果用户的问题是希望对一个对联,那就返回 couplet 。
- 如果是其他的问题,返回 other 。
- 除了这几个选项外,不要返回任何其他的内容。"""
- prompts =[{"role":"system","content": prompt},{"role":"user","content": state["messages"][0]}]# 如果有type的话那么就是已经交给对应的路由处理完成,可以直接返回if"type"in state:
- writer({"supervisor_step":f"已获取{state['type']} 智能处理结果"})return{"type": END}else:
- resource = llm.invoke(prompts)
- typeRes = resource.content
- writer({"supervisor_step":f"问题分类结果{typeRes}"})if typeRes in nodes:return{"type": typeRes}else:raise ValueError("type is not type in (supervisor, travel, couplet, joke, other)")defjoke_node(state: State):print(">>> joke_node")
- writer = get_stream_writer()
- writer({"node":">>>> joke_node"})
- system_prompt ="你是一个笑话大师,根据用户的问题,写一个不超过100个字的笑话。"
- prompts =[{"role":"system","content": system_prompt},{"role":"user","content": state["messages"][0]}]
- response = llm.invoke(prompts)return{"messages":[AIMessage(content=response.content)],"type":"joke"}defcouplet_node(state: State):print(">>> couplet_node")
- writer = get_stream_writer()
- writer({"node":">>>> couplet_node"})return{"messages":[HumanMessage(content="couplet_node")],"type":"couplet"}# 条件路由defrouting_func(state: State):if state["type"]=="travel":return"travel_node"elif state["type"]=="joke":return"joke_node"elif state["type"]=="couplet":return"couplet_node"elif state["type"]== END:return END
- else:return"other_node"# 构件图
- builder = StateGraph(State)# 添加节点
- builder.add_node("supervisor_node", supervisor_node)
- builder.add_node("travel_node", travel_node)
- builder.add_node("joke_node", joke_node)
- builder.add_node("couplet_node", couplet_node)
- builder.add_node("other_node", other_node)# 添加Edge
- builder.add_edge(START,"supervisor_node")
- builder.add_conditional_edges("supervisor_node", routing_func,["travel_node","joke_node","couplet_node","other_node", END])
- builder.add_edge("travel_node","supervisor_node")
- builder.add_edge("joke_node","supervisor_node")
- builder.add_edge("couplet_node","supervisor_node")
- builder.add_edge("other_node","supervisor_node")# 构建Graph
- checkpointer = InMemorySaver()
- graph = builder.compile(checkpointer=checkpointer)# 执行任务测试代码if __name__ =="__main__":
- config ={"configurable":{"thread_id":"1"}}# 选中路由回答# for chunk in graph.stream(# input={"messages": ["给我讲一个郭德纲的笑话"]},# config=config,# stream_mode="custom"# ):# print(chunk)# 无法回答# for chunk in graph.stream(# input={"messages": ["今天天气怎么样"]},# config=config,# stream_mode="values"# ):# print(chunk)# 最终获取的答案就是# res = graph.invoke(input={"messages": ["今天天气怎么样"]}, config=config)# print(res["messages"][-1].content)
复制代码调用
- import uuid
- from Director import graph
- query ="讲一个郭德纲的笑话"
- config ={"configurable":{"thread_id": uuid.uuid4()}}
- res = graph.invoke(input={"messages":[query]}, config=config)print(res["messages"][-1].content)
复制代码 原文地址:https://blog.csdn.net/weixin_49390750/article/details/152042999 |