Author: CSDN Blog
Author's environment:
- python==3.12.9
- langgraph==0.5.4
- langgraph-checkpoint-postgres==2.0.23
- psycopg==3.2.10
1. Initialization
Two ways to initialize
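Method 1: instantiate the PostgresSaver class directly
- from psycopg import Connection
- from psycopg.rows import dict_row
- from langgraph.checkpoint.postgres import PostgresSaver
- db_uri = "postgres://<user>:<password>@<host>:<port>/<database>"
- # Key: autocommit must be on; otherwise setup() fails with
- # psycopg.errors.ActiveSqlTransaction: CREATE INDEX CONCURRENTLY cannot run inside a transaction block
- # row_factory=dict_row is also needed because the saver reads result rows by column name
- conn = Connection.connect(db_uri, autocommit=True, row_factory=dict_row)
- checkpointer = PostgresSaver(conn)
- checkpointer.setup()  # creates the checkpoint tables and runs migrations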
Method 2: use the PostgresSaver.from_conn_string method
- from langgraph.checkpoint.postgres import PostgresSaver
- db_uri = "postgres://<user>:<password>@<host>:<port>/<database>"
- with PostgresSaver.from_conn_string(db_uri) as checkpointer:
-     checkpointer.setup()
Specifying a schema at initialization
By default, langgraph.checkpoint.postgres creates four tables in the public schema when it is initialized:
CREATE TABLE IF NOT EXISTS checkpoint_migrations (
    v INTEGER PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
CREATE TABLE IF NOT EXISTS checkpoint_blobs (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    channel TEXT NOT NULL,
    version TEXT NOT NULL,
    type TEXT NOT NULL,
    blob BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);
CREATE TABLE IF NOT EXISTS checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    blob BYTEA NOT NULL,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
To use a different schema, add an options parameter to db_uri:
- db_uri = "postgres://<user>:<password>@<host>:<port>/<database>?options=-csearch_path%3D<your_schema>,public"
- # For example, with a schema named test:
- # db_uri = "postgres://<user>:<password>@<host>:<port>/<database>?options=-csearch_path%3Dtest,public"
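The %3D in the URI above is a percent-encoded "=". A minimal sketch of building such a URI programmatically, using only the standard library, might look like the following. The helper name build_db_uri and its parameters are assumptions made for illustration; they are not part of LangGraph or psycopg.

```python
from urllib.parse import quote


def build_db_uri(user, password, host, port, database, schema=None):
    """Build a PostgreSQL URI, optionally putting `schema` first in search_path.

    Hypothetical helper for illustration only.
    """
    # Percent-encode the credentials so characters such as "@" or ":" do not break the URI
    uri = f"postgres://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}/{database}"
    if schema:
        # "-c search_path=..." is passed to the server through the `options` parameter;
        # "=" is encoded as %3D, the comma is left as-is
        options = f"-csearch_path={schema},public"
        uri += "?options=" + quote(options, safe=",")
    return uri


print(build_db_uri("test", "p@ss", "localhost", 5432, "appdb", schema="test"))
# postgres://test:p%40ss@localhost:5432/appdb?options=-csearch_path%3Dtest,public
```

Note that search_path only selects a schema. PostgreSQL skips search_path entries that do not exist, so the target schema most likely has to be created before setup() is called.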
2. Async implementation
To stream a graph's output asynchronously, pair it with AsyncPostgresSaver and use the async interface throughout; otherwise you get an error such as "Synchronous calls to AsyncPostgresSaver are only allowed from a different thread. From the main thread, use the async interface."
- from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
- from langgraph.graph import StateGraph
- # MainState, execute_node_func and db_uri are defined elsewhere
- workflow = StateGraph(MainState)
- workflow.add_node("node1", execute_node_func)
- # ......
- # Run in async mode
- async def generate_async_response(*args, **kwargs):
-     async with AsyncPostgresSaver.from_conn_string(db_uri) as async_checkpointer:
-         # Compile the workflow
-         graph = workflow.compile(checkpointer=async_checkpointer)
-         # Other details omitted
-         inputs = {
-             # input parameters
-         }
-         config = {
-             "configurable": {
-                 "thread_id": "123456",
-             }
-         }
-         # Stream the output
-         async for output in graph.astream(inputs, config, subgraphs=True, stream_mode="updates"):
-             # do_something
-             pass
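The error message quoted above points at a general asyncio constraint: a synchronous wrapper that blocks on a coroutine cannot be called from the thread that runs the event loop, because it would block the loop it is waiting on. The stdlib-only sketch below mimics that kind of guard. GuardedSaver and its methods are hypothetical stand-ins written for this illustration, not LangGraph code, and the real implementation may differ.

```python
import asyncio


class GuardedSaver:
    """Hypothetical stand-in for an async-first saver (not LangGraph code)."""

    def __init__(self):
        # Remember the loop that owns this object; must be constructed inside a running loop
        self.loop = asyncio.get_running_loop()

    async def aget(self, key):
        await asyncio.sleep(0)  # pretend to do async I/O
        return f"value-for-{key}"

    def get(self, key):
        # Synchronous wrapper: refuse to run on the event loop's own thread
        try:
            if asyncio.get_running_loop() is self.loop:
                raise asyncio.InvalidStateError(
                    "sync call from the event loop thread; use the async interface"
                )
        except RuntimeError:
            pass  # no running loop in this thread, so blocking here is safe
        # Schedule the coroutine on the owning loop and block this worker thread on the result
        return asyncio.run_coroutine_threadsafe(self.aget(key), self.loop).result()


async def main():
    saver = GuardedSaver()
    results = {}
    try:
        saver.get("a")  # same thread as the loop: rejected
    except asyncio.InvalidStateError as exc:
        results["same_thread"] = type(exc).__name__
    results["async"] = await saver.aget("a")  # async interface: fine
    results["other_thread"] = await asyncio.to_thread(saver.get, "a")  # worker thread: fine
    return results


results = asyncio.run(main())
print(results)
```

Inside a coroutine, the practical takeaway is the same as in the article: call the a-prefixed methods with await rather than their synchronous counterparts.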
Note: once AsyncPostgresSaver is in use, the graph's synchronous methods, such as get_state, can no longer be used:
- # The original synchronous version no longer works
- def is_thread_interrupted(graph, thread_id: str):
-     state = graph.get_state({"configurable": {"thread_id": thread_id}})
-     return state.next not in [(), ("__end__",)]
- # Replace it with this
- async def is_thread_interrupted(graph, thread_id: str):
-     state = await graph.aget_state({"configurable": {"thread_id": thread_id}})
-     return state.next not in [(), ("__end__",)]
- # Corresponding usage
- from langgraph.types import Command
- async def generate_async_response(*args, **kwargs):
-     async with AsyncPostgresSaver.from_conn_string(db_uri) as async_checkpointer:
-         # Compile the workflow
-         graph = workflow.compile(checkpointer=async_checkpointer)
-         inputs = {
-             # input parameters
-         }
-         config = {
-             "configurable": {
-                 "thread_id": "123456",
-             }
-         }
-         # If the thread is paused, resume it with the new inputs
-         thread_id = config["configurable"]["thread_id"]
-         if await is_thread_interrupted(graph, thread_id=thread_id):
-             inputs = Command(resume=inputs)
-         # Stream the output
-         async for output in graph.astream(inputs, config, subgraphs=True, stream_mode="updates"):
-             # do_something
-             pass
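The check in is_thread_interrupted can be tried without a database. The sketch below uses only the standard library; Snapshot and FakeGraph are hypothetical stubs that model just the next field and the aget_state method used above, so it illustrates the tuple comparison rather than real LangGraph behavior.

```python
import asyncio
from collections import namedtuple

# Hypothetical stand-in for the snapshot returned by aget_state; only `next` is modeled
Snapshot = namedtuple("Snapshot", ["next"])


class FakeGraph:
    """Hypothetical stub exposing an async aget_state, keyed by thread_id."""

    def __init__(self, snapshots):
        self._snapshots = snapshots

    async def aget_state(self, config):
        thread_id = config["configurable"]["thread_id"]
        # Threads the stub does not know about have nothing pending
        return self._snapshots.get(thread_id, Snapshot(next=()))


async def is_thread_interrupted(graph, thread_id: str) -> bool:
    state = await graph.aget_state({"configurable": {"thread_id": thread_id}})
    # A non-empty `next` other than ("__end__",) means some node is still waiting to run
    return state.next not in [(), ("__end__",)]


graph = FakeGraph({
    "paused": Snapshot(next=("node2",)),
    "done": Snapshot(next=()),
})
paused = asyncio.run(is_thread_interrupted(graph, "paused"))
done = asyncio.run(is_thread_interrupted(graph, "done"))
unknown = asyncio.run(is_thread_interrupted(graph, "unknown"))
print(paused, done, unknown)  # True False False
```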
Original article: https://blog.csdn.net/baobaodarenya/article/details/151855700