AI创想

标题: LangGraph使用PostgreSaver持久化,初始化与异步实现的一些细节 [打印本页]

作者: AI小编    时间: 昨天 23:30
标题: LangGraph使用PostgreSaver持久化,初始化与异步实现的一些细节
作者:CSDN博客
博主环境:
  1. python==3.12.9
  2. langgraph==0.5.4
  3. langgraph-checkpoint-postgres==2.0.23
  4. psycopg==3.2.10
复制代码
一、初始化

初始化的两种方式

方法一:直接对PostgreSaver类进行初始化
  1. from psycopg import Connection
  2. from langgraph.checkpoint.postgres import PostgresSaver
  3. db_uri = "postgres://<user>:<password>@<host>:<port>/<database>"
  4. conn = Connection.connect(db_uri)
  5. conn.autocommit = True    # 关键:设置连接为自动提交模式,否则会报错psycopg.errors.ActiveSqlTransaction: CREATE INDEX CONCURRENTLY 无法在事物块中运行
  6. checkpointer = PostgresSaver(conn)
  7. checkpointer.setup()
复制代码
方法二:使用PostgreSaver.from_conn_string方法。
  1. from langgraph.checkpoint.postgres import PostgresSaver
  2. db_uri = "postgres://<user>:<password>@<host>:<port>/<database>"
  3. with PostgresSaver.from_conn_string(db_uri) as checkpointer:
  4.     checkpointer.setup()
复制代码
初始化时指定schema

langgraph.checkpoint.postgres初始化时默认在public schema下创建四张表
    """CREATE TABLE IF NOT EXISTS checkpoint_migrations (    v INTEGER PRIMARY KEY);""",    """CREATE TABLE IF NOT EXISTS checkpoints (    thread_id TEXT NOT NULL,    checkpoint_ns TEXT NOT NULL DEFAULT '',    checkpoint_id TEXT NOT NULL,    parent_checkpoint_id TEXT,    type TEXT,    checkpoint JSONB NOT NULL,    metadata JSONB NOT NULL DEFAULT '{}',    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id));""",    """CREATE TABLE IF NOT EXISTS checkpoint_blobs (    thread_id TEXT NOT NULL,    checkpoint_ns TEXT NOT NULL DEFAULT '',    channel TEXT NOT NULL,    version TEXT NOT NULL,    type TEXT NOT NULL,    blob BYTEA,    PRIMARY KEY (thread_id, checkpoint_ns, channel, version));""",    """CREATE TABLE IF NOT EXISTS checkpoint_writes (    thread_id TEXT NOT NULL,    checkpoint_ns TEXT NOT NULL DEFAULT '',    checkpoint_id TEXT NOT NULL,    task_id TEXT NOT NULL,    idx INTEGER NOT NULL,    channel TEXT NOT NULL,    type TEXT,    blob BYTEA NOT NULL,    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx));""",
如果要指定schema,可以在db_uri中加上option
  1. db_uri = "postgres://<user>:<password>@<host>:<port>/<database>?options=-csearch_path%3D<your_schema>,public"
  2. # 假设所有参数都为test
  3. # db_uri = "postgres://<user>:<password>@<host>:<port>/<database>?options=-csearch_path%3Dtest,public"
复制代码
二、异步实现

如果使用图流程的异步输出,需要搭配使用AsyncPostgresSaver,否则会报错 "Synchronous calls to AsyncPostgresSaver are only allowed from a different thread. From the main thread, use the async interface."
  1. from langgraph.graph import StateGraph
  2. workflow = StateGraph(MainState)
  3. workflow.add_node("node1", execute_node_func)
  4. # ......
  5. # 使用异步执行模式
  6. async def generate_async_response(*args, **kwargs):
  7.     async with AsyncPostgresSaver.from_conn_string(db_uri) as async_checkpointer:
  8.         # 编译工作流
  9.         graph = workflow.compile(checkpointer=async_checkpointer)
  10.         # 忽略其他一些细节
  11.         inputs = {
  12.             # 入参
  13.         }
  14.         config = {
  15.             "configurable": {
  16.                 "thread_id": "123456",
  17.             }
  18.         }
  19.         # 调用流式输出
  20.         async for output in graph.astream(inputs, config, subgraphs=True, stream_mode="updates"):
  21.             # do_something
  22.             pass
复制代码
注意,如果如果使用了AsyncPostgresSaver,graph的同步方法不能使用了,比如get_state:
  1. # 原本的方法不可用了
  2. def is_thread_interrupted(graph, thread_id: str):
  3.     state = graph.get_state({"configurable": {"thread_id": thread_id}})
  4.     return state.next not in [(), ("__end__",)]
  5. # 要换成这个
  6. async def is_thread_interrupted(graph, thread_id: str):
  7.     state = await graph.aget_state({"configurable": {"thread_id": thread_id}})
  8.     return state.next not in [(), ("__end__",)]
  9. # 对应的使用
  10. async def generate_async_response(*args, **kwargs):
  11.     async with AsyncPostgresSaver.from_conn_string(db_uri) as async_checkpointer:
  12.         # 编译工作流
  13.         graph = workflow.compile(checkpointer=async_checkpointer)
  14.         inputs = {
  15.             # 入参
  16.         }
  17.         config = {
  18.             "configurable": {
  19.                 "thread_id": "123456",
  20.             }
  21.         }
  22.         if await is_thread_interrupted(graph, thread_id=thread_id):
  23.             inputs = Command(resume=inputs)
  24.         # 调用流式输出
  25.         async for output in graph.astream(inputs, config, subgraphs=True, stream_mode="updates"):
  26.             # do_something
  27.             pass
复制代码
原文地址:https://blog.csdn.net/baobaodarenya/article/details/151855700




欢迎光临 AI创想 (https://www.llms-ai.com/) Powered by Discuz! X3.4