开启左侧

web前端学习LangGraph

[复制链接]
创想小编 发表于 昨天 22:39 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:CSDN博客
1.简介LangGraph

多智能体架构,
LangGraph 和 langChain的区别
LangGraph和 LngSmith 是LangGraph中的一个生态组件
LngSmith 主要用来调试
LangGraph 主要用来工作流 整合多个智能体
LangGraph Platform 提供了一个云环境服务部署
LangGraph最主要的是封装了很多基础功能,让我们只要关注Agent本身就行了
2.agent

langGraph 构建出来的智能体 类比一个好员工,听话,并且能及时反馈信息
  1. from langgraph.prebuilt import create_react_agent
  2. from langchain_openai import ChatOpenAI
  3. import dotenv
  4. dotenv.load_dotenv()
  5. llm = ChatOpenAI()
  6. # llm.invoke('你好啊\n')
  7. agent = create_react_agent(
  8.     model=llm,
  9.     tools=[],
  10.     prompt='you are a helpful assistant'
  11. )
  12. #
  13. agent.invoke({"messages":[{"role":'user','content':'你好啊'}]})
复制代码
1

web前端学习LangGraph-1.png


2.工具调用

web前端学习LangGraph-2.png


3.记忆

分为
短期记忆 同窗口内的 类似豆包
长期记忆:就是类似于豆包记住的所有信息,例如左侧的聊天记录列表
短期记忆例子 短期记忆 主要是创建的时候 加入一个checkpointer参数, 发送消息的时候加一个config配置 区分是哪一个短期记忆
  1. import uuid
  2. from typing import TypedDict,Optional, Dict, Any, List
  3. from langchain_core.messages import HumanMessage, BaseMessage, AIMessage, SystemMessage
  4. from langchain_core.messages.utils import count_tokens_approximately
  5. from langchain_core.prompts import PromptTemplate
  6. from langmem.short_term import SummarizationNode
  7. from langgraph.checkpoint.memory import InMemorySaver
  8. from langgraph.graph import StateGraph,END,START
  9. from langchain_deepseek import ChatDeepSeek
  10. from langgraph.prebuilt import create_react_agent
  11. import dotenv
  12. import asyncio
  13. import os
  14. import re
  15. from loguru import logger
  16. from pydantic import BaseModel, Field
  17. from langgraph.prebuilt.chat_agent_executor import AgentState
  18. dotenv.load_dotenv()
  19. if __name__ == "__main__":
  20.     llm = ChatDeepSeek(
  21.         model = 'qwen-plus',
  22.         temperature=0.1,
  23.         api_key=os.getenv("DASHSCOPE_API_KEY_1"),
  24.         api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
  25.     )
  26.     # 1. 初始化检查点
  27.     memory_saver = InMemorySaver()
  28.     class CustomState(AgentState):
  29.         context: dict[str,Any]
  30.     # 3. 总结节点(如果需要保留)
  31.     summarization_node = SummarizationNode(
  32.         token_counter=count_tokens_approximately,
  33.         model=llm,
  34.         max_tokens=384,
  35.         max_summary_tokens=128,
  36.         output_messages_key="llm_input_messages",
  37.     )
  38.     # 4. 创建智能体
  39.     agent = create_react_agent(
  40.         model=llm,
  41.         tools=[],  # 如需查询实时信息,需添加工具(如搜索工具)
  42.         checkpointer=memory_saver,
  43.         state_schema=CustomState,
  44.         # debug=True,
  45.         pre_model_hook=summarization_node
  46.     )
  47.     # 5. 调用智能体(使用正确的消息格式)
  48.     config = {"configurable": {"thread_id": "user_session_1"}}
  49.     first_input_text = "你好我叫小明,你是谁"
  50.     cs = agent.invoke({
  51.         "messages": [HumanMessage(content=first_input_text,id=str(uuid.uuid4()))],  # 符合BaseMessage类型
  52.         "remaining_steps": 5,  # 覆盖默认的3步
  53.     }, config)
  54.     print(cs)
  55.     cs1 = agent.invoke({
  56.         "messages": [
  57.             # HumanMessage(content=first_input_text, id=str(uuid.uuid4())),
  58.             HumanMessage(content="我叫什么名字", id=str(uuid.uuid4()))
  59.         ],  # 向同一线程注入上一轮用户自述 + 当前追问
  60.         "remaining_steps": 5,  # 覆盖默认的3步
  61.     }, config)
  62.     print(cs1['messages'][-1])
