Author: CSDN Blog
Overview:
Combining LangGraph with MCP (Model Context Protocol) gives a large language model standardized access to external tools and data sources. The core features covered here are multi-turn conversation memory, dynamic workflow branching, and tool-call chain management.
The example below consists of two Python files: one for the server side and one for the client side.
Server side:
FastMCP makes it very easy to create an MCP server:

```python
from mcp.server.fastmcp import FastMCP
```

Create an MCP server named "Logistics":

```python
mcp = FastMCP("Logistics")
```
Tool 1: query package status.

```python
@mcp.tool()
def get_package_status(tracking_id: str) -> str:
    """Get the current status of a package by tracking ID."""
    statuses = {
        "LGT123456": "Shipped, in transit",
        "LGT789012": "At local sorting center, awaiting delivery",
        "LGT345678": "Delivered",
        "LGT000000": "Invalid tracking number, please check",
    }
    return statuses.get(tracking_id, "No information found for this package")
```
Tool 2: calculate shipping cost.

```python
@mcp.tool()
def calculate_shipping_cost(weight_kg: float, distance_km: float) -> str:
    """Calculate shipping cost based on weight and distance."""
    base_rate = 5.0
    per_kg_rate = 2.0
    per_km_rate = 0.01
    cost = base_rate + (weight_kg * per_kg_rate) + (distance_km * per_km_rate)
    return f"Estimated shipping cost: {round(cost, 2)} yuan"
```
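For the numbers used in the test case later (a 5 kg package over 1200 km), the pricing formula works out to 5 + 10 + 12 = 27 yuan. A standalone sketch of the same arithmetic, outside the MCP server:

```python
# Same pricing formula as calculate_shipping_cost, run directly.
base_rate = 5.0
per_kg_rate = 2.0
per_km_rate = 0.01

weight_kg, distance_km = 5.0, 1200.0
cost = base_rate + (weight_kg * per_kg_rate) + (distance_km * per_km_rate)
print(round(cost, 2))  # 27.0
```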
Tool 3: estimate delivery time.

```python
@mcp.tool()
def estimate_delivery_time(distance_km: float) -> str:
    """Estimate delivery time in hours based on distance."""
    avg_speed_km_per_hour = 40
    time_hours = distance_km / avg_speed_km_per_hour
    days = int(time_hours // 24)
    hours = round(time_hours % 24)
    return f"Estimated delivery time: {days} days {hours} hours"
```
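At the assumed average speed of 40 km/h, 1200 km takes 30 hours, which the day/hour split turns into 1 day 6 hours. The same arithmetic in isolation:

```python
# Delivery-time arithmetic from estimate_delivery_time, at 40 km/h.
distance_km = 1200.0
time_hours = distance_km / 40  # 30.0 hours
days = int(time_hours // 24)   # whole days
hours = round(time_hours % 24) # remaining hours
print(days, hours)  # 1 6
```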
Complete server code:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Logistics")

@mcp.tool()
def get_package_status(tracking_id: str) -> str:
    """Get the current status of a package by tracking ID."""
    statuses = {
        "LGT123456": "Shipped, in transit",
        "LGT789012": "At local sorting center, awaiting delivery",
        "LGT345678": "Delivered",
        "LGT000000": "Invalid tracking number, please check",
    }
    return statuses.get(tracking_id, "No information found for this package")

@mcp.tool()
def calculate_shipping_cost(weight_kg: float, distance_km: float) -> str:
    """Calculate shipping cost based on weight and distance."""
    base_rate = 5.0
    per_kg_rate = 2.0
    per_km_rate = 0.01
    cost = base_rate + (weight_kg * per_kg_rate) + (distance_km * per_km_rate)
    return f"Estimated shipping cost: {round(cost, 2)} yuan"

@mcp.tool()
def estimate_delivery_time(distance_km: float) -> str:
    """Estimate delivery time in hours based on distance."""
    avg_speed_km_per_hour = 40
    time_hours = distance_km / avg_speed_km_per_hour
    days = int(time_hours // 24)
    hours = round(time_hours % 24)
    return f"Estimated delivery time: {days} days {hours} hours"

if __name__ == "__main__":
    mcp.run(transport="streamable-http")
```
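With the streamable-http transport, FastMCP serves by default on port 8000 under the /mcp path, which is the URL the client below connects to. Assuming the server code is saved as logistics_mcp_server.py (the filename is an assumption, not from the original post), it can be started with:

```shell
pip install "mcp[cli]"          # FastMCP ships with the official MCP Python SDK
python logistics_mcp_server.py  # serves http://localhost:8000/mcp
```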
Client side:
Complete client code:
```python
# pip install langchain-mcp-adapters
import asyncio
from typing import Annotated, List
from typing_extensions import TypedDict

from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.prebuilt import tools_condition, ToolNode
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_mcp_adapters.prompts import load_mcp_prompt
from langchain_openai import ChatOpenAI

# Initialize the multi-server client
client = MultiServerMCPClient({
    # "math": {
    #     "command": "python",
    #     "args": ["math_mcp_server.py"],  # make sure this file exists and runs
    #     "transport": "stdio",
    # },
    "logistics": {
        "url": "http://localhost:8000/mcp",  # logistics MCP service address
        "transport": "streamable_http",
    },
})

async def create_graph(logistics_session):
    # Use an OpenAI model
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key="sk-your-api-key")

    # Load the tools
    logistics_tools = await load_mcp_tools(logistics_session)
    tools = logistics_tools

    # Bind the tools to the LLM
    llm_with_tool = llm.bind_tools(tools)

    # Optional: load a system prompt from MCP, failing silently
    system_prompt = "You are a smart assistant that can call tools to answer user questions."
    try:
        system_prompt_msg = await load_mcp_prompt(logistics_session, "system_prompt")
        if system_prompt_msg and len(system_prompt_msg) > 0:
            system_prompt = system_prompt_msg[0].content
    except Exception:
        # Fail silently and keep the default prompt
        pass

    prompt_template = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
    ])
    chat_llm = prompt_template | llm_with_tool

    # State definition
    class State(TypedDict):
        messages: Annotated[List[AnyMessage], add_messages]

    # Node function
    def chat_node(state: State) -> State:
        response = chat_llm.invoke({"messages": state["messages"]})
        return {"messages": [response]}

    tool_node = ToolNode(tools=tools)

    # Build the graph
    graph_builder = StateGraph(State)
    graph_builder.add_node("chat_node", chat_node)
    graph_builder.add_node("tool_node", tool_node)
    graph_builder.add_edge(START, "chat_node")
    graph_builder.add_conditional_edges(
        "chat_node",
        tools_condition,
        {"tools": "tool_node", "__end__": END},
    )
    graph_builder.add_edge("tool_node", "chat_node")

    graph = graph_builder.compile(checkpointer=MemorySaver())
    return graph

async def main():
    config = {"configurable": {"thread_id": "logistics_thread_001"}}
    async with client.session("logistics") as logistics_session:
        print("MCP client connected: Logistics service")
        agent = await create_graph(logistics_session)
        print("\nWelcome to the smart logistics assistant! You can ask about:")
        print("package status, shipping cost, delivery time, and more")
        print("Type 'quit' to exit\n")
        while True:
            try:
                user_input = input("User: ").strip()
                if user_input.lower() in ["quit", "exit", "退出"]:
                    print("Goodbye!")
                    break
                # Invoke the agent
                response = await agent.ainvoke(
                    {"messages": [{"role": "user", "content": user_input}]},
                    config=config,
                )
                ai_message = response["messages"][-1].content
                print(f"AI: {ai_message}\n")
            except KeyboardInterrupt:
                print("\nExited.")
                break
            except Exception as e:
                print(f"Error: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```
[Figure: the compiled graph — START → chat_node, conditional edge via tools_condition to tool_node or END, tool_node → chat_node]
Test run:
Start the server.
Start the client.
Sample questions:
1. Where is my package LGT123456?
2. How much does it cost to ship a 5 kg package from Beijing to Shanghai? The distance is about 1200 km.
3. Roughly how long will it take to arrive?
Because LangGraph has built-in memory, you can ask the follow-up question directly without restating the context.
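Conceptually, the MemorySaver checkpointer keys saved conversation state by the thread_id in config, so each ainvoke with the same config continues the same history. A minimal stdlib-only sketch of that idea (a toy illustration, not the real MemorySaver API):

```python
# Toy illustration of thread-scoped memory: state is stored per thread_id,
# so repeated calls with the same id accumulate the conversation history.
checkpoints: dict[str, list[dict]] = {}

def invoke(thread_id: str, user_message: str) -> list[dict]:
    history = checkpoints.setdefault(thread_id, [])
    history.append({"role": "user", "content": user_message})
    # ...a real agent would generate a reply from the full history here...
    history.append({"role": "assistant", "content": f"(reply to: {user_message})"})
    return history

invoke("logistics_thread_001", "How much to ship 5 kg over 1200 km?")
history = invoke("logistics_thread_001", "Roughly how long will it take?")
print(len(history))  # 4 -- both turns are retained under the same thread_id
```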
Original article: https://blog.csdn.net/u013261578/article/details/152050394