diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py
index 5d7fff03..41ae392d 100644
--- a/ai_ta_backend/beam/nomic_logging.py
+++ b/ai_ta_backend/beam/nomic_logging.py
@@ -40,7 +40,8 @@ def create_document_map(course_name: str):
   # check if map exists
   response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
   if response.data:
-    return "Map already exists for this course."
+    if response.data[0]['doc_map_id']:
+      return "Map already exists for this course."
 
   # fetch relevant document data from Supabase
   response = SUPABASE_CLIENT.table("documents").select("id",
diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py
index ae77e6fe..ecd775d2 100644
--- a/ai_ta_backend/database/sql.py
+++ b/ai_ta_backend/database/sql.py
@@ -90,16 +90,19 @@ def insertProjectInfo(self, project_info):
     return self.supabase_client.table("projects").insert(project_info).execute()
 
   def getAllFromLLMConvoMonitor(self, course_name: str):
-    return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).execute()
+    return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).order('id', desc=False).execute()
 
-  def getCountFromLLMConvoMonitor(self, course_name: str):
-    return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).execute()
+  def getCountFromLLMConvoMonitor(self, course_name: str, last_id: int):
+    if last_id == 0:
+      return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).order('id', desc=False).execute()
+    else:
+      return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).gt("id", last_id).order('id', desc=False).execute()
 
   def getDocMapFromProjects(self, course_name: str):
     return self.supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
 
   def getConvoMapFromProjects(self, course_name: str):
-    return self.supabase_client.table("projects").select("convo_map_id").eq("course_name", course_name).execute()
+    return self.supabase_client.table("projects").select("*").eq("course_name", course_name).execute()
 
   def updateProjects(self, course_name: str, data: dict):
     return self.supabase_client.table("projects").update(data).eq("course_name", course_name).execute()
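Note on the `sql.py` changes: the new `last_id` parameter turns `getCountFromLLMConvoMonitor` into an incremental query. With `last_id == 0` it counts every conversation for the course; otherwise the `gt("id", last_id)` filter counts only rows past the last-uploaded checkpoint, which is what lets `log_to_conversation_map` below upload only new conversations. A minimal sketch of the same pattern, assuming a standard supabase-py client (the env var names here are illustrative, not taken from this repo):

```python
# Sketch of the incremental count pattern used by getCountFromLLMConvoMonitor.
# Assumes supabase-py; SUPABASE_URL / SUPABASE_API_KEY are illustrative names.
import os

from supabase import create_client

supabase = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_API_KEY"])

def count_unlogged_convos(course_name: str, last_id: int) -> int:
  # count='exact' asks PostgREST for the exact row count alongside the data
  query = supabase.table("llm-convo-monitor").select("id", count="exact").eq("course_name", course_name)
  if last_id > 0:
    # only conversations created after the last-uploaded checkpoint
    query = query.gt("id", last_id)
  return query.order("id", desc=False).execute().count or 0
```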
diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py
index cfe9cbf0..b27e97c3 100644
--- a/ai_ta_backend/main.py
+++ b/ai_ta_backend/main.py
@@ -38,6 +38,8 @@
 from ai_ta_backend.service.retrieval_service import RetrievalService
 from ai_ta_backend.service.sentry_service import SentryService
 
+from ai_ta_backend.beam.nomic_logging import create_document_map
+
 app = Flask(__name__)
 CORS(app)
 executor = Executor(app)
@@ -191,7 +193,7 @@ def createDocumentMap(service: NomicService):
     # proper web error "400 Bad request"
     abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`")
 
-  map_id = service.create_document_map(course_name)
+  map_id = create_document_map(course_name)
 
   response = jsonify(map_id)
   response.headers.add('Access-Control-Allow-Origin', '*')
@@ -211,6 +213,20 @@ def createConversationMap(service: NomicService):
   response.headers.add('Access-Control-Allow-Origin', '*')
   return response
 
+@app.route('/logToConversationMap', methods=['GET'])
+def logToConversationMap(service: NomicService):
+  course_name: str = request.args.get('course_name', default='', type=str)
+
+  if course_name == '':
+    # proper web error "400 Bad request"
+    abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`")
+
+  map_id = service.log_to_conversation_map(course_name)
+
+  response = jsonify(map_id)
+  response.headers.add('Access-Control-Allow-Origin', '*')
+  return response
+
 @app.route('/onResponseCompletion', methods=['POST'])
 def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface):
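The new `/logToConversationMap` route follows the same shape as the existing Nomic endpoints: a GET with a required `course_name` query parameter, a 400 on a missing parameter, and a CORS header on the response. A hypothetical smoke test, assuming a local dev server on port 8000 and an example course name (both assumptions, not from this repo):

```python
# Hypothetical smoke test for the new endpoint; assumes the Flask app is
# served locally on port 8000 and that a course named 'ece120' exists.
import requests

resp = requests.get(
    "http://localhost:8000/logToConversationMap",
    params={"course_name": "ece120"},
    timeout=120,
)
print(resp.status_code)  # 200 on success, 400 if course_name is missing
print(resp.json())       # e.g. "success" or "No new conversations to log."
```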
diff --git a/ai_ta_backend/service/nomic_service.py b/ai_ta_backend/service/nomic_service.py
index 2066468a..fee6ee21 100644
--- a/ai_ta_backend/service/nomic_service.py
+++ b/ai_ta_backend/service/nomic_service.py
@@ -258,6 +258,84 @@ def log_to_conversation_map(self, course_name: str):
     2. If no, create it
     3. If yes, fetch all conversations since last upload and log it
     """
+    nomic.login(os.getenv('NOMIC_API_KEY'))
+    NOMIC_MAP_NAME_PREFIX = 'Conversation Map for '
+
+    # check if map exists
+    response = self.sql.getConvoMapFromProjects(course_name)
+    print("Response from supabase: ", response.data)
+
+    if not response.data or not response.data[0]['convo_map_id']:
+      print("Map does not exist for this course. Redirecting to map creation...")
+      return self.create_conversation_map(course_name)
+
+    project_id = response.data[0]['convo_map_id']
+    last_uploaded_convo_id = response.data[0]['last_uploaded_convo_id'] or 0
+
+    # check if project is accepting data
+    project = AtlasProject(project_id=project_id, add_datums_if_exists=True)
+    if not project.is_accepting_data:
+      return "Project is currently indexing and cannot ingest new datums. Try again later."
+
+    # fetch count of conversations since last upload
+    response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=last_uploaded_convo_id)
+    total_convo_count = response.count
+    print("Total number of unlogged conversations in Supabase: ", total_convo_count)
+
+    if total_convo_count == 0:
+      return "No new conversations to log."
+
+    first_id = last_uploaded_convo_id
+    combined_dfs = []
+    current_convo_count = 0
+    convo_count = 0
+
+    while current_convo_count < total_convo_count:
+      response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100)
+      print("Response count: ", len(response.data))
+      if len(response.data) == 0:
+        break
+      df = pd.DataFrame(response.data)
+      combined_dfs.append(df)
+      current_convo_count += len(response.data)
+      convo_count += len(response.data)
+      print("Conversations fetched so far: ", current_convo_count)
+
+      if convo_count >= 500:
+        # concat all dfs from the combined_dfs list
+        final_df = pd.concat(combined_dfs, ignore_index=True)
+        # prep data for nomic upload
+        embeddings, metadata = self.data_prep_for_convo_map(final_df)
+        # append to existing map
+        print("Appending data to existing map...")
+        result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name)
+        if result == "success":
+          last_id = int(final_df['id'].iloc[-1])
+          project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id}
+          project_response = self.sql.updateProjects(course_name, project_info)
+          print("Update response from supabase: ", project_response)
+        # reset variables
+        combined_dfs = []
+        convo_count = 0
+        print("Records uploaded: ", current_convo_count)
+
+      # set first_id for next iteration
+      first_id = response.data[-1]['id'] + 1
+
+    # upload last set of convos
+    if convo_count > 0:
+      print("Uploading last set of conversations...")
+      final_df = pd.concat(combined_dfs, ignore_index=True)
+      embeddings, metadata = self.data_prep_for_convo_map(final_df)
+      result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name)
+      if result == "success":
+        last_id = int(final_df['id'].iloc[-1])
+        project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id}
+        project_response = self.sql.updateProjects(course_name, project_info)
+        print("Update response from supabase: ", project_response)
+
+    return "success"
+
 
 
   def create_conversation_map(self, course_name: str):
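The new body of `log_to_conversation_map` is a batch-and-checkpoint loop: it pages conversations 100 at a time via `getAllConversationsBetweenIds`, flushes to Nomic once roughly 500 rows have accumulated, and advances `last_uploaded_convo_id` only after `append_to_map` reports success, so a failed upload is simply retried on the next run. A runnable toy version of just the control flow (the rows and the "upload" are stand-ins, not repo code):

```python
# Runnable toy of the batch-and-checkpoint control flow above. The rows and
# the flush are stand-ins; the real code embeds and appends to a Nomic map.
rows = [{"id": i} for i in range(1, 1251)]  # pretend llm-convo-monitor rows

def fetch_page(from_id: int, limit: int = 100):
  # rows with id >= from_id, in id order (like getAllConversationsBetweenIds)
  return [r for r in rows if r["id"] >= from_id][:limit]

checkpoint = 0                    # plays the role of last_uploaded_convo_id
first_id = checkpoint + 1
batch, fetched = [], 0

while fetched < len(rows):
  page = fetch_page(first_id)
  if not page:
    break
  batch.extend(page)
  fetched += len(page)
  if len(batch) >= 500:           # flush threshold
    # a real flush would only advance the checkpoint if the Nomic append
    # reported "success"; here the upload always "succeeds"
    checkpoint = batch[-1]["id"]
    print(f"flushed {len(batch)} convos, checkpoint now {checkpoint}")
    batch = []
  first_id = page[-1]["id"] + 1

if batch:                         # remainder below the threshold
  checkpoint = batch[-1]["id"]
  print(f"flushed {len(batch)} convos, checkpoint now {checkpoint}")
```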
@@ -270,11 +348,12 @@ def create_conversation_map(self, course_name: str):
     # check if map exists
     response = self.sql.getConvoMapFromProjects(course_name)
     print("Response from supabase: ", response.data)
-    if response.data[0]['convo_map_id']:
-      return "Map already exists for this course."
+    if response.data:
+      if response.data[0]['convo_map_id']:
+        return "Map already exists for this course."
 
     # if no, fetch total count of records
-    response = self.sql.getCountFromLLMConvoMonitor(course_name)
+    response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=0)
 
     # if <20, return message that map cannot be created
     if not response.count:
@@ -340,7 +419,9 @@ def create_conversation_map(self, course_name: str):
       project = AtlasProject(name=project_name, add_datums_if_exists=True)
       result = self.append_to_map(embeddings, metadata, project_name)
       if result == "success":
+        print("map append successful")
         last_id = int(final_df['id'].iloc[-1])
+
         project_info = {'last_uploaded_convo_id': last_id}
         project_response = self.sql.updateProjects(course_name, project_info)
         print("Update response from supabase: ", project_response)
@@ -470,7 +551,7 @@ def append_to_map(self, embeddings, metadata, map_name):
       project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True)
       with project.wait_for_project_lock():
         project.add_embeddings(embeddings=embeddings, data=metadata)
-      return "Successfully appended to Nomic map"
+      return "success"
     except Exception as e:
       print(e)
-      return "Error in appending to map: {e}"
+      return f"Error in appending to map: {e}"
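The `append_to_map` return value matters more than it looks: both call sites gate the checkpoint update on `result == "success"`, and the older human-readable string would never match, so `last_uploaded_convo_id` would never advance. Note also that interpolating the exception into the error message requires an f-string. If a refactor were ever on the table, a shared constant would make this stringly-typed contract harder to break; the following is purely an illustrative sketch, not repo code:

```python
# Illustrative refactor sketch, not repo code: centralizing the sentinel keeps
# append_to_map and the checkpointing call sites from drifting apart.
APPEND_SUCCESS = "success"

def record_checkpoint_if_appended(result: str, last_id: int, checkpoints: dict) -> None:
  # callers compare against the shared constant instead of a bare literal
  if result == APPEND_SUCCESS:
    checkpoints['last_uploaded_convo_id'] = last_id

checkpoints: dict = {}
record_checkpoint_if_appended(APPEND_SUCCESS, 42, checkpoints)
assert checkpoints == {'last_uploaded_convo_id': 42}
```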
"conversation_id": row['convo']['id'], @@ -533,23 +620,25 @@ def data_prep_for_convo_map(self, df: pd.DataFrame): "first_query": first_message, "created_at": created_at, "modified_at": current_time - } - - metadata.append(meta_row) + } + #print("Metadata row:", meta_row) + metadata.append(meta_row) - embeddings_model = OpenAIEmbeddings(openai_api_type="openai", + embeddings_model = OpenAIEmbeddings(openai_api_type="openai", openai_api_base="https://api.openai.com/v1/", openai_api_key=os.environ['VLADS_OPENAI_KEY']) - embeddings = embeddings_model.embed_documents(user_queries) + embeddings = embeddings_model.embed_documents(user_queries) - metadata = pd.DataFrame(metadata) - embeddings = np.array(embeddings) - return embeddings, metadata - - except Exception as e: - print("Error in data_prep_for_convo_map():", e) - self.sentry.capture_exception(e) - return None, None + metadata = pd.DataFrame(metadata) + embeddings = np.array(embeddings) + print("Metadata shape:", metadata.shape) + print("Embeddings shape:", embeddings.shape) + return embeddings, metadata + + # except Exception as e: + # print("Error in data_prep_for_convo_map():", e) + # self.sentry.capture_exception(e) + # return None, None def delete_from_document_map(self, project_id: str, ids: list): """