forked from docker/genai-stack
-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
179 lines (142 loc) · 4.95 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# api.py - This script defines a FastAPI application that provides HTTP API endpoints for
# interacting with a language model (LLM) and a Neo4j graph database. It supports both
# streaming and non-streaming responses for question-answering and can generate tickets
# based on user input. It provides the same functionality as bot.py, but exposed through HTTP APIs instead of a UI.
import os
from langchain.graphs import Neo4jGraph
from dotenv import load_dotenv
from utils import (
create_vector_index,
BaseLogger,
)
from chains import (
load_embedding_model,
load_llm,
configure_llm_only_chain,
configure_qa_rag_chain,
generate_ticket,
)
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from langchain.callbacks.base import BaseCallbackHandler
from threading import Thread
from queue import Queue, Empty
from collections.abc import Generator
from sse_starlette.sse import EventSourceResponse
from fastapi.middleware.cors import CORSMiddleware
import json
# Load environment variables from a .env file
# (every statement in this section runs once, when the module is imported)
load_dotenv(".env")
# Retrieve Neo4j and Ollama configuration from environment variables.
# os.getenv returns None for any variable that is not set.
url = os.getenv("NEO4J_URI")
username = os.getenv("NEO4J_USERNAME")
password = os.getenv("NEO4J_PASSWORD")
ollama_base_url = os.getenv("OLLAMA_BASE_URL")
embedding_model_name = os.getenv("EMBEDDING_MODEL")
llm_name = os.getenv("LLM")
# Remapping for Langchain Neo4j integration: the value read from NEO4J_URI is
# re-exported under the name NEO4J_URL.
# NOTE(review): os.environ only accepts str values, so this assignment raises
# TypeError when NEO4J_URI is unset - confirm that failing here is acceptable.
os.environ["NEO4J_URL"] = url
# Load the embedding model. The call returns a pair: the embeddings object and
# its dimension, which is passed to create_vector_index below.
embeddings, dimension = load_embedding_model(
embedding_model_name,
config={"ollama_base_url": ollama_base_url},
logger=BaseLogger(),
)
# Initialize the Neo4j graph database connection
# if Neo4j is local, you can go to http://localhost:7474/ to browse the database
neo4j_graph = Neo4jGraph(url=url, username=username, password=password)
# Create the vector index in the Neo4j database, sized by the embedding dimension
create_vector_index(neo4j_graph, dimension)
# Load the language model named by the LLM environment variable
llm = load_llm(
llm_name, logger=BaseLogger(), config={"ollama_base_url": ollama_base_url}
)
# Configure the chains for LLM only and RAG.
# Both chains are shared module-level objects used by the endpoints below.
llm_chain = configure_llm_only_chain(llm)
rag_chain = configure_qa_rag_chain(
llm, embeddings, embeddings_store_url=url, username=username, password=password
)
# Define a callback handler for streaming LLM responses to a queue
class QueueCallback(BaseCallbackHandler):
    """Callback handler for streaming LLM responses to a queue.

    Each token reported through ``on_llm_new_token`` is put on the queue
    given at construction time, so that another thread can consume the
    tokens as they are produced.
    """

    def __init__(self, q):
        # q: the queue that receives tokens; only its ``put`` method is used.
        self.q = q

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Put a newly generated token on the queue."""
        self.q.put(token)

    def on_llm_end(self, *args, **kwargs) -> None:
        """Do nothing when the LLM finishes.

        The handler does not signal end-of-stream itself. The previous
        implementation returned ``self.q.empty()``, a bool that
        contradicted the ``None`` return annotation and had no effect on
        the queue.
        """
        return None
# Function to stream LLM responses using a queue
def stream(cb, q) -> Generator:
    """Run ``cb`` on a worker thread and yield the tokens it puts on ``q``.

    Args:
        cb: Zero-argument callable that produces tokens by putting them on
            ``q`` (directly or through a callback handler).
        q: Queue shared with ``cb``; it must support ``put`` and
            ``get(block, timeout=...)`` and raise ``queue.Empty`` on timeout.

    Yields:
        ``(token, content)`` tuples, where ``content`` is the concatenation
        of every token received so far, including ``token``.

    Raises:
        Exception: Whatever ``cb`` raised, re-raised in the consumer after
            all tokens queued before the failure have been yielded.
    """
    job_done = object()  # unique sentinel; cannot be confused with a token
    failures = []

    def task():
        try:
            cb()
        except Exception as exc:
            # Remember the failure so the consumer can surface it.
            failures.append(exc)
        finally:
            # Always signal completion: without this, a failing ``cb`` would
            # leave the consumer loop below polling the queue forever.
            q.put(job_done)

    worker = Thread(target=task)
    worker.start()
    content = ""
    # Get each new token from the queue and yield it from the generator.
    while True:
        try:
            next_token = q.get(True, timeout=1)
        except Empty:
            # Nothing produced within the timeout; keep waiting.
            continue
        if next_token is job_done:
            break
        content += next_token
        yield next_token, content
    worker.join()
    if failures:
        raise failures[0]
# Initialize the FastAPI application
app = FastAPI()
# Origins allowed to make cross-origin requests; "*" is the wildcard entry.
origins = ["*"]
# Add CORS middleware to allow cross-origin requests
# NOTE(review): this combines a wildcard origin list with allow_credentials=True,
# plus wildcard methods and headers. Confirm this permissive setup is intended;
# the FastAPI CORS documentation discusses wildcards together with credentials.
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Root endpoint
@app.get("/")
async def root():
    # Root route: takes no input and always answers with the same
    # fixed greeting payload.
    greeting = {"message": "Hello World"}
    return greeting
# Pydantic models for request bodies
class Question(BaseModel):
    # Parameters accepted by the question-answering endpoints.

    # The question text that is handed to the chain.
    text: str
    # When true the endpoints use rag_chain instead of llm_chain;
    # defaults to False.
    rag: bool = False
class BaseTicket(BaseModel):
    # Parameters accepted by the ticket-generation endpoint.

    # The user input that is passed to generate_ticket as input_question.
    text: str
# Streaming endpoint for question-answering
@app.get("/query-stream")
def qstream(question: Question = Depends()):
    # Answer ``question.text`` as a stream of server-sent events.
    #
    # The first event is {"init": true, "model": <llm_name>}; each later
    # event is {"token": <token>} for one token reported by the chain.
    # ``question.rag`` selects rag_chain, otherwise llm_chain is used.
    chain = rag_chain if question.rag else llm_chain
    token_queue = Queue()

    def run_chain():
        # Executed on the worker thread started by stream(); tokens reach
        # token_queue through the QueueCallback handler.
        chain(
            {"question": question.text, "chat_history": []},
            callbacks=[QueueCallback(token_queue)],
        )

    def event_payloads():
        yield json.dumps({"init": True, "model": llm_name})
        # stream() also yields the accumulated text; only the token is sent.
        for token, _ in stream(run_chain, token_queue):
            yield json.dumps({"token": token})

    return EventSourceResponse(event_payloads(), media_type="text/event-stream")
# Non-streaming endpoint for question-answering
@app.get("/query")
def ask(question: Question = Depends()):
    # Answer ``question.text`` in a single (non-streaming) response.
    #
    # Declared with plain ``def`` (like ``qstream``) because the chain call
    # below is synchronous: FastAPI runs ``def`` handlers in a worker
    # thread, whereas as ``async def`` the blocking call ran on the event
    # loop and stalled every other request until it returned.
    #
    # ``question.rag`` selects rag_chain, otherwise llm_chain is used.
    # Returns {"result": <the chain output's "answer" entry>,
    #          "model": <llm_name>}.
    output_function = rag_chain if question.rag else llm_chain
    result = output_function(
        {"question": question.text, "chat_history": []}, callbacks=[]
    )
    return {"result": result["answer"], "model": llm_name}
# Endpoint for generating a ticket based on user input
@app.get("/generate-ticket")
def generate_ticket_api(question: BaseTicket = Depends()):
    # Generate a ticket (title and text) from ``question.text``.
    #
    # Declared with plain ``def`` (like ``qstream``) because generate_ticket
    # is called synchronously: FastAPI runs ``def`` handlers in a worker
    # thread, whereas as ``async def`` the call ran on the event loop and
    # blocked other requests for its whole duration.
    # NOTE(review): requests to this route may now run concurrently in
    # separate threads - confirm neo4j_graph tolerates concurrent use.
    #
    # Returns {"result": {"title": ..., "text": ...}, "model": <llm_name>}.
    new_title, new_question = generate_ticket(
        neo4j_graph=neo4j_graph,
        llm_chain=llm_chain,
        input_question=question.text,
    )
    return {"result": {"title": new_title, "text": new_question}, "model": llm_name}