Author: E的工程笔记
This article is translated and adapted from Hierarchical Agent Teams
https://langchain-ai.github.io/langgraph/tutorials/multi_agent/hierarchical_agent_teams/
Contents
1. Introduction
2. Setup
3. Create Tools
4. Helper Utilities
5. Define Agent Teams
6. Add Layers
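1. Introduction
In the previous example (Agent Supervisor), we introduced the concept of a single supervisor node that routes work between different worker nodes.
But what if the job for a single worker becomes too complex? What if the number of workers becomes too large?
For some applications, the system may be more effective if work is distributed hierarchically.
You can do this by composing different subgraphs and creating a top-level supervisor along with mid-level supervisors.
To do this, let's build a simple research assistant! The graph looks like this:
[Figure: diagram of the hierarchical agent teams graph]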
This notebook is inspired by the paper AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation, by Wu et al. In the rest of this notebook, you will:
1. Define the agents' tools to access the web and write files
2. Define some utilities to help create the graph and agents
3. Create and define each team (web research + doc writing)
4. Compose everything together.
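The layering described above can be sketched in plain Python before any LangGraph code appears. This is a minimal illustration, not the tutorial's implementation: `make_team`, the fixed `plan` list, and the lambda workers are hypothetical stand-ins, and a real supervisor would ask an LLM whom to call next instead of following a fixed order.

```python
from typing import Callable, Dict, List

# A worker is any callable that takes a task string and returns a result string.
Worker = Callable[[str], str]


def make_team(workers: Dict[str, Worker], plan: List[str]) -> Worker:
    """Build a supervisor that runs the planned workers in order.

    The fixed `plan` is an assumption that keeps the sketch deterministic;
    an LLM-based supervisor would pick the next worker dynamically.
    """

    def team(task: str) -> str:
        result = task
        for name in plan:
            result = workers[name](result)
        return result

    return team


# Hypothetical workers that only tag the text they receive.
research_team = make_team(
    {
        "Search": lambda t: f"search({t})",
        "WebScraper": lambda t: f"scrape({t})",
    },
    plan=["Search", "WebScraper"],
)
writing_team = make_team(
    {"DocWriter": lambda t: f"write({t})"},
    plan=["DocWriter"],
)

# The top-level supervisor treats each team as if it were a single worker.
top_level = make_team(
    {"ResearchTeam": research_team, "PaperWritingTeam": writing_team},
    plan=["ResearchTeam", "PaperWritingTeam"],
)

report = top_level("sturgeon")
print(report)
```

Because a team has the same call signature as a worker, teams can be nested to any depth, which is the property the hierarchical design relies on.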
2. Setup
First, let's install the required packages and set our API keys.

    %%capture --no-stderr
    %pip install -U langgraph langchain langchain_openai langchain_experimental

    import getpass
    import os


    def _set_if_undefined(var: str):
        # Prompt for the key only when it is not already in the environment.
        if not os.environ.get(var):
            os.environ[var] = getpass.getpass(f"Please provide your {var}")


    _set_if_undefined("OPENAI_API_KEY")
    _set_if_undefined("TAVILY_API_KEY")
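Set up LangSmith for LangGraph development
Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor LLM applications built with LangGraph. The LangSmith documentation explains how to get started.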
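As a rough illustration of what enabling tracing can look like, the sketch below sets the LANGCHAIN_-prefixed environment variables from Python. The helper name `enable_langsmith_tracing` and the placeholder values are hypothetical, and the exact variable names should be checked against the LangSmith documentation for the version you use.

```python
import os
from typing import Optional


def enable_langsmith_tracing(api_key: str, project: Optional[str] = None) -> None:
    """Set environment variables that switch on LangSmith tracing.

    Hypothetical helper: the variable names are the LANGCHAIN_-prefixed ones
    and are an assumption to verify against your installed version.
    """
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_API_KEY"] = api_key
    if project is not None:
        # Optional: group traces under a named project.
        os.environ["LANGCHAIN_PROJECT"] = project


# Placeholder values for illustration only; supply your real key in practice.
enable_langsmith_tracing("placeholder-key", project="hierarchical-agent-teams")
```

In a notebook you would normally read the key with `getpass` rather than hard-coding it, in the same way the API keys are collected during setup.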
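3. Create Tools
Each team will be composed of one or more agents, each with one or more tools. Below, define all the tools to be used by your different teams.
We'll start with the research team.
ResearchTeam tools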
The research team can use a search engine and a URL scraper to find information on the web. Feel free to add additional functionality below to boost the team's performance!

    from typing import Annotated, List

    from langchain_community.document_loaders import WebBaseLoader
    from langchain_community.tools.tavily_search import TavilySearchResults
    from langchain_core.tools import tool

    tavily_tool = TavilySearchResults(max_results=5)


    @tool
    def scrape_webpages(urls: List[str]) -> str:
        """Use requests and bs4 to scrape the provided web pages for detailed information."""
        loader = WebBaseLoader(urls)
        docs = loader.load()
        # Wrap each page in a <Document> tag so the model can tell pages apart.
        return "\n\n".join(
            [
                f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
                for doc in docs
            ]
        )
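To see what the scraper hands back to the agent, the formatting step can be run on its own without any network access. The sketch below is an assumption-laden stand-in: `FakeDocument` and `format_documents` are hypothetical names that mimic only the `page_content` and `metadata` attributes the tool reads.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeDocument:
    """Hypothetical stand-in for a loaded web page (content plus metadata)."""

    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)


def format_documents(docs: List[FakeDocument]) -> str:
    """Wrap each document in a <Document> tag and join them with blank lines."""
    return "\n\n".join(
        f'<Document name="{doc.metadata.get("title", "")}">\n'
        f"{doc.page_content}\n</Document>"
        for doc in docs
    )


docs = [
    FakeDocument("Sturgeon are ancient fish.", {"title": "Sturgeon"}),
    FakeDocument("No title here."),
]
formatted = format_documents(docs)
print(formatted)
```

A page without a title simply gets an empty `name` attribute, so the agent still receives its content.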
Document writing team tools
Next up, we will give the document writing team some tools to use. We define some bare-bones file-access tools below.
Note that this gives the agents access to your file system, which can be unsafe. We also haven't optimized the tool descriptions for performance.

    from pathlib import Path
    from tempfile import TemporaryDirectory
    from typing import Dict, Optional

    from langchain_experimental.utilities import PythonREPL
    from typing_extensions import TypedDict

    _TEMP_DIRECTORY = TemporaryDirectory()
    WORKING_DIRECTORY = Path(_TEMP_DIRECTORY.name)


    @tool
    def create_outline(
        points: Annotated[List[str], "List of main points or sections."],
        file_name: Annotated[str, "File path to save the outline."],
    ) -> Annotated[str, "Path of the saved outline file."]:
        """Create and save an outline."""
        with (WORKING_DIRECTORY / file_name).open("w") as file:
            for i, point in enumerate(points):
                file.write(f"{i + 1}. {point}\n")
        return f"Outline saved to {file_name}"


    @tool
    def read_document(
        file_name: Annotated[str, "File path to read the document from."],
        start: Annotated[Optional[int], "The start line. Default is 0"] = None,
        end: Annotated[Optional[int], "The end line. Default is None"] = None,
    ) -> str:
        """Read the specified document."""
        with (WORKING_DIRECTORY / file_name).open("r") as file:
            lines = file.readlines()
        # Default to the first line only when no start line was given.
        if start is None:
            start = 0
        return "\n".join(lines[start:end])


    @tool
    def write_document(
        content: Annotated[str, "Text content to be written into the document."],
        file_name: Annotated[str, "File path to save the document."],
    ) -> Annotated[str, "Path of the saved document file."]:
        """Create and save a text document."""
        with (WORKING_DIRECTORY / file_name).open("w") as file:
            file.write(content)
        return f"Document saved to {file_name}"


    @tool
    def edit_document(
        file_name: Annotated[str, "Path of the document to be edited."],
        inserts: Annotated[
            Dict[int, str],
            "Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
        ],
    ) -> Annotated[str, "Path of the edited document file."]:
        """Edit a document by inserting text at specific line numbers."""
        with (WORKING_DIRECTORY / file_name).open("r") as file:
            lines = file.readlines()

        # Apply inserts in ascending line order.
        sorted_inserts = sorted(inserts.items())

        for line_number, text in sorted_inserts:
            if 1 <= line_number <= len(lines) + 1:
                lines.insert(line_number - 1, text + "\n")
            else:
                return f"Error: Line number {line_number} is out of range."

        with (WORKING_DIRECTORY / file_name).open("w") as file:
            file.writelines(lines)

        return f"Document edited and saved to {file_name}"


    # Warning: This executes code locally, which can be unsafe when not sandboxed
    repl = PythonREPL()


    @tool
    def python_repl(
        code: Annotated[str, "The python code to execute to generate your chart."],
    ):
        """Use this to execute python code. If you want to see the output of a value,
        you should print it out with `print(...)`. This is visible to the user."""
        try:
            result = repl.run(code)
        except BaseException as e:
            return f"Failed to execute. Error: {repr(e)}"
        return f"Successfully executed:\n\`\`\`python\n{code}\n\`\`\`\nStdout: {result}"
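Since these tools accept arbitrary file names from the model, one way to reduce the risk is to reject any path that resolves outside the working directory. The sketch below is one possible guard, not part of the original tutorial; `resolve_in_workdir` is a hypothetical helper name.

```python
from pathlib import Path
from tempfile import TemporaryDirectory


def resolve_in_workdir(workdir: Path, file_name: str) -> Path:
    """Return the resolved path for file_name, or raise if it leaves workdir."""
    base = workdir.resolve()
    candidate = (base / file_name).resolve()
    try:
        # relative_to raises ValueError when candidate is not inside base.
        candidate.relative_to(base)
    except ValueError:
        raise ValueError(f"{file_name!r} escapes the working directory") from None
    return candidate


with TemporaryDirectory() as tmp:
    workdir = Path(tmp)
    ok_path = resolve_in_workdir(workdir, "notes/outline.txt")
    try:
        resolve_in_workdir(workdir, "../outside.txt")
        rejected = False
    except ValueError:
        rejected = True

print(ok_path.name, rejected)
```

A tool could call such a guard before opening a file. It does not sandbox the Python REPL tool, which still runs arbitrary code locally.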
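4. Helper Utilities
We will create a few utility functions to keep things concise when we want to wrap a worker agent as a graph node and create a supervisor for a sub-graph.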
These will simplify the graph composition code at the end, so it is easier to see what is going on.

    from typing import List, Optional

    from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
    from langchain_openai import ChatOpenAI
    from langgraph.graph import END, StateGraph, START
    from langchain_core.messages import HumanMessage, trim_messages

    llm = ChatOpenAI(model="gpt-4o-mini")

    # Keep only the most recent messages that fit within the token budget.
    trimmer = trim_messages(
        max_tokens=100000,
        strategy="last",
        token_counter=llm,
        include_system=True,
    )


    def agent_node(state, agent, name):
        # Run the agent and report its final message under the worker's name.
        result = agent.invoke(state)
        return {
            "messages": [HumanMessage(content=result["messages"][-1].content, name=name)]
        }


    def create_team_supervisor(llm: ChatOpenAI, system_prompt, members) -> str:
        """An LLM-based router."""
        options = ["FINISH"] + members
        function_def = {
            "name": "route",
            "description": "Select the next role.",
            "parameters": {
                "title": "routeSchema",
                "type": "object",
                "properties": {
                    "next": {
                        "title": "Next",
                        "anyOf": [
                            {"enum": options},
                        ],
                    },
                },
                "required": ["next"],
            },
        }
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", system_prompt),
                MessagesPlaceholder(variable_name="messages"),
                (
                    "system",
                    "Given the conversation above, who should act next?"
                    " Or should we FINISH? Select one of: {options}",
                ),
            ]
        ).partial(options=str(options), team_members=", ".join(members))
        return (
            prompt
            | trimmer
            | llm.bind_functions(functions=[function_def], function_call="route")
            | JsonOutputFunctionsParser()
        )
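The routing contract behind the supervisor can be examined without calling an LLM. The following sketch builds a simplified version of the function schema (a plain `enum` rather than the `anyOf` wrapper) and validates a routing decision against it. `build_route_schema`, `parse_route`, and the raw JSON string are hypothetical and stand in for the model's function-call output.

```python
import json
from typing import Any, Dict, List


def build_route_schema(members: List[str]) -> Dict[str, Any]:
    """Build a function schema whose only argument is the next role."""
    options = ["FINISH"] + members
    return {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "type": "object",
            "properties": {"next": {"enum": options}},
            "required": ["next"],
        },
    }


def parse_route(arguments_json: str, schema: Dict[str, Any]) -> str:
    """Parse function-call arguments and check the choice is an allowed role."""
    choice = json.loads(arguments_json)["next"]
    allowed = schema["parameters"]["properties"]["next"]["enum"]
    if choice not in allowed:
        raise ValueError(f"Unknown route {choice!r}; expected one of {allowed}")
    return choice


schema = build_route_schema(["Search", "WebScraper"])
# Hypothetical raw arguments, standing in for what the LLM would return.
next_role = parse_route('{"next": "Search"}', schema)
print(next_role)
```

Constraining the answer to an enum is what lets the graph use the supervisor's output directly as a routing key.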
5. Define Agent Teams
Now we can define our hierarchical teams. "Choose your player!"
Research team
The research team will have a search agent and a web scraping "research_agent" as its two worker nodes. Let's create those, as well as the team supervisor.

    import functools
    import operator

    from langchain_core.messages import BaseMessage, HumanMessage
    from langchain_openai.chat_models import ChatOpenAI
    from langgraph.prebuilt import create_react_agent


    # ResearchTeam graph state
    class ResearchTeamState(TypedDict):
        # A message is added after each team member finishes
        messages: Annotated[List[BaseMessage], operator.add]
        # The team members are tracked so they are aware of
        # the others' skill-sets
        team_members: List[str]
        # Used to route work. The supervisor calls a function
        # that will update this every time it makes a decision
        next: str


    llm = ChatOpenAI(model="gpt-4o")

    search_agent = create_react_agent(llm, tools=[tavily_tool])
    search_node = functools.partial(agent_node, agent=search_agent, name="Search")

    research_agent = create_react_agent(llm, tools=[scrape_webpages])
    research_node = functools.partial(agent_node, agent=research_agent, name="WebScraper")

    supervisor_agent = create_team_supervisor(
        llm,
        "You are a supervisor tasked with managing a conversation between the"
        " following workers: Search, WebScraper. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH.",
        ["Search", "WebScraper"],
    )
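The `Annotated[..., operator.add]` annotation on `messages` means that updates to that key are appended, while plain keys such as `next` are overwritten. The sketch below approximates that merge rule in plain Python (3.9+). It illustrates the idea only and is not LangGraph's actual implementation; `TeamState` and `apply_update` are hypothetical names.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


class TeamState(TypedDict):
    messages: Annotated[List[str], operator.add]
    next: str


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a node's partial update into the state.

    Keys annotated with a reducer are combined using it; all other keys are
    simply overwritten by the new value.
    """
    hints = get_type_hints(TeamState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints.get(key), "__metadata__", ())
        if metadata and key in merged:
            # The first piece of metadata is treated as the reducer function.
            merged[key] = metadata[0](merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"messages": ["user: find sturgeon facts"], "next": ""}
state = apply_update(state, {"next": "Search"})
state = apply_update(state, {"messages": ["Search: found 5 results"]})
print(state)
```

This is why each worker node can return only its newest message: the reducer takes care of extending the shared conversation.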
Now that we've created the necessary components, defining their interactions is easy. Add the nodes to the team graph, and define the edges, which determine the transition criteria.

    research_graph = StateGraph(ResearchTeamState)
    research_graph.add_node("Search", search_node)
    research_graph.add_node("WebScraper", research_node)
    research_graph.add_node("supervisor", supervisor_agent)

    # Define the control flow
    research_graph.add_edge("Search", "supervisor")
    research_graph.add_edge("WebScraper", "supervisor")
    research_graph.add_conditional_edges(
        "supervisor",
        lambda x: x["next"],
        {"Search": "Search", "WebScraper": "WebScraper", "FINISH": END},
    )

    research_graph.add_edge(START, "supervisor")
    chain = research_graph.compile()


    # The following functions interoperate between the top level graph state
    # and the state of the research sub-graph
    # this makes it so that the states of each graph don't get intermixed
    def enter_chain(message: str):
        results = {
            "messages": [HumanMessage(content=message)],
        }
        return results


    research_chain = enter_chain | chain

    from IPython.display import Image, display

    display(Image(chain.get_graph(xray=True).draw_mermaid_png()))
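The control flow wired up here (every worker returns to the supervisor, and the supervisor's choice selects the next node) can be traced with a small loop. This sketch is a plain-Python approximation with a scripted supervisor in place of the LLM. `run_graph`, the step budget, and the lambda workers are hypothetical, and the budget only loosely mirrors how a real graph limits its steps.

```python
from typing import Any, Callable, Dict

State = Dict[str, Any]
Node = Callable[[State], State]

END = "__end__"


def run_graph(
    nodes: Dict[str, Node],
    routes: Dict[str, str],
    state: State,
    max_rounds: int = 25,
) -> State:
    """Alternate between the supervisor and the worker it selects.

    The supervisor's "next" value is looked up in `routes` (the conditional
    edges); each worker then hands control back to the supervisor.
    """
    for _ in range(max_rounds):
        state = {**state, **nodes["supervisor"](state)}
        target = routes[state["next"]]
        if target == END:
            return state
        update = nodes[target](state)
        # Append the worker's messages, as an additive reducer would.
        state = {**state, "messages": state["messages"] + update["messages"]}
    raise RuntimeError("round limit reached before FINISH")


def supervisor(state: State) -> State:
    # Hypothetical rule-based router standing in for the LLM call.
    order = ["Search", "WebScraper", "FINISH"]
    return {"next": order[min(len(state["messages"]) - 1, 2)]}


nodes = {
    "supervisor": supervisor,
    "Search": lambda s: {"messages": ["Search: results"]},
    "WebScraper": lambda s: {"messages": ["WebScraper: page text"]},
}
routes = {"Search": "Search", "WebScraper": "WebScraper", "FINISH": END}

final = run_graph(nodes, routes, {"messages": ["user: question"], "next": ""})
print(final)
```

The round limit matters because an LLM supervisor has no guarantee of ever choosing FINISH.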
We can give this team work directly. Try it out below.

    for s in research_chain.stream(
        "when is Taylor Swift's next tour?", {"recursion_limit": 100}
    ):
        if "__end__" not in s:
            print(s)
            print("---")

Document writing team
Create the document writing team below using a similar approach. This time, we will give each agent access to different file-writing tools.
Note that we are giving our agents file-system access here, which is not safe in all cases.

    import operator
    from pathlib import Path


    # Document writing team graph state
    class DocWritingState(TypedDict):
        # This tracks the team's conversation internally
        messages: Annotated[List[BaseMessage], operator.add]
        # This provides each worker with context on the others' skill sets
        team_members: str
        # This is how the supervisor tells langgraph who to work next
        next: str
        # This tracks the shared directory state
        current_files: str


    # This will be run before each worker agent begins work
    # It makes it so they are more aware of the current state
    # of the working directory.
    def prelude(state):
        written_files = []
        if not WORKING_DIRECTORY.exists():
            WORKING_DIRECTORY.mkdir()
        try:
            written_files = [
                f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
            ]
        except Exception:
            pass
        if not written_files:
            return {**state, "current_files": "No files written."}
        return {
            **state,
            "current_files": "\nBelow are files your team has written to the directory:\n"
            + "\n".join([f" - {f}" for f in written_files]),
        }


    llm = ChatOpenAI(model="gpt-4o")

    doc_writer_agent = create_react_agent(
        llm, tools=[write_document, edit_document, read_document]
    )
    # Injects current directory working state before each call
    context_aware_doc_writer_agent = prelude | doc_writer_agent
    doc_writing_node = functools.partial(
        agent_node, agent=context_aware_doc_writer_agent, name="DocWriter"
    )

    note_taking_agent = create_react_agent(llm, tools=[create_outline, read_document])
    context_aware_note_taking_agent = prelude | note_taking_agent
    note_taking_node = functools.partial(
        agent_node, agent=context_aware_note_taking_agent, name="NoteTaker"
    )

    chart_generating_agent = create_react_agent(llm, tools=[read_document, python_repl])
    context_aware_chart_generating_agent = prelude | chart_generating_agent
    # Use the chart-generating agent here, not the note-taking one.
    chart_generating_node = functools.partial(
        agent_node, agent=context_aware_chart_generating_agent, name="ChartGenerator"
    )

    doc_writing_supervisor = create_team_supervisor(
        llm,
        "You are a supervisor tasked with managing a conversation between the"
        " following workers: {team_members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH.",
        ["DocWriter", "NoteTaker", "ChartGenerator"],
    )
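The step that runs before each worker simply turns the contents of the working directory into a short text summary. The sketch below reproduces that behaviour against a throwaway directory so it can be run in isolation. `describe_files` is a hypothetical name, and unlike the code above it sorts the entries and lists files only, which is an assumption made here to keep the output deterministic.

```python
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict


def describe_files(working_directory: Path) -> Dict[str, str]:
    """Return a state fragment that lists the files written so far."""
    written = sorted(
        str(p.relative_to(working_directory))
        for p in working_directory.rglob("*")
        if p.is_file()
    )
    if not written:
        return {"current_files": "No files written."}
    listing = "\n".join(f" - {name}" for name in written)
    return {
        "current_files": "\nBelow are files your team has written to the directory:\n"
        + listing
    }


with TemporaryDirectory() as tmp:
    workdir = Path(tmp)
    before = describe_files(workdir)
    (workdir / "outline.txt").write_text("1. Intro\n")
    (workdir / "poem.txt").write_text("Roses are red\n")
    after = describe_files(workdir)

print(before)
print(after["current_files"])
```

Refreshing this summary before every call lets each worker see files that a teammate wrote in an earlier turn.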
With the objects themselves created, we can form the graph.

    # Create the graph here:
    # Note that we have unrolled the loop for the sake of this doc
    authoring_graph = StateGraph(DocWritingState)
    authoring_graph.add_node("DocWriter", doc_writing_node)
    authoring_graph.add_node("NoteTaker", note_taking_node)
    authoring_graph.add_node("ChartGenerator", chart_generating_node)
    authoring_graph.add_node("supervisor", doc_writing_supervisor)

    # Add the edges that always occur
    authoring_graph.add_edge("DocWriter", "supervisor")
    authoring_graph.add_edge("NoteTaker", "supervisor")
    authoring_graph.add_edge("ChartGenerator", "supervisor")

    # Add the edges where routing applies
    authoring_graph.add_conditional_edges(
        "supervisor",
        lambda x: x["next"],
        {
            "DocWriter": "DocWriter",
            "NoteTaker": "NoteTaker",
            "ChartGenerator": "ChartGenerator",
            "FINISH": END,
        },
    )

    authoring_graph.add_edge(START, "supervisor")
    chain = authoring_graph.compile()


    # The following functions interoperate between the top level graph state
    # and the state of the document writing sub-graph
    # this makes it so that the states of each graph don't get intermixed
    def enter_chain(message: str, members: List[str]):
        results = {
            "messages": [HumanMessage(content=message)],
            "team_members": ", ".join(members),
        }
        return results


    # We reuse the enter/exit functions to wrap the graph
    authoring_chain = (
        functools.partial(enter_chain, members=authoring_graph.nodes)
        | authoring_graph.compile()
    )

    from IPython.display import Image, display

    display(Image(chain.get_graph().draw_mermaid_png()))
    for s in authoring_chain.stream(
        "Write an outline for poem and then write the poem to disk.",
        {"recursion_limit": 100},
    ):
        if "__end__" not in s:
            print(s)
            print("---")

Output:

    {'supervisor': {'next': 'NoteTaker'}}
    ---
    {'NoteTaker': {'messages': [HumanMessage(content='The poem has been written and saved to "poem.txt".', name='NoteTaker')]}}
    ---
    {'supervisor': {'next': 'FINISH'}}
    ---

6. Add Layers
In this design, we are enforcing a top-down planning policy. We've already created two graphs, but we still have to decide how to route work between them.
We'll create a third graph to orchestrate the previous two, and add some connectors to define how this top-level state is shared between the different graphs.

    from langchain_core.messages import BaseMessage
    from langchain_openai.chat_models import ChatOpenAI

    llm = ChatOpenAI(model="gpt-4o")

    supervisor_node = create_team_supervisor(
        llm,
        "You are a supervisor tasked with managing a conversation between the"
        " following teams: {team_members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH.",
        ["ResearchTeam", "PaperWritingTeam"],
    )
    # Top-level graph state
    class State(TypedDict):
        messages: Annotated[List[BaseMessage], operator.add]
        next: str


    def get_last_message(state: State) -> str:
        return state["messages"][-1].content


    def join_graph(response: dict):
        return {"messages": [response["messages"][-1]]}


    # Define the graph.
    super_graph = StateGraph(State)
    # First add the nodes, which will do the work
    super_graph.add_node("ResearchTeam", get_last_message | research_chain | join_graph)
    super_graph.add_node(
        "PaperWritingTeam", get_last_message | authoring_chain | join_graph
    )
    super_graph.add_node("supervisor", supervisor_node)

    # Define the graph connections, which controls how the logic
    # propagates through the program
    super_graph.add_edge("ResearchTeam", "supervisor")
    super_graph.add_edge("PaperWritingTeam", "supervisor")
    super_graph.add_conditional_edges(
        "supervisor",
        lambda x: x["next"],
        {
            "PaperWritingTeam": "PaperWritingTeam",
            "ResearchTeam": "ResearchTeam",
            "FINISH": END,
        },
    )
    super_graph.add_edge(START, "supervisor")
    super_graph = super_graph.compile()

    from IPython.display import Image, display

    display(Image(super_graph.get_graph().draw_mermaid_png()))
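The two connectors narrow what crosses the boundary between graphs: only the latest message text goes into a team, and only the team's final message comes back out. The sketch below shows that data flow with plain dictionaries. `pipe` is a hypothetical stand-in for the `|` composition, and `fake_research_chain` is an invented sub-graph that returns a scripted conversation.

```python
from typing import Any, Callable, Dict, List

Message = Dict[str, str]


def get_last_message(state: Dict[str, Any]) -> str:
    """Entry connector: pass only the latest message text into the sub-graph."""
    return state["messages"][-1]["content"]


def join_graph(response: Dict[str, Any]) -> Dict[str, List[Message]]:
    """Exit connector: hand back only the sub-graph's final message."""
    return {"messages": [response["messages"][-1]]}


def pipe(*steps: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Compose steps left to right, a plain-Python stand-in for the | operator."""

    def run(value: Any) -> Any:
        for step in steps:
            value = step(value)
        return value

    return run


def fake_research_chain(question: str) -> Dict[str, Any]:
    # Hypothetical sub-graph that accumulates several internal messages.
    return {
        "messages": [
            {"name": "user", "content": question},
            {"name": "Search", "content": "intermediate search notes"},
            {"name": "WebScraper", "content": f"final answer to: {question}"},
        ]
    }


research_team_node = pipe(get_last_message, fake_research_chain, join_graph)
top_state = {"messages": [{"name": "user", "content": "sturgeon report"}]}
update = research_team_node(top_state)
print(update)
```

The team's intermediate messages never reach the top-level state, which keeps the top-level supervisor's context short.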
    for s in super_graph.stream(
        {
            "messages": [
                HumanMessage(
                    content="Write a brief research report on the North American sturgeon. Include a chart."
                )
            ],
        },
        {"recursion_limit": 150},
    ):
        if "__end__" not in s:
            print(s)
            print("---")
Output:

    {'supervisor': {'next': 'ResearchTeam'}}
    ---
    {'ResearchTeam': {'messages': [HumanMessage(content="Unfortunately, ... documents.", name='WebScraper')]}}
    ---
    {'supervisor': {'next': 'PaperWritingTeam'}}
2024-10-18 (Fri)
Original article: https://blog.csdn.net/lovechris00/article/details/143062257