复制代码
  1. import uuid
  2. from typing import TypedDict,Optional, Dict, Any, List
  3. from langchain_core.messages import HumanMessage, BaseMessage, AIMessage, SystemMessage
  4. from langchain_core.messages.utils import count_tokens_approximately
  5. from langchain_core.prompts import PromptTemplate
  6. from langmem.short_term import SummarizationNode
  7. from langgraph.checkpoint.memory import InMemorySaver
  8. from langgraph.graph import StateGraph, END, START, MessagesState
  9. from langchain_deepseek import ChatDeepSeek
  10. from langgraph.prebuilt import create_react_agent
  11. import dotenv
  12. import asyncio
  13. import os
  14. import re
  15. from loguru import logger
  16. from pydantic import BaseModel, Field
  17. try:
  18.     from app.api.agent.action_task_workflow import AgentState
  19. except Exception:
  20.     class AgentState(TypedDict):
  21.         pass
  22. dotenv.load_dotenv()
  23. if __name__ == "__main__":
  24.     llm = ChatDeepSeek(
  25.         model = 'qwen-plus',
  26.         temperature=0.1,
  27.         api_key=os.getenv("DASHSCOPE_API_KEY_1"),
  28.         api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
  29.     )
  30.     # 1. 初始化检查点
  31.     memory_saver = InMemorySaver()
  32.     config = {"configurable": {"thread_id": "user_session_1"}}
  33.     def call_model(state:MessagesState):
  34.         res = llm.invoke(state['messages'])
  35.         print(res)
  36.         return {"message":res}
  37.     builder = StateGraph(MessagesState)
  38.     builder.add_node(call_model)
  39.     builder.add_edge(START,'call_model')
  40.     graph = builder.compile(checkpointer=memory_saver)
  41.     # for chunk in graph.stream(
  42.     #     {'messages':[{'role':'user',"content":"湖南的省会在哪?"}]},
  43.     #     config,
  44.     #     stream_mode="values"
  45.     # ):
  46.     #     chunk['messages'][-1].pretty_print()
  47.     for chunk in graph.stream(
  48.         {'messages':[{'role':'user',"content":"杭州呢?"}]},
  49.         config,
  50.         stream_mode="values"
  51.     ):
  52.         chunk['messages'][-1].pretty_print()
复制代码
https://www.youtube.com/watch?v=e4HjTWhSH0M&list=PLltdCnbYIN5inyuDDTaUzv_B4W10Ktwz0&index=5
4.langGraph接入mcp

mcp 类似于http协议 借助浏览器等工具 访问远程服务器
mcp的客户端工具:vscode pytharm啊
两种引入方式
sse:基于http去构建的, 他会建立一个长链接,单向的,只允许服务端往客户端推送消息

web前端学习LangGraph-3.png

STOIO,本质就是在客户端执行一个程序 需要nodejs

web前端学习LangGraph-4.png

