作者:CSDN博客
1.简介LangGraph
多智能体架构,
LangGraph 和 langChain的区别
LangGraph和 LngSmith 是LangGraph中的一个生态组件
LngSmith 主要用来调试
LangGraph 主要用来工作流 整合多个智能体
LangGraph Platform 提供了一个云环境服务部署
LangGraph最主要的是封装了很多基础功能,让我们只要关注Agent本身就行了
2.agent
langGraph 构建出来的智能体 类比一个好员工,听话,并且能及时反馈信息
- from langgraph.prebuilt import create_react_agent
- from langchain_openai import ChatOpenAI
- import dotenv
- dotenv.load_dotenv()
- llm = ChatOpenAI()
- # llm.invoke('你好啊\n')
- agent = create_react_agent(
- model=llm,
- tools=[],
- prompt='you are a helpful assistant'
- )
- #
- agent.invoke({"messages":[{"role":'user','content':'你好啊'}]})
复制代码 1
2.工具调用
3.记忆
分为
短期记忆 同窗口内的 类似豆包
长期记忆:就是类似于豆包记住的所有信息,例如左侧的聊天记录列表
短期记忆例子 短期记忆 主要是创建的时候 加入一个checkpointer参数, 发送消息的时候加一个config配置 区分是哪一个短期记忆- import uuid
- from typing import TypedDict,Optional, Dict, Any, List
- from langchain_core.messages import HumanMessage, BaseMessage, AIMessage, SystemMessage
- from langchain_core.messages.utils import count_tokens_approximately
- from langchain_core.prompts import PromptTemplate
- from langmem.short_term import SummarizationNode
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.graph import StateGraph,END,START
- from langchain_deepseek import ChatDeepSeek
- from langgraph.prebuilt import create_react_agent
- import dotenv
- import asyncio
- import os
- import re
- from loguru import logger
- from pydantic import BaseModel, Field
- from langgraph.prebuilt.chat_agent_executor import AgentState
- dotenv.load_dotenv()
- if __name__ == "__main__":
- llm = ChatDeepSeek(
- model = 'qwen-plus',
- temperature=0.1,
- api_key=os.getenv("DASHSCOPE_API_KEY_1"),
- api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
- )
- # 1. 初始化检查点
- memory_saver = InMemorySaver()
- class CustomState(AgentState):
- context: dict[str,Any]
- # 3. 总结节点(如果需要保留)
- summarization_node = SummarizationNode(
- token_counter=count_tokens_approximately,
- model=llm,
- max_tokens=384,
- max_summary_tokens=128,
- output_messages_key="llm_input_messages",
- )
- # 4. 创建智能体
- agent = create_react_agent(
- model=llm,
- tools=[], # 如需查询实时信息,需添加工具(如搜索工具)
- checkpointer=memory_saver,
- state_schema=CustomState,
- # debug=True,
- pre_model_hook=summarization_node
- )
- # 5. 调用智能体(使用正确的消息格式)
- config = {"configurable": {"thread_id": "user_session_1"}}
- first_input_text = "你好我叫小明,你是谁"
- cs = agent.invoke({
- "messages": [HumanMessage(content=first_input_text,id=str(uuid.uuid4()))], # 符合BaseMessage类型
- "remaining_steps": 5, # 覆盖默认的3步
- }, config)
- print(cs)
- cs1 = agent.invoke({
- "messages": [
- # HumanMessage(content=first_input_text, id=str(uuid.uuid4())),
- HumanMessage(content="我叫什么名字", id=str(uuid.uuid4()))
- ], # 向同一线程注入上一轮用户自述 + 当前追问
- "remaining_steps": 5, # 覆盖默认的3步
- }, config)
- print(cs1['messages'][-1])
复制代码- import uuid
- from typing import TypedDict,Optional, Dict, Any, List
- from langchain_core.messages import HumanMessage, BaseMessage, AIMessage, SystemMessage
- from langchain_core.messages.utils import count_tokens_approximately
- from langchain_core.prompts import PromptTemplate
- from langmem.short_term import SummarizationNode
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.graph import StateGraph, END, START, MessagesState
- from langchain_deepseek import ChatDeepSeek
- from langgraph.prebuilt import create_react_agent
- import dotenv
- import asyncio
- import os
- import re
- from loguru import logger
- from pydantic import BaseModel, Field
- try:
- from app.api.agent.action_task_workflow import AgentState
- except Exception:
- class AgentState(TypedDict):
- pass
- dotenv.load_dotenv()
- if __name__ == "__main__":
- llm = ChatDeepSeek(
- model = 'qwen-plus',
- temperature=0.1,
- api_key=os.getenv("DASHSCOPE_API_KEY_1"),
- api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
- )
- # 1. 初始化检查点
- memory_saver = InMemorySaver()
- config = {"configurable": {"thread_id": "user_session_1"}}
- def call_model(state:MessagesState):
- res = llm.invoke(state['messages'])
- print(res)
- return {"message":res}
- builder = StateGraph(MessagesState)
- builder.add_node(call_model)
- builder.add_edge(START,'call_model')
- graph = builder.compile(checkpointer=memory_saver)
- # for chunk in graph.stream(
- # {'messages':[{'role':'user',"content":"湖南的省会在哪?"}]},
- # config,
- # stream_mode="values"
- # ):
- # chunk['messages'][-1].pretty_print()
- for chunk in graph.stream(
- {'messages':[{'role':'user',"content":"杭州呢?"}]},
- config,
- stream_mode="values"
- ):
- chunk['messages'][-1].pretty_print()
复制代码 https://www.youtube.com/watch?v=e4HjTWhSH0M&list=PLltdCnbYIN5inyuDDTaUzv_B4W10Ktwz0&index=5
4.langGraph接入mcp
mcp 类似于http协议 借助浏览器等工具 访问远程服务器
mcp的客户端工具:vscode pytharm啊
两种引入方式
sse:基于http去构建的, 他会建立一个长链接,单向的,只允许服务端往客户端推送消息
STOIO,本质就是在客户端执行一个程序 需要nodejs
- # pip install langchain-mcp-adapters
- from langchain_openai import ChatOpenAI
- from langchain_mcp_adapters.client import MultiServerMCPClient
- from langgraph.prebuilt import create_react_agent
- import dotenv
- dotenv.load_dotenv()
- llm = ChatOpenAI()
- client = MultiServerMCPClient(
- {
- "amap-maps":{
- "commaand":"npx",
- "args":[
- '-y','@amap/amap-maps-mcp-server'
- ],
- 'env':{
- "AMAP_MAPS_API_KEY":"xxxxxx"
- },
- "transport":'stdio'
- }
- }
- )
- tools = await client.get_tools()
- agent = create_react_agent(
- model=llm,
- tools=tools
- )
复制代码 3.手写mcp
pip install mcp
在使用的文件下
5.工作流
state 数据全工作流共享 是一个字典数据,
state中对更新数据的方式也可以进行设置
- """构建完整工作流图"""
- workflow = StateGraph(ReactDSLWorkflowState)
- # 添加节点
- workflow.add_node("preprocess", self._preprocess_dsl)
- workflow.add_node("analyze_page", self._analyze_page_type)
- workflow.add_node("parse_styles", self._parse_styles)
- workflow.add_node("generate_code", self._generate_code)
- workflow.add_node("optimize_code", self._optimize_code)
- workflow.add_node("integrate_apis", self._integrate_apis)
- workflow.add_node("handle_error", self._handle_error)
- # 定义边
- workflow.set_entry_point("preprocess")
- workflow.add_conditional_edges(
- "preprocess",
- self._should_continue,
- {"continue": "analyze_page", "error": "handle_error"}
- )
- workflow.add_conditional_edges(
- "analyze_page",
- self._should_continue,
- {"continue": "parse_styles", "error": "handle_error"}
- )
- workflow.add_conditional_edges(
- "parse_styles",
- self._should_continue,
- {"continue": "generate_code", "error": "handle_error"}
- )
- workflow.add_conditional_edges(
- "generate_code",
- self._should_continue,
- {"continue": "optimize_code", "error": "handle_error"}
- )
- workflow.add_conditional_edges(
- "optimize_code",
- self._should_continue_or_skip_api, # 修改这里:根据api_docs判断是否执行API集成
- {"continue": "integrate_apis", "skip": END, "error": "handle_error"}
- )
- workflow.add_edge("integrate_apis", END)
- workflow.add_edge("handle_error", END)
-
-
- workflow.compile()
复制代码 流式举例
带记忆的checkpointer的流式输出
node 节点
其实就是对应一个python函数 state是第一个参数 可以额外传其他参数,返回 return state
缓存机制 还有 循环调用
edge 边 有开始和结束的固定参数
条件边edge
动态路由函数 send 执行多个步骤
Command 合并 不需要一直写边 直接在函数里更决定跳转方向
多工作流
智能体存储redis
- import asyncio
- import json
- import os
- import pickle
- from collections.abc import Sequence
- from typing import TypedDict, Optional, Dict, Any, List, Union
- from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, ChannelVersions, CheckpointTuple
- from loguru import logger
- from langchain_core.messages.utils import count_tokens_approximately
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_deepseek import ChatDeepSeek
- from langmem.short_term import SummarizationNode
- from langgraph.prebuilt import create_react_agent
- import dotenv
- from langgraph.prebuilt.chat_agent_executor import AgentState
- from pydantic import BaseModel, Field
- from app.db.database import get_redis_session_task, get_redis_session
- dotenv.load_dotenv()
- # -------------------------- 数据模型定义(Pydantic)--------------------------
- class StartModel(BaseModel):
- id:int = Field(description="数据id,用于记忆存储的config")
- code:int = Field(description="用户工号,用于记忆存储的config")
- prompt:str = Field(description="提示词")
- enter_str:Optional[str] = Field(description="输入问题")
- is_memory:Optional[bool] = Field(description="是否需要记忆",default=True)
- # -------------------------- 核心模块--------------------------
- class RedisCheckpointSaver(BaseCheckpointSaver):
- """基于Redis的检查点保存器"""
- def __init__(self):
- pass
- def get_tuple(self, config: dict) -> Optional[Checkpoint]:
- """同步获取检查点"""
- try:
- # 在同步环境中调用异步方法
- import asyncio
- # 尝试获取现有的事件循环
- try:
- loop = asyncio.get_running_loop()
- # 如果事件循环已经在运行,我们需要在另一个线程中运行协程
- import concurrent.futures
- with concurrent.futures.ThreadPoolExecutor() as executor:
- future = executor.submit(self._run_async_in_thread, self.aget_tuple(config))
- return future.result(timeout=10.0)
- except RuntimeError:
- # 没有运行中的事件循环
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- result = loop.run_until_complete(self.aget_tuple(config))
- return result
- except Exception as e:
- logger.error(f"Failed to get checkpoint tuple: {str(e)}")
- return None
- async def aget_tuple(self, config: dict) -> Optional[Checkpoint]:
- """异步获取检查点"""
- redis_generator = None
- try:
- # 手动处理异步生成器
- redis_generator = get_redis_session()
- redis = await redis_generator.__anext__() # 获取生成器的第一个值
-
- thread_id = config['configurable']['thread_id']
- key = f"checkpoint:{thread_id}"
- data = await redis.get(key)
- if data:
- # 确保数据能正确反序列化
- try:
- result = pickle.loads(data)
- # 确保返回的是CheckpointTuple对象
- if isinstance(result, CheckpointTuple):
- return result
- elif isinstance(result, dict):
- # 检查是否包含必要的字段
- if 'checkpoint' in result:
- # 确保metadata包含必要的字段
- metadata = result.get('metadata', {})
- if 'step' not in metadata:
- metadata['step'] = 0 # 默认步骤为0
- if 'source' not in metadata:
- metadata['source'] = 'redis' # 默认来源
-
- return CheckpointTuple(
- config=config,
- checkpoint=result['checkpoint'],
- metadata=metadata
- )
- else:
- # 如果是checkpoint对象,创建CheckpointTuple
- return CheckpointTuple(
- config=config,
- checkpoint=result,
- metadata={
- 'step': 0,
- 'source': 'redis'
- }
- )
- except Exception as e:
- logger.error(f"Failed to deserialize checkpoint: {str(e)}")
- # 如果反序列化失败,删除损坏的数据
- await redis.delete(key)
- return None
- except RuntimeError as e:
- if "Event loop is closed" in str(e):
- logger.warning("Event loop is closed, cannot get checkpoint from Redis")
- return None
- else:
- logger.error(f"Failed to get checkpoint tuple from Redis: {str(e)}")
- return None
- except Exception as e:
- logger.error(f"Failed to get checkpoint tuple from Redis: {str(e)}")
- return None
- finally:
- # 确保正确清理异步生成器
- if redis_generator:
- try:
- await redis_generator.aclose()
- except Exception as e:
- logger.error(f"Failed to close redis generator: {str(e)}")
- def put(self, config: dict, checkpoint: Checkpoint, metadata: dict,new_versions:ChannelVersions) -> None:
- """同步保存检查点"""
- try:
- # 在同步环境中调用异步方法
- import asyncio
- # 尝试获取现有的事件循环
- try:
- loop = asyncio.get_running_loop()
- # 如果事件循环已经在运行,直接等待协程
- task = self.aput(config, checkpoint, metadata)
- # 在已运行的循环中创建任务并等待结果
- future = asyncio.run_coroutine_threadsafe(task, loop)
- future.result()
- except RuntimeError:
- # 没有运行中的事件循环
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(self.aput(config, checkpoint, metadata))
- except Exception as e:
- logger.error(f"Failed to put checkpoint: {str(e)}")
- async def aput(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> None:
- """异步保存检查点"""
- try:
- # redis = self.redis
- async for redis in get_redis_session():
- thread_id = config['configurable']['thread_id']
- key = f"checkpoint:{thread_id}"
- # 确保数据能正确序列化
- try:
- data = pickle.dumps(checkpoint)
- await redis.set(key, data)
- # 设置过期时间,例如1小时
- await redis.expire(key, 3600)
- except Exception as e:
- logger.error(f"Failed to serialize checkpoint: {str(e)}")
- except Exception as e:
- logger.error(f"Failed to save checkpoint to Redis: {str(e)}")
-
- def put_writes(self, config: dict, writes: Sequence[tuple[str, Any]], task_id: str,task_path:str) -> None:
- """保存写入操作"""
- try:
- # 在同步环境中调用异步方法
- import asyncio
- # 尝试获取现有的事件循环
- try:
- loop = asyncio.get_running_loop()
- # 如果事件循环已经在运行,直接等待协程
- task = self.aput_writes(config, writes, task_id)
- # 在已运行的循环中创建任务并等待结果
- future = asyncio.run_coroutine_threadsafe(task, loop)
- future.result()
- except RuntimeError:
- # 没有运行中的事件循环
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(self.aput_writes(config, writes, task_id))
- except Exception as e:
- logger.error(f"Failed to put writes: {str(e)}")
- async def aput_writes(self, config: dict, writes: Sequence[tuple[str, Any]], task_id: str) -> None:
- """异步保存写入操作"""
- try:
- # redis = self.redis
- async for redis in get_redis_session():
- thread_id = config['configurable']['thread_id']
- # 为写入操作创建一个唯一的键
- key = f"checkpoint_writes:{thread_id}:{task_id}"
- # 确保数据能正确序列化
- try:
- data = pickle.dumps(writes)
- await redis.set(key, data)
- # 设置过期时间,例如1小时
- await redis.expire(key, 3600)
- except Exception as e:
- logger.error(f"Failed to serialize writes: {str(e)}")
- except Exception as e:
- logger.error(f"Failed to save writes to Redis: {str(e)}")
- def _run_async_in_thread(self, coro):
- """在线程中运行异步协程"""
- import asyncio
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
- return loop.run_until_complete(coro)
- finally:
- loop.close()
- def create_agent(params:StartModel):
- """
- 创建智能体
- prompt:提示词
- is_memory:是否需要记忆
- """
- llm = ChatDeepSeek(
- model='qwen3-max',
- temperature=0.1,
- api_key=os.getenv("DASHSCOPE_API_KEY_1"),
- api_base=os.getenv("TONGYI_API_BASE"),
- )
- # 1. 初始化检查点
- # memory_saver = InMemorySaver()
- redis_saver = RedisCheckpointSaver() if params.is_memory else None
- class CustomState(AgentState):
- context: dict[str, Any]
- # 3. 总结节点(如果需要保留)
- summarization_node = SummarizationNode(
- token_counter=count_tokens_approximately,
- model=llm,
- max_tokens=384,
- max_summary_tokens=128,
- output_messages_key="llm_input_messages",
- )
- # 4. 创建智能体
- agent = create_react_agent(
- model=llm,
- tools=[], # 如需查询实时信息,需添加工具(如搜索工具)
- checkpointer= redis_saver ,
- state_schema=CustomState,
- pre_model_hook=summarization_node,
- )
- return agent
- async def start_ask(params:StartModel):
- """启动智能体"""
- logger.info(f"启动智能体...")
- agent = create_agent(params)
- config = {"configurable": {"thread_id":str( params.id)+"_"+str(params.code)}}
- # 模版
- prompt_template = ChatPromptTemplate.from_template("""
- {prompt}
- 用户问题: {input}
-
- """).format_messages(
- input=params.enter_str,
- prompt = params.prompt
- )[0]
- # 智能体流失输出
- logger.info(f"执行Agent...")
- return agent.stream(
- input={"messages": [prompt_template]},
- stream_mode="messages",
- config=config
- )
- if __name__ == "__main__":
- prompt = " "
- init_date = {"id":2,"prompt":prompt,"code":"60011305","enter_str":'你好,我叫小明'}
- async def main():
- graph_agent = await start_ask(StartModel.model_validate(init_date))
- for chunk in graph_agent:
- print(chunk)
- print(chunk[0].content)
- asyncio.run(main())
复制代码 原文地址:https://blog.csdn.net/qq_38935512/article/details/153066855 |