import os
import json
import getpass
from typing import Annotated, Literal
from typing_extensions import TypedDict
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langgraph.config import get_stream_writer
from langchain_tavily import TavilySearch
from langgraph.prebuilt import ToolNode, tools_condition
os.environ["TAVILY_API_KEY"] = "tvly-……"
tool = TavilySearch(max_results=2)
tools = [tool]
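# Optional sanity check (a sketch, assuming the TAVILY_API_KEY above is valid);
# uncomment to verify the search tool before wiring it into the graph:
# print(tool.invoke({"query": "LangGraph"}))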
'''
Define the LangGraph workflow
'''
def generate_custom_stream(stream_type: Literal["think", "normal"], content: str):
    # Emit a custom stream event ({"think": ...} or {"normal": ...}) that
    # graph.astream(stream_mode="custom") will surface to the API layer.
    content = "\n" + content + "\n"
    custom_stream_writer = get_stream_writer()
    return custom_stream_writer({stream_type: content})
class State(TypedDict):
    messages: Annotated[list, add_messages]
llm = ChatOpenAI(
    model="qwen-plus",
    api_key="sk-3……",  # replace with your own DashScope API key
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
llm_with_tools = llm.bind_tools(tools)
def chatbot(state: State):
    # Invoke the model twice: once with a reasoning prompt prepended (streamed
    # as "think" content) and once normally (streamed as "normal" content).
    think_response = llm_with_tools.invoke(["Please show your reasoning:"] + state["messages"])
    normal_response = llm_with_tools.invoke(state["messages"])
    generate_custom_stream("think", think_response.content)
    generate_custom_stream("normal", normal_response.content)
    return {"messages": [normal_response]}
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)  # add the chatbot node to the graph
tool_node = ToolNode(tools=tools)  # create the tool node
graph_builder.add_node("tools", tool_node)  # add the tool node to the graph
graph_builder.add_conditional_edges("chatbot", tools_condition)  # add the conditional edge
graph_builder.add_edge("tools", "chatbot")  # add the tools -> chatbot edge
graph_builder.add_edge(START, "chatbot")  # add the START -> chatbot edge
graph = graph_builder.compile()
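# Optional local smoke test (a sketch; assumes the API keys above are valid).
# Uncomment to stream the custom events directly, without the HTTP layer:
# import asyncio
# async def _demo():
#     async for event in graph.astream(
#         input={"messages": [("user", "What is LangGraph?")]},
#         stream_mode="custom",
#     ):
#         print(event)  # e.g. {"think": "..."} or {"normal": "..."}
# asyncio.run(_demo())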
'''
Define the API endpoints
'''
app = FastAPI(
    title="Langgraph API",
    description="Langgraph API",
)
@app.get("/test")
async def test():
return {"message": "Hello World"}
@app.post("/stream")
async def stream(inputs: State):
async def event_stream():
try:
stream_start_msg = {
'choices':
[
{
'delta': {},
'finish_reason': None
}
]
}
# Stream start
yield f"data: {json.dumps(stream_start_msg)}\n\n"
# Processing langgraph stream response with <think> block support
async for event in graph.astream(input=inputs, stream_mode="custom"):
print(event)
think_content = event.get("think", None)
normal_content = event.get("normal", None)
think_msg = {
'choices':
[
{
'delta':
{
'reasoning_content': think_content,
},
'finish_reason': None
}
]
}
normal_msg = {
'choices':
[
{
'delta':
{
'content': normal_content,
},
'finish_reason': None
}
]
}
yield f"data: {json.dumps(think_msg)}\n\n"
yield f"data: {json.dumps(normal_msg)}\n\n"
# End of the stream
stream_end_msg = {
'choices': [
{
'delta': {},
'finish_reason': 'stop'
}
]
}
yield f"data: {json.dumps(stream_end_msg)}\n\n"
except Exception as e:
# Simply print the error information
print(f"An error occurred: {e}")
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
4) Install dependencies

pip install -r requirements.txt

5) Start the service
uvicorn langgraph_example:app --reload  # after startup, the service listens on port 8000 by default
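To verify the endpoint before wiring up the pipeline, here is a minimal client sketch (assuming the service listens on port 8000; adjust the URL to match your uvicorn port):

import requests

resp = requests.post(
    "http://127.0.0.1:8000/stream",
    json={"messages": [["user", "hello"]]},
    headers={"accept": "text/event-stream"},
    stream=True,
)
for line in resp.iter_lines():
    if line:
        print(line.decode("utf-8"))  # each non-empty line is a "data: {...}" SSE chunk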
Step 2: Install the pipeline

import os
import requests
from pydantic import BaseModel, Field
from typing import List, Union, Generator, Iterator
class Pipeline:
    class Valves(BaseModel):
        # Must match the port the FastAPI service actually listens on (uvicorn defaults to 8000).
        API_URL: str = Field(default="http://127.0.0.1:9000/stream", description="Langgraph API URL")

    def __init__(self):
        self.id = "LangGraph stream"  # the name shown in the model list; change it as you like
        self.name = "LangGraph stream"
        # Initialize valve parameters
        self.valves = self.Valves(
            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
        )

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup: {__name__}")

    async def on_shutdown(self):
        # This function is called when the server is shut down.
        print(f"on_shutdown: {__name__}")

    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict,
    ) -> Union[str, Generator, Iterator]:
        # Forward the chat history to the LangGraph API as [role, content] pairs.
        data = {
            "messages": [[msg['role'], msg['content']] for msg in messages],
        }
        headers = {
            'accept': 'text/event-stream',
            'Content-Type': 'application/json',
        }
        response = requests.post(self.valves.API_URL, json=data, headers=headers, stream=True)
        response.raise_for_status()
        # Return the raw SSE lines; Open WebUI consumes this iterator as the stream.
        return response.iter_lines()
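Each line the pipe returns via iter_lines() is a raw "data: {...}" SSE payload in the format the FastAPI service emits above. If you need plain text deltas instead, here is a parsing sketch (parse_sse_line is a hypothetical helper, not part of the pipeline contract):

import json

def parse_sse_line(line: bytes) -> str:
    # Hypothetical helper: pull the text delta out of one "data: {...}" SSE line.
    text = line.decode("utf-8")
    if not text.startswith("data: "):
        return ""
    delta = json.loads(text[len("data: "):])["choices"][0]["delta"]
    return delta.get("content") or delta.get("reasoning_content") or ""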
Step 3: Use the LangGraph chatbot