web前端学习LangGraph-5.png

  1. # pip install langchain-mcp-adapters
  2. from langchain_openai import ChatOpenAI
  3. from langchain_mcp_adapters.client  import MultiServerMCPClient
  4. from langgraph.prebuilt import create_react_agent
  5. import dotenv
  6. dotenv.load_dotenv()
  7. llm = ChatOpenAI()
  8. client = MultiServerMCPClient(
  9.     {
  10.         "amap-maps":{
  11.             "commaand":"npx",
  12.             "args":[
  13.                 '-y','@amap/amap-maps-mcp-server'
  14.             ],
  15.             'env':{
  16.                 "AMAP_MAPS_API_KEY":"xxxxxx"
  17.             },
  18.             "transport":'stdio'
  19.         }
  20.     }
  21. )
  22. tools = await client.get_tools()
  23. agent = create_react_agent(
  24.     model=llm,
  25.     tools=tools
  26. )
复制代码
3.手写mcp

pip install mcp
web前端学习LangGraph-6.png

在使用的文件下

web前端学习LangGraph-7.png

web前端学习LangGraph-8.png


5.工作流

state 数据全工作流共享 是一个字典数据,

state中对更新数据的方式也可以进行设置

web前端学习LangGraph-9.png


  1. """构建完整工作流图"""
  2.         workflow = StateGraph(ReactDSLWorkflowState)
  3.         # 添加节点
  4.         workflow.add_node("preprocess", self._preprocess_dsl)
  5.         workflow.add_node("analyze_page", self._analyze_page_type)
  6.         workflow.add_node("parse_styles", self._parse_styles)
  7.         workflow.add_node("generate_code", self._generate_code)
  8.         workflow.add_node("optimize_code", self._optimize_code)
  9.         workflow.add_node("integrate_apis", self._integrate_apis)
  10.         workflow.add_node("handle_error", self._handle_error)
  11.         # 定义边
  12.         workflow.set_entry_point("preprocess")
  13.         workflow.add_conditional_edges(
  14.             "preprocess",
  15.             self._should_continue,
  16.             {"continue": "analyze_page", "error": "handle_error"}
  17.         )
  18.         workflow.add_conditional_edges(
  19.             "analyze_page",
  20.             self._should_continue,
  21.             {"continue": "parse_styles", "error": "handle_error"}
  22.         )
  23.         workflow.add_conditional_edges(
  24.             "parse_styles",
  25.             self._should_continue,
  26.             {"continue": "generate_code", "error": "handle_error"}
  27.         )
  28.         workflow.add_conditional_edges(
  29.             "generate_code",
  30.             self._should_continue,
  31.             {"continue": "optimize_code", "error": "handle_error"}
  32.         )
  33.         workflow.add_conditional_edges(
  34.             "optimize_code",
  35.             self._should_continue_or_skip_api,  # 修改这里:根据api_docs判断是否执行API集成
  36.             {"continue": "integrate_apis", "skip": END, "error": "handle_error"}
  37.         )
  38.         workflow.add_edge("integrate_apis", END)
  39.         workflow.add_edge("handle_error", END)
  40.                
  41.                
  42.                 workflow.compile()
复制代码
流式举例

web前端学习LangGraph-10.png

带记忆的checkpointer的流式输出

web前端学习LangGraph-11.png


web前端学习LangGraph-12.png


node 节点

其实就是对应一个python函数 state是第一个参数 可以额外传其他参数,返回 return state
缓存机制 还有 循环调用
web前端学习LangGraph-13.png


web前端学习LangGraph-14.png


edge 边 有开始和结束的固定参数

web前端学习LangGraph-15.png


条件边edge

web前端学习LangGraph-16.png


web前端学习LangGraph-17.png


动态路由函数 send 执行多个步骤
web前端学习LangGraph-18.png


Command 合并 不需要一直写边 直接在函数里更决定跳转方向

web前端学习LangGraph-19.png


多工作流

web前端学习LangGraph-20.png


智能体存储redis
  1. import asyncio
  2. import json
  3. import os
  4. import pickle
  5. from collections.abc import Sequence
  6. from typing import TypedDict, Optional, Dict, Any, List, Union
  7. from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, ChannelVersions, CheckpointTuple
  8. from loguru import logger
  9. from langchain_core.messages.utils import count_tokens_approximately
  10. from langchain_core.prompts import ChatPromptTemplate
  11. from langchain_deepseek import ChatDeepSeek
  12. from langmem.short_term import SummarizationNode
  13. from langgraph.prebuilt import create_react_agent
  14. import dotenv
  15. from langgraph.prebuilt.chat_agent_executor import AgentState
  16. from pydantic import BaseModel, Field
  17. from app.db.database import get_redis_session_task, get_redis_session
  18. dotenv.load_dotenv()
  19. # -------------------------- 数据模型定义(Pydantic)--------------------------
  20. class StartModel(BaseModel):
  21.     id:int = Field(description="数据id,用于记忆存储的config")
  22.     code:int = Field(description="用户工号,用于记忆存储的config")
  23.     prompt:str = Field(description="提示词")
  24.     enter_str:Optional[str] = Field(description="输入问题")
  25.     is_memory:Optional[bool] = Field(description="是否需要记忆",default=True)
  26. # -------------------------- 核心模块--------------------------
  27. class RedisCheckpointSaver(BaseCheckpointSaver):
  28.     """基于Redis的检查点保存器"""
  29.     def __init__(self):
  30.         pass
  31.     def get_tuple(self, config: dict) -> Optional[Checkpoint]:
  32.         """同步获取检查点"""
  33.         try:
  34.             # 在同步环境中调用异步方法
  35.             import asyncio
  36.             # 尝试获取现有的事件循环
  37.             try:
  38.                 loop = asyncio.get_running_loop()
  39.                 # 如果事件循环已经在运行,我们需要在另一个线程中运行协程
  40.                 import concurrent.futures
  41.                 with concurrent.futures.ThreadPoolExecutor() as executor:
  42.                     future = executor.submit(self._run_async_in_thread, self.aget_tuple(config))
  43.                     return future.result(timeout=10.0)
  44.             except RuntimeError:
  45.                 # 没有运行中的事件循环
  46.                 loop = asyncio.new_event_loop()
  47.                 asyncio.set_event_loop(loop)
  48.                 result = loop.run_until_complete(self.aget_tuple(config))
  49.                 return result
  50.         except Exception as e:
  51.             logger.error(f"Failed to get checkpoint tuple: {str(e)}")
  52.             return None
  53.     async def aget_tuple(self, config: dict) -> Optional[Checkpoint]:
  54.         """异步获取检查点"""
  55.         redis_generator = None
  56.         try:
  57.             # 手动处理异步生成器
  58.             redis_generator = get_redis_session()
  59.             redis = await redis_generator.__anext__()  # 获取生成器的第一个值
  60.             
  61.             thread_id = config['configurable']['thread_id']
  62.             key = f"checkpoint:{thread_id}"
  63.             data = await redis.get(key)
  64.             if data:
  65.                 # 确保数据能正确反序列化
  66.                 try:
  67.                     result = pickle.loads(data)
  68.                     # 确保返回的是CheckpointTuple对象
  69.                     if isinstance(result, CheckpointTuple):
  70.                         return result
  71.                     elif isinstance(result, dict):
  72.                         # 检查是否包含必要的字段
  73.                         if 'checkpoint' in result:
  74.                             # 确保metadata包含必要的字段
  75.                             metadata = result.get('metadata', {})
  76.                             if 'step' not in metadata:
  77.                                 metadata['step'] = 0  # 默认步骤为0
  78.                             if 'source' not in metadata:
  79.                                 metadata['source'] = 'redis'  # 默认来源
  80.                                 
  81.                             return CheckpointTuple(
  82.                                 config=config,
  83.                                 checkpoint=result['checkpoint'],
  84.                                 metadata=metadata
  85.                             )
  86.                         else:
  87.                             # 如果是checkpoint对象,创建CheckpointTuple
  88.                             return CheckpointTuple(
  89.                                 config=config,
  90.                                 checkpoint=result,
  91.                                 metadata={
  92.                                     'step': 0,
  93.                                     'source': 'redis'
  94.                                 }
  95.                             )
  96.                 except Exception as e:
  97.                     logger.error(f"Failed to deserialize checkpoint: {str(e)}")
  98.                     # 如果反序列化失败,删除损坏的数据
  99.                     await redis.delete(key)
  100.             return None
  101.         except RuntimeError as e:
  102.             if "Event loop is closed" in str(e):
  103.                 logger.warning("Event loop is closed, cannot get checkpoint from Redis")
  104.                 return None
  105.             else:
  106.                 logger.error(f"Failed to get checkpoint tuple from Redis: {str(e)}")
  107.                 return None
  108.         except Exception as e:
  109.             logger.error(f"Failed to get checkpoint tuple from Redis: {str(e)}")
  110.             return None
  111.         finally:
  112.             # 确保正确清理异步生成器
  113.             if redis_generator:
  114.                 try:
  115.                     await redis_generator.aclose()
  116.                 except Exception as e:
  117.                     logger.error(f"Failed to close redis generator: {str(e)}")
  118.     def put(self, config: dict, checkpoint: Checkpoint, metadata: dict,new_versions:ChannelVersions) -> None:
  119.         """同步保存检查点"""
  120.         try:
  121.             # 在同步环境中调用异步方法
  122.             import asyncio
  123.             # 尝试获取现有的事件循环
  124.             try:
  125.                 loop = asyncio.get_running_loop()
  126.                 # 如果事件循环已经在运行,直接等待协程
  127.                 task = self.aput(config, checkpoint, metadata)
  128.                 # 在已运行的循环中创建任务并等待结果
  129.                 future = asyncio.run_coroutine_threadsafe(task, loop)
  130.                 future.result()
  131.             except RuntimeError:
  132.                 # 没有运行中的事件循环
  133.                 loop = asyncio.new_event_loop()
  134.                 asyncio.set_event_loop(loop)
  135.                 loop.run_until_complete(self.aput(config, checkpoint, metadata))
  136.         except Exception as e:
  137.             logger.error(f"Failed to put checkpoint: {str(e)}")
  138.     async def aput(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> None:
  139.         """异步保存检查点"""
  140.         try:
  141.             # redis = self.redis
  142.             async for redis in get_redis_session():
  143.                 thread_id = config['configurable']['thread_id']
  144.                 key = f"checkpoint:{thread_id}"
  145.                 # 确保数据能正确序列化
  146.                 try:
  147.                     data = pickle.dumps(checkpoint)
  148.                     await redis.set(key, data)
  149.                     # 设置过期时间,例如1小时
  150.                     await redis.expire(key, 3600)
  151.                 except Exception as e:
  152.                     logger.error(f"Failed to serialize checkpoint: {str(e)}")
  153.         except Exception as e:
  154.             logger.error(f"Failed to save checkpoint to Redis: {str(e)}")
  155.    
  156.     def put_writes(self, config: dict, writes: Sequence[tuple[str, Any]], task_id: str,task_path:str) -> None:
  157.         """保存写入操作"""
  158.         try:
  159.             # 在同步环境中调用异步方法
  160.             import asyncio
  161.             # 尝试获取现有的事件循环
  162.             try:
  163.                 loop = asyncio.get_running_loop()
  164.                 # 如果事件循环已经在运行,直接等待协程
  165.                 task = self.aput_writes(config, writes, task_id)
  166.                 # 在已运行的循环中创建任务并等待结果
  167.                 future = asyncio.run_coroutine_threadsafe(task, loop)
  168.                 future.result()
  169.             except RuntimeError:
  170.                 # 没有运行中的事件循环
  171.                 loop = asyncio.new_event_loop()
  172.                 asyncio.set_event_loop(loop)
  173.                 loop.run_until_complete(self.aput_writes(config, writes, task_id))
  174.         except Exception as e:
  175.             logger.error(f"Failed to put writes: {str(e)}")
  176.     async def aput_writes(self, config: dict, writes: Sequence[tuple[str, Any]], task_id: str) -> None:
  177.         """异步保存写入操作"""
  178.         try:
  179.             # redis = self.redis
  180.             async for redis in get_redis_session():
  181.                 thread_id = config['configurable']['thread_id']
  182.                 # 为写入操作创建一个唯一的键
  183.                 key = f"checkpoint_writes:{thread_id}:{task_id}"
  184.                 # 确保数据能正确序列化
  185.                 try:
  186.                     data = pickle.dumps(writes)
  187.                     await redis.set(key, data)
  188.                     # 设置过期时间,例如1小时
  189.                     await redis.expire(key, 3600)
  190.                 except Exception as e:
  191.                     logger.error(f"Failed to serialize writes: {str(e)}")
  192.         except Exception as e:
  193.             logger.error(f"Failed to save writes to Redis: {str(e)}")
  194.     def _run_async_in_thread(self, coro):
  195.         """在线程中运行异步协程"""
  196.         import asyncio
  197.         loop = asyncio.new_event_loop()
  198.         asyncio.set_event_loop(loop)
  199.         try:
  200.             return loop.run_until_complete(coro)
  201.         finally:
  202.             loop.close()
  203. def create_agent(params:StartModel):
  204.     """
  205.     创建智能体
  206.         prompt:提示词
  207.         is_memory:是否需要记忆
  208.     """
  209.     llm = ChatDeepSeek(
  210.         model='qwen3-max',
  211.         temperature=0.1,
  212.         api_key=os.getenv("DASHSCOPE_API_KEY_1"),
  213.         api_base=os.getenv("TONGYI_API_BASE"),
  214.     )
  215.     # 1. 初始化检查点
  216.     # memory_saver = InMemorySaver()
  217.     redis_saver = RedisCheckpointSaver() if params.is_memory else None
  218.     class CustomState(AgentState):
  219.         context: dict[str, Any]
  220.     # 3. 总结节点(如果需要保留)
  221.     summarization_node = SummarizationNode(
  222.         token_counter=count_tokens_approximately,
  223.         model=llm,
  224.         max_tokens=384,
  225.         max_summary_tokens=128,
  226.         output_messages_key="llm_input_messages",
  227.     )
  228.     # 4. 创建智能体
  229.     agent = create_react_agent(
  230.         model=llm,
  231.         tools=[],  # 如需查询实时信息,需添加工具(如搜索工具)
  232.         checkpointer= redis_saver ,
  233.         state_schema=CustomState,
  234.         pre_model_hook=summarization_node,
  235.     )
  236.     return agent
  237. async def start_ask(params:StartModel):
  238.     """启动智能体"""
  239.     logger.info(f"启动智能体...")
  240.     agent = create_agent(params)
  241.     config = {"configurable": {"thread_id":str( params.id)+"_"+str(params.code)}}
  242.     # 模版
  243.     prompt_template = ChatPromptTemplate.from_template("""
  244.         {prompt}
  245.         用户问题: {input}
  246.    
  247.         """).format_messages(
  248.         input=params.enter_str,
  249.         prompt = params.prompt
  250.     )[0]
  251.     # 智能体流失输出
  252.     logger.info(f"执行Agent...")
  253.     return agent.stream(
  254.             input={"messages":  [prompt_template]},
  255.             stream_mode="messages",
  256.             config=config
  257.     )
  258. if __name__ == "__main__":
  259.     prompt = " "
  260.     init_date = {"id":2,"prompt":prompt,"code":"60011305","enter_str":'你好,我叫小明'}
  261.     async def main():
  262.         graph_agent = await start_ask(StartModel.model_validate(init_date))
  263.         for chunk in  graph_agent:
  264.             print(chunk)
  265.             print(chunk[0].content)
  266.     asyncio.run(main())
复制代码
原文地址:https://blog.csdn.net/qq_38935512/article/details/153066855
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )