diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 92db8a62..f8e9b2be 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -1,438 +1,431 @@ -import datetime -import os - -import nomic -import numpy as np -import pandas as pd -import sentry_sdk -import supabase -from langchain.embeddings import OpenAIEmbeddings -from nomic import AtlasProject, atlas - -OPENAI_API_TYPE = "azure" - -SUPABASE_CLIENT = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - -NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - -## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## - -def create_document_map(course_name: str): - """ - This is a function which creates a document map for a given course from scratch - 1. Gets count of documents for the course - 2. If less than 20, returns a message that a map cannot be created - 3. If greater than 20, iteratively fetches documents in batches of 25 - 4. Prepares metadata and embeddings for nomic upload - 5. Creates a new map and uploads the data - - Args: - course_name: str - Returns: - str: success or failed - """ - print("in create_document_map()") - nomic.login(os.getenv('NOMIC_API_KEY')) - - try: - # check if map exists - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - if response.data: - if response.data[0]['doc_map_id']: - return "Map already exists for this course." - - # fetch relevant document data from Supabase - response = SUPABASE_CLIENT.table("documents").select("id", - count="exact").eq("course_name", - course_name).order('id', - desc=False).execute() - if not response.count: - return "No documents found for this course." - - total_doc_count = response.count - print("Total number of documents in Supabase: ", total_doc_count) - - # minimum 20 docs needed to create map - if total_doc_count < 20: - return "Cannot create a map because there are less than 20 documents in the course." - - first_id = response.data[0]['id'] - - combined_dfs = [] - curr_total_doc_count = 0 - doc_count = 0 - first_batch = True - - # iteratively query in batches of 25 - while curr_total_doc_count < total_doc_count: - - response = SUPABASE_CLIENT.table("documents").select( - "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( - 'id', first_id).order('id', desc=False).limit(25).execute() - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - curr_total_doc_count += len(response.data) - doc_count += len(response.data) - - if doc_count >= 1000: # upload to Nomic in batches of 1000 - - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - - # prep data for nomic upload - embeddings, metadata = data_prep_for_doc_map(final_df) - - if first_batch: - # create a new map - print("Creating new map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text", "base_url", "created_at"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - - if result == "success": - # update flag - first_batch = False - # log project info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - last_id = int(final_df['id'].iloc[-1]) - project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() - if project_response.data: - update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - else: - insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() - print("Insert Response from supabase: ", insert_response) - - - else: - # append to existing map - print("Appending data to existing map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - # add project lock logic here - result = append_to_map(embeddings, metadata, project_name) - if result == "success": - # update the last uploaded id in supabase - last_id = int(final_df['id'].iloc[-1]) - info = {'last_uploaded_doc_id': last_id} - update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - - # reset variables - combined_dfs = [] - doc_count = 0 - print("Records uploaded: ", curr_total_doc_count) - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - # upload last set of docs - if doc_count > 0: - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = data_prep_for_doc_map(final_df) - project_name = NOMIC_MAP_NAME_PREFIX + course_name - if first_batch: - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text", "base_url", "created_at"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - else: - result = append_to_map(embeddings, metadata, project_name) - - # update the last uploaded id in supabase - if result == "success": - # update the last uploaded id in supabase - last_id = int(final_df['id'].iloc[-1]) - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - print("project_info: ", project_info) - project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() - if project_response.data: - update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - else: - insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() - print("Insert Response from supabase: ", insert_response) - - - # rebuild the map - rebuild_map(course_name, "document") - - except Exception as e: - print(e) - sentry_sdk.capture_exception(e) - return "failed" - -def delete_from_document_map(course_name: str, ids: list): - """ - This function is used to delete datapoints from a document map. - Currently used within the delete_data() function in vector_database.py - Args: - course_name: str - ids: list of str - """ - print("in delete_from_document_map()") - - try: - # check if project exists - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - if response.data: - project_id = response.data[0]['doc_map_id'] - else: - return "No document map found for this course" - - # fetch project from Nomic - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - - # delete the ids from Nomic - print("Deleting point from document map:", project.delete_data(ids)) - with project.wait_for_project_lock(): - project.rebuild_maps() - return "success" - except Exception as e: - print(e) - sentry_sdk.capture_exception(e) - return "Error in deleting from document map: {e}" - - -def log_to_document_map(course_name: str): - """ - This is a function which appends new documents to an existing document map. It's called - at the end of split_and_upload() after inserting data to Supabase. - Args: - data: dict - the response data from Supabase insertion - """ - print("in add_to_document_map()") - - try: - # check if map exists - response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() - if response.data: - if response.data[0]['doc_map_id']: - project_id = response.data[0]['doc_map_id'] - last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] - else: - # entry present in supabase, but doc map not present - create_document_map(course_name) - return "Document map not present, triggering map creation." - - else: - # create a map - create_document_map(course_name) - return "Document map not present, triggering map creation." - - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - project_name = "Document Map for " + course_name - - # check if project is LOCKED, if yes -> skip logging - if not project.is_accepting_data: - return "Skipping Nomic logging because project is locked." - - # fetch count of records greater than last_uploaded_doc_id - print("last uploaded doc id: ", last_uploaded_doc_id) - response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() - print("Number of new documents: ", response.count) - - total_doc_count = response.count - current_doc_count = 0 - combined_dfs = [] - doc_count = 0 - first_id = last_uploaded_doc_id - while current_doc_count < total_doc_count: - # fetch all records from supabase greater than last_uploaded_doc_id - response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", first_id).limit(25).execute() - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - current_doc_count += len(response.data) - doc_count += len(response.data) - - if doc_count >= 1000: # upload to Nomic in batches of 1000 - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - # prep data for nomic upload - embeddings, metadata = data_prep_for_doc_map(final_df) - - # append to existing map - print("Appending data to existing map...") - - result = append_to_map(embeddings, metadata, project_name) - if result == "success": - # update the last uploaded id in supabase - last_id = int(final_df['id'].iloc[-1]) - info = {'last_uploaded_doc_id': last_id} - update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - - # reset variables - combined_dfs = [] - doc_count = 0 - print("Records uploaded: ", current_doc_count) - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - # upload last set of docs - if doc_count > 0: - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = data_prep_for_doc_map(final_df) - result = append_to_map(embeddings, metadata, project_name) - - # update the last uploaded id in supabase - if result == "success": - # update the last uploaded id in supabase - last_id = int(final_df['id'].iloc[-1]) - project_info = {'last_uploaded_doc_id': last_id} - update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - - return "success" - except Exception as e: - print(e) - return "failed" - - -def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): - """ - Generic function to create a Nomic map from given parameters. - Args: - embeddings: np.array of embeddings - metadata: pd.DataFrame of metadata - map_name: str - index_name: str - topic_label_field: str - colorable_fields: list of str - """ - nomic.login(os.getenv('NOMIC_API_KEY')) - try: - project = atlas.map_embeddings(embeddings=embeddings, - data=metadata, - id_field="id", - build_topic_model=True, - topic_label_field=topic_label_field, - name=map_name, - colorable_fields=colorable_fields, - add_datums_if_exists=True) - project.create_index(name=index_name, build_topic_model=True) - return "success" - except Exception as e: - print(e) - return "Error in creating map: {e}" - -def append_to_map(embeddings, metadata, map_name): - """ - Generic function to append new data to an existing Nomic map. - Args: - embeddings: np.array of embeddings - metadata: pd.DataFrame of Nomic upload metadata - map_name: str - """ - - nomic.login(os.getenv('NOMIC_API_KEY')) - try: - project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) - with project.wait_for_project_lock(): - project.add_embeddings(embeddings=embeddings, data=metadata) - return "success" - except Exception as e: - print(e) - return "Error in appending to map: {e}" - -def data_prep_for_doc_map(df: pd.DataFrame): - """ - This function prepares embeddings and metadata for nomic upload in document map creation. - Args: - df: pd.DataFrame - the dataframe of documents from Supabase - Returns: - embeddings: np.array of embeddings - metadata: pd.DataFrame of metadata - """ - print("in data_prep_for_doc_map()") - - metadata = [] - embeddings = [] - - texts = [] - - for index, row in df.iterrows(): - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") - if row['url'] == None: - row['url'] = "" - if row['base_url'] == None: - row['base_url'] = "" - # iterate through all contexts and create separate entries for each - context_count = 0 - for context in row['contexts']: - context_count += 1 - text_row = context['text'] - embeddings_row = context['embedding'] - - meta_row = { - "id": str(row['id']) + "_" + str(context_count), - "created_at": created_at, - "s3_path": row['s3_path'], - "url": row['url'], - "base_url": row['base_url'], - "readable_filename": row['readable_filename'], - "modified_at": current_time, - "text": text_row - } - - embeddings.append(embeddings_row) - metadata.append(meta_row) - texts.append(text_row) - - embeddings_np = np.array(embeddings, dtype=object) - print("Shape of embeddings: ", embeddings_np.shape) - - # check dimension if embeddings_np is (n, 1536) - if len(embeddings_np.shape) < 2: - print("Creating new embeddings...") - - embeddings_model = OpenAIEmbeddings(openai_api_type="openai", - openai_api_base="https://api.openai.com/v1/", - openai_api_key=os.getenv('VLADS_OPENAI_KEY')) # type: ignore - embeddings = embeddings_model.embed_documents(texts) - - metadata = pd.DataFrame(metadata) - embeddings = np.array(embeddings) - - return embeddings, metadata - -def rebuild_map(course_name:str, map_type:str): - """ - This function rebuilds a given map in Nomic. - """ - print("in rebuild_map()") - nomic.login(os.getenv('NOMIC_API_KEY')) - if map_type.lower() == 'document': - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - else: - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - try: - # fetch project from Nomic - project_name = NOMIC_MAP_NAME_PREFIX + course_name - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - if project.is_accepting_data: # temporary fix - will skip rebuilding if project is locked - project.rebuild_maps() - return "success" - except Exception as e: - print(e) - sentry_sdk.capture_exception(e) - return "Error in rebuilding map: {e}" - - - -if __name__ == '__main__': - pass - +# import datetime +# import os + +# import nomic +# import numpy as np +# import pandas as pd +# import sentry_sdk +# import supabase +# from langchain.embeddings import OpenAIEmbeddings +# # from nomic import AtlasProject, atlas + +# OPENAI_API_TYPE = "azure" + +# SUPABASE_CLIENT = supabase.create_client( # type: ignore +# supabase_url=os.getenv('SUPABASE_URL'), # type: ignore +# supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore + +# NOMIC_MAP_NAME_PREFIX = 'Document Map for ' + +# ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## + +# def create_document_map(course_name: str): +# """ +# This is a function which creates a document map for a given course from scratch +# 1. Gets count of documents for the course +# 2. If less than 20, returns a message that a map cannot be created +# 3. If greater than 20, iteratively fetches documents in batches of 25 +# 4. Prepares metadata and embeddings for nomic upload +# 5. Creates a new map and uploads the data + +# Args: +# course_name: str +# Returns: +# str: success or failed +# """ +# print("in create_document_map()") +# nomic.login(os.getenv('NOMIC_API_KEY')) + +# try: +# # check if map exists +# response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() +# if response.data: +# if response.data[0]['doc_map_id']: +# return "Map already exists for this course." + +# # fetch relevant document data from Supabase +# response = SUPABASE_CLIENT.table("documents").select("id", +# count="exact").eq("course_name", +# course_name).order('id', +# desc=False).execute() +# if not response.count: +# return "No documents found for this course." + +# total_doc_count = response.count +# print("Total number of documents in Supabase: ", total_doc_count) + +# # minimum 20 docs needed to create map +# if total_doc_count < 20: +# return "Cannot create a map because there are less than 20 documents in the course." + +# first_id = response.data[0]['id'] + +# combined_dfs = [] +# curr_total_doc_count = 0 +# doc_count = 0 +# first_batch = True + +# # iteratively query in batches of 25 +# while curr_total_doc_count < total_doc_count: + +# response = SUPABASE_CLIENT.table("documents").select( +# "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( +# 'id', first_id).order('id', desc=False).limit(25).execute() +# df = pd.DataFrame(response.data) +# combined_dfs.append(df) # list of dfs + +# curr_total_doc_count += len(response.data) +# doc_count += len(response.data) + +# if doc_count >= 1000: # upload to Nomic in batches of 1000 + +# # concat all dfs from the combined_dfs list +# final_df = pd.concat(combined_dfs, ignore_index=True) + +# # prep data for nomic upload +# embeddings, metadata = data_prep_for_doc_map(final_df) + +# if first_batch: +# # create a new map +# print("Creating new map...") +# project_name = NOMIC_MAP_NAME_PREFIX + course_name +# index_name = course_name + "_doc_index" +# topic_label_field = "text" +# colorable_fields = ["readable_filename", "text", "base_url", "created_at"] +# result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + +# if result == "success": +# # update flag +# first_batch = False +# # log project info to supabase +# project = AtlasProject(name=project_name, add_datums_if_exists=True) +# project_id = project.id +# last_id = int(final_df['id'].iloc[-1]) +# project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} +# project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() +# if project_response.data: +# update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() +# print("Response from supabase: ", update_response) +# else: +# insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() +# print("Insert Response from supabase: ", insert_response) + +# else: +# # append to existing map +# print("Appending data to existing map...") +# project_name = NOMIC_MAP_NAME_PREFIX + course_name +# # add project lock logic here +# result = append_to_map(embeddings, metadata, project_name) +# if result == "success": +# # update the last uploaded id in supabase +# last_id = int(final_df['id'].iloc[-1]) +# info = {'last_uploaded_doc_id': last_id} +# update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() +# print("Response from supabase: ", update_response) + +# # reset variables +# combined_dfs = [] +# doc_count = 0 +# print("Records uploaded: ", curr_total_doc_count) + +# # set first_id for next iteration +# first_id = response.data[-1]['id'] + 1 + +# # upload last set of docs +# if doc_count > 0: +# final_df = pd.concat(combined_dfs, ignore_index=True) +# embeddings, metadata = data_prep_for_doc_map(final_df) +# project_name = NOMIC_MAP_NAME_PREFIX + course_name +# if first_batch: +# index_name = course_name + "_doc_index" +# topic_label_field = "text" +# colorable_fields = ["readable_filename", "text", "base_url", "created_at"] +# result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) +# else: +# result = append_to_map(embeddings, metadata, project_name) + +# # update the last uploaded id in supabase +# if result == "success": +# # update the last uploaded id in supabase +# last_id = int(final_df['id'].iloc[-1]) +# project = AtlasProject(name=project_name, add_datums_if_exists=True) +# project_id = project.id +# project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} +# print("project_info: ", project_info) +# project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() +# if project_response.data: +# update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() +# print("Response from supabase: ", update_response) +# else: +# insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() +# print("Insert Response from supabase: ", insert_response) + +# # rebuild the map +# rebuild_map(course_name, "document") + +# except Exception as e: +# print(e) +# sentry_sdk.capture_exception(e) +# return "failed" + +# def delete_from_document_map(course_name: str, ids: list): +# """ +# This function is used to delete datapoints from a document map. +# Currently used within the delete_data() function in vector_database.py +# Args: +# course_name: str +# ids: list of str +# """ +# print("in delete_from_document_map()") + +# try: +# # check if project exists +# response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() +# if response.data: +# project_id = response.data[0]['doc_map_id'] +# else: +# return "No document map found for this course" + +# # fetch project from Nomic +# project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + +# # delete the ids from Nomic +# print("Deleting point from document map:", project.delete_data(ids)) +# with project.wait_for_project_lock(): +# project.rebuild_maps() +# return "success" +# except Exception as e: +# print(e) +# sentry_sdk.capture_exception(e) +# return "Error in deleting from document map: {e}" + +# def log_to_document_map(course_name: str): +# """ +# This is a function which appends new documents to an existing document map. It's called +# at the end of split_and_upload() after inserting data to Supabase. +# Args: +# data: dict - the response data from Supabase insertion +# """ +# print("in add_to_document_map()") + +# try: +# # check if map exists +# response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() +# if response.data: +# if response.data[0]['doc_map_id']: +# project_id = response.data[0]['doc_map_id'] +# last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] +# else: +# # entry present in supabase, but doc map not present +# create_document_map(course_name) +# return "Document map not present, triggering map creation." + +# else: +# # create a map +# create_document_map(course_name) +# return "Document map not present, triggering map creation." + +# project = AtlasProject(project_id=project_id, add_datums_if_exists=True) +# project_name = "Document Map for " + course_name + +# # check if project is LOCKED, if yes -> skip logging +# if not project.is_accepting_data: +# return "Skipping Nomic logging because project is locked." + +# # fetch count of records greater than last_uploaded_doc_id +# print("last uploaded doc id: ", last_uploaded_doc_id) +# response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() +# print("Number of new documents: ", response.count) + +# total_doc_count = response.count +# current_doc_count = 0 +# combined_dfs = [] +# doc_count = 0 +# first_id = last_uploaded_doc_id +# while current_doc_count < total_doc_count: +# # fetch all records from supabase greater than last_uploaded_doc_id +# response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", first_id).limit(25).execute() +# df = pd.DataFrame(response.data) +# combined_dfs.append(df) # list of dfs + +# current_doc_count += len(response.data) +# doc_count += len(response.data) + +# if doc_count >= 1000: # upload to Nomic in batches of 1000 +# # concat all dfs from the combined_dfs list +# final_df = pd.concat(combined_dfs, ignore_index=True) +# # prep data for nomic upload +# embeddings, metadata = data_prep_for_doc_map(final_df) + +# # append to existing map +# print("Appending data to existing map...") + +# result = append_to_map(embeddings, metadata, project_name) +# if result == "success": +# # update the last uploaded id in supabase +# last_id = int(final_df['id'].iloc[-1]) +# info = {'last_uploaded_doc_id': last_id} +# update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() +# print("Response from supabase: ", update_response) + +# # reset variables +# combined_dfs = [] +# doc_count = 0 +# print("Records uploaded: ", current_doc_count) + +# # set first_id for next iteration +# first_id = response.data[-1]['id'] + 1 + +# # upload last set of docs +# if doc_count > 0: +# final_df = pd.concat(combined_dfs, ignore_index=True) +# embeddings, metadata = data_prep_for_doc_map(final_df) +# result = append_to_map(embeddings, metadata, project_name) + +# # update the last uploaded id in supabase +# if result == "success": +# # update the last uploaded id in supabase +# last_id = int(final_df['id'].iloc[-1]) +# project_info = {'last_uploaded_doc_id': last_id} +# update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() +# print("Response from supabase: ", update_response) + +# return "success" +# except Exception as e: +# print(e) +# return "failed" + +# def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): +# """ +# Generic function to create a Nomic map from given parameters. +# Args: +# embeddings: np.array of embeddings +# metadata: pd.DataFrame of metadata +# map_name: str +# index_name: str +# topic_label_field: str +# colorable_fields: list of str +# """ +# nomic.login(os.getenv('NOMIC_API_KEY')) +# try: +# project = atlas.map_embeddings(embeddings=embeddings, +# data=metadata, +# id_field="id", +# build_topic_model=True, +# topic_label_field=topic_label_field, +# name=map_name, +# colorable_fields=colorable_fields, +# add_datums_if_exists=True) +# project.create_index(name=index_name, build_topic_model=True) +# return "success" +# except Exception as e: +# print(e) +# return "Error in creating map: {e}" + +# def append_to_map(embeddings, metadata, map_name): +# """ +# Generic function to append new data to an existing Nomic map. +# Args: +# embeddings: np.array of embeddings +# metadata: pd.DataFrame of Nomic upload metadata +# map_name: str +# """ + +# nomic.login(os.getenv('NOMIC_API_KEY')) +# try: +# project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) +# with project.wait_for_project_lock(): +# project.add_embeddings(embeddings=embeddings, data=metadata) +# return "success" +# except Exception as e: +# print(e) +# return "Error in appending to map: {e}" + +# def data_prep_for_doc_map(df: pd.DataFrame): +# """ +# This function prepares embeddings and metadata for nomic upload in document map creation. +# Args: +# df: pd.DataFrame - the dataframe of documents from Supabase +# Returns: +# embeddings: np.array of embeddings +# metadata: pd.DataFrame of metadata +# """ +# print("in data_prep_for_doc_map()") + +# metadata = [] +# embeddings = [] + +# texts = [] + +# for index, row in df.iterrows(): +# current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") +# created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") +# if row['url'] == None: +# row['url'] = "" +# if row['base_url'] == None: +# row['base_url'] = "" +# # iterate through all contexts and create separate entries for each +# context_count = 0 +# for context in row['contexts']: +# context_count += 1 +# text_row = context['text'] +# embeddings_row = context['embedding'] + +# meta_row = { +# "id": str(row['id']) + "_" + str(context_count), +# "created_at": created_at, +# "s3_path": row['s3_path'], +# "url": row['url'], +# "base_url": row['base_url'], +# "readable_filename": row['readable_filename'], +# "modified_at": current_time, +# "text": text_row +# } + +# embeddings.append(embeddings_row) +# metadata.append(meta_row) +# texts.append(text_row) + +# embeddings_np = np.array(embeddings, dtype=object) +# print("Shape of embeddings: ", embeddings_np.shape) + +# # check dimension if embeddings_np is (n, 1536) +# if len(embeddings_np.shape) < 2: +# print("Creating new embeddings...") + +# embeddings_model = OpenAIEmbeddings(openai_api_type="openai", +# openai_api_base="https://api.openai.com/v1/", +# openai_api_key=os.getenv('VLADS_OPENAI_KEY')) # type: ignore +# embeddings = embeddings_model.embed_documents(texts) + +# metadata = pd.DataFrame(metadata) +# embeddings = np.array(embeddings) + +# return embeddings, metadata + +# def rebuild_map(course_name:str, map_type:str): +# """ +# This function rebuilds a given map in Nomic. +# """ +# print("in rebuild_map()") +# nomic.login(os.getenv('NOMIC_API_KEY')) +# if map_type.lower() == 'document': +# NOMIC_MAP_NAME_PREFIX = 'Document Map for ' +# else: +# NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + +# try: +# # fetch project from Nomic +# project_name = NOMIC_MAP_NAME_PREFIX + course_name +# project = AtlasProject(name=project_name, add_datums_if_exists=True) + +# if project.is_accepting_data: # temporary fix - will skip rebuilding if project is locked +# project.rebuild_maps() +# return "success" +# except Exception as e: +# print(e) +# sentry_sdk.capture_exception(e) +# return "Error in rebuilding map: {e}" + +# if __name__ == '__main__': +# pass diff --git a/ai_ta_backend/database/pubmed_db_access.py b/ai_ta_backend/database/pubmed_db_access.py new file mode 100644 index 00000000..c9f233ec --- /dev/null +++ b/ai_ta_backend/database/pubmed_db_access.py @@ -0,0 +1,75 @@ +import sqlite3 +from contextlib import closing + +from flask import Flask, jsonify, request + +app = Flask(__name__) + +# Database configuration +DATABASE = 'v2-articles.db' + + +def get_db(): + """Get database connection""" + db = sqlite3.connect(DATABASE) + db.row_factory = sqlite3.Row + return db + + +@app.route('/getTextFromContextIDBulk', methods=['POST']) +def get_data_bulk(): + """API endpoint to fetch data from SQLite DB for multiple IDs + Example input: + { + "ids": ["---1PHuidFHAtTDU6WFIB", "p9JJ0YNq-ZqocIypkK__a"] + } + """ + try: + # Get list of IDs from request JSON + ids = request.get_json().get('ids', []) + if not ids: + return jsonify({'error': 'No IDs provided'}), 400 + + with closing(get_db()) as db: + cursor = db.cursor() + print(f"Searching for {len(ids)} IDs") + db.execute("PRAGMA busy_timeout = 5000") # Set 5 second timeout + + # Single query to get both text content and article title + placeholders = ','.join('?' * len(ids)) + query = f''' + SELECT c.ID, c.text, a.Title, a.Minio_Path + FROM contexts c + LEFT JOIN sections s ON c.Section_ID = s.ID + LEFT JOIN articles a ON s.Article_ID = a.ID + WHERE c.ID IN ({placeholders}) + ''' + cursor.execute(query, ids) + results = cursor.fetchall() + + if not results: + print("No results found for any ID") + return jsonify({'error': 'No records found'}), 404 + + # Create response dictionary with both text and title + response = { + str(row['ID']): { + 'page_content': row['text'], + 'readable_filename': row['Title'] if row['Title'] else 'Unknown Title, sorry!', + 'minio_path': row['Minio_Path'] + } for row in results + } + + # Report any IDs that weren't found + missing_ids = set(map(str, ids)) - set(response.keys()) + if missing_ids: + print(f"IDs not found: {missing_ids}") + + return jsonify(response) + + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +if __name__ == '__main__': + app.run(debug=True, port=5001, host='0.0.0.0') diff --git a/ai_ta_backend/database/vector.py b/ai_ta_backend/database/vector.py index a5203783..065d028e 100644 --- a/ai_ta_backend/database/vector.py +++ b/ai_ta_backend/database/vector.py @@ -24,26 +24,32 @@ def __init__(self): api_key=os.environ['QDRANT_API_KEY'], timeout=20, # default is 5 seconds. Getting timeout errors w/ document groups. ) + self.vyriad_client = QdrantClient(url=os.environ['VYRIAD_QDRANT_URL'], + port=int(os.environ['VYRIAD_QDRANT_PORT']), + https=True, + api_key=os.environ['VYRIAD_QDRANT_API_KEY']) self.vectorstore = Qdrant(client=self.qdrant_client, collection_name=os.environ['QDRANT_COLLECTION_NAME'], embeddings=OpenAIEmbeddings(openai_api_key=os.environ['VLADS_OPENAI_KEY'])) def vector_search(self, search_query, course_name, doc_groups: List[str], user_query_embedding, top_n, - disabled_doc_groups: List[str], public_doc_groups: List[dict]): + disabled_doc_groups: List[str], public_doc_groups: List[dict], is_vyriad: bool): """ Search the vector database for a given query. """ # Search the vector database - search_results = self.qdrant_client.search( - collection_name=os.environ['QDRANT_COLLECTION_NAME'], - query_filter=self._create_search_filter(course_name, doc_groups, disabled_doc_groups, public_doc_groups), + client = self.vyriad_client if is_vyriad else self.qdrant_client + collection_name = os.environ['VYRIAD_QDRANT_COLLECTION_NAME'] if is_vyriad else os.environ['QDRANT_COLLECTION_NAME'] + search_results = client.search( + collection_name=collection_name, + query_filter=self._create_search_filter(course_name, doc_groups, disabled_doc_groups, public_doc_groups) + if not is_vyriad else None, with_vectors=False, query_vector=user_query_embedding, limit=top_n, # Return n closest points - # In a system with high disk latency, the re-scoring step may become a bottleneck: https://qdrant.tech/documentation/guides/quantization/ search_params=models.SearchParams(quantization=models.QuantizationSearchParams(rescore=False))) - # print(f"Search results: {search_results}") + print(f"Search results: {search_results}") return search_results def _create_search_filter(self, course_name: str, doc_groups: List[str], admin_disabled_doc_groups: List[str], diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index d9f0765d..1df80596 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -19,7 +19,7 @@ from flask_injector import FlaskInjector, RequestScope from injector import Binder, SingletonScope -from ai_ta_backend.beam.nomic_logging import create_document_map +# from ai_ta_backend.beam.nomic_logging import create_document_map from ai_ta_backend.database.aws import AWSStorage from ai_ta_backend.database.sql import SQLDatabase from ai_ta_backend.database.vector import VectorDatabase @@ -36,7 +36,8 @@ ThreadPoolExecutorInterface, ) from ai_ta_backend.service.export_service import ExportService -from ai_ta_backend.service.nomic_service import NomicService + +# from ai_ta_backend.service.nomic_service import NomicService from ai_ta_backend.service.posthog_service import PosthogService from ai_ta_backend.service.project_service import ProjectService from ai_ta_backend.service.retrieval_service import RetrievalService @@ -176,22 +177,21 @@ def delete(service: RetrievalService, flaskExecutor: ExecutorInterface): return response -@app.route('/getNomicMap', methods=['GET']) -def nomic_map(service: NomicService): - course_name: str = request.args.get('course_name', default='', type=str) - map_type: str = request.args.get('map_type', default='conversation', type=str) - - if course_name == '': - # proper web error "400 Bad request" - abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") +# @app.route('/getNomicMap', methods=['GET']) +# def nomic_map(service: NomicService): +# course_name: str = request.args.get('course_name', default='', type=str) +# map_type: str = request.args.get('map_type', default='conversation', type=str) - map_id = service.get_nomic_map(course_name, map_type) - print("nomic map\n", map_id) +# if course_name == '': +# # proper web error "400 Bad request" +# abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") - response = jsonify(map_id) - response.headers.add('Access-Control-Allow-Origin', '*') - return response +# map_id = service.get_nomic_map(course_name, map_type) +# print("nomic map\n", map_id) +# response = jsonify(map_id) +# response.headers.add('Access-Control-Allow-Origin', '*') +# return response # @app.route('/createDocumentMap', methods=['GET']) # def createDocumentMap(service: NomicService): @@ -236,28 +236,27 @@ def nomic_map(service: NomicService): # response.headers.add('Access-Control-Allow-Origin', '*') # return response +# @app.route('/onResponseCompletion', methods=['POST']) +# def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface): +# data = request.get_json() +# course_name = data['course_name'] +# conversation = data['conversation'] -@app.route('/onResponseCompletion', methods=['POST']) -def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface): - data = request.get_json() - course_name = data['course_name'] - conversation = data['conversation'] - - if course_name == '' or conversation == '': - # proper web error "400 Bad request" - abort( - 400, - description= - f"Missing one or more required parameters: 'course_name' and 'conversation' must be provided. Course name: `{course_name}`, Conversation: `{conversation}`" - ) - print(f"In /onResponseCompletion for course: {course_name}") - - # background execution of tasks!! - #response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) - #result = flaskExecutor.submit(service.log_to_conversation_map, course_name, conversation).result() - response = jsonify({'outcome': 'success'}) - response.headers.add('Access-Control-Allow-Origin', '*') - return response +# if course_name == '' or conversation == '': +# # proper web error "400 Bad request" +# abort( +# 400, +# description= +# f"Missing one or more required parameters: 'course_name' and 'conversation' must be provided. Course name: `{course_name}`, Conversation: `{conversation}`" +# ) +# print(f"In /onResponseCompletion for course: {course_name}") + +# # background execution of tasks!! +# #response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) +# #result = flaskExecutor.submit(service.log_to_conversation_map, course_name, conversation).result() +# response = jsonify({'outcome': 'success'}) +# response.headers.add('Access-Control-Allow-Origin', '*') +# return response @app.route('/export-convo-history-csv', methods=['GET']) @@ -612,7 +611,7 @@ def configure(binder: Binder) -> None: binder.bind(RetrievalService, to=RetrievalService, scope=RequestScope) binder.bind(PosthogService, to=PosthogService, scope=SingletonScope) binder.bind(SentryService, to=SentryService, scope=SingletonScope) - binder.bind(NomicService, to=NomicService, scope=SingletonScope) + # binder.bind(NomicService, to=NomicService, scope=SingletonScope) binder.bind(ExportService, to=ExportService, scope=SingletonScope) binder.bind(WorkflowService, to=WorkflowService, scope=SingletonScope) binder.bind(VectorDatabase, to=VectorDatabase, scope=SingletonScope) diff --git a/ai_ta_backend/service/nomic_service.py b/ai_ta_backend/service/nomic_service.py index 80ca86ca..eb66dcd2 100644 --- a/ai_ta_backend/service/nomic_service.py +++ b/ai_ta_backend/service/nomic_service.py @@ -1,562 +1,557 @@ -import datetime -import os -import time -from typing import Union - -import backoff -import nomic -import numpy as np -import pandas as pd -from injector import inject -from langchain.embeddings.openai import OpenAIEmbeddings -from nomic import AtlasProject, atlas - -from ai_ta_backend.database.sql import SQLDatabase -from ai_ta_backend.service.sentry_service import SentryService - -LOCK_EXCEPTIONS = [ - 'Project is locked for state access! Please wait until the project is unlocked to access embeddings.', - 'Project is locked for state access! Please wait until the project is unlocked to access data.', - 'Project is currently indexing and cannot ingest new datums. Try again later.' -] - - -class NomicService(): - - @inject - def __init__(self, sentry: SentryService, sql: SQLDatabase): - nomic.login(os.environ['NOMIC_API_KEY']) - self.sentry = sentry - self.sql = sql - - def get_nomic_map(self, course_name: str, type: str): - """ - Returns the variables necessary to construct an iframe of the Nomic map given a course name. - We just need the ID and URL. - Example values: - map link: https://atlas.nomic.ai/map/ed222613-97d9-46a9-8755-12bbc8a06e3a/f4967ad7-ff37-4098-ad06-7e1e1a93dd93 - map id: f4967ad7-ff37-4098-ad06-7e1e1a93dd93 - """ - # nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - if type.lower() == 'document': - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - else: - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - project_name = NOMIC_MAP_NAME_PREFIX + course_name - start_time = time.monotonic() - - try: - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - map = project.get_map(project_name) - - print(f"⏰ Nomic Full Map Retrieval: {(time.monotonic() - start_time):.2f} seconds") - return {"map_id": f"iframe{map.id}", "map_link": map.map_link} - except Exception as e: - # Error: ValueError: You must specify a unique_id_field when creating a new project. - if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - print( - "Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ", - e) - else: - print("ERROR in get_nomic_map():", e) - self.sentry.capture_exception(e) - return {"map_id": None, "map_link": None} - - - def log_to_conversation_map(self, course_name: str, conversation): - """ - This function logs new conversations to existing nomic maps. - 1. Check if nomic map exists - 2. If no, create it - 3. If yes, fetch all conversations since last upload and log it - """ - nomic.login(os.getenv('NOMIC_API_KEY')) - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - try: - # check if map exists - response = self.sql.getConvoMapFromProjects(course_name) - print("Response from supabase: ", response.data) - - # entry not present in projects table - if not response.data: - print("Map does not exist for this course. Redirecting to map creation...") - return self.create_conversation_map(course_name) - - # entry present for doc map, but not convo map - elif not response.data[0]['convo_map_id']: - print("Map does not exist for this course. Redirecting to map creation...") - return self.create_conversation_map(course_name) - - project_id = response.data[0]['convo_map_id'] - last_uploaded_convo_id = response.data[0]['last_uploaded_convo_id'] - - # check if project is accepting data - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - if not project.is_accepting_data: - return "Project is currently indexing and cannot ingest new datums. Try again later." - - # fetch count of conversations since last upload - response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=last_uploaded_convo_id) - total_convo_count = response.count - print("Total number of unlogged conversations in Supabase: ", total_convo_count) - - if total_convo_count == 0: - # log to an existing conversation - existing_convo = self.log_to_existing_conversation(course_name, conversation) - return existing_convo - - first_id = last_uploaded_convo_id - combined_dfs = [] - current_convo_count = 0 - convo_count = 0 - - while current_convo_count < total_convo_count: - response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) - print("Response count: ", len(response.data)) - if len(response.data) == 0: - break - df = pd.DataFrame(response.data) - combined_dfs.append(df) - current_convo_count += len(response.data) - convo_count += len(response.data) - print(current_convo_count) - - if convo_count >= 500: - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - # prep data for nomic upload - embeddings, metadata = self.data_prep_for_convo_map(final_df) - # append to existing map - print("Appending data to existing map...") - result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) - if result == "success": - last_id = int(final_df['id'].iloc[-1]) - project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} - project_response = self.sql.updateProjects(course_name, project_info) - print("Update response from supabase: ", project_response) - # reset variables - combined_dfs = [] - convo_count = 0 - print("Records uploaded: ", current_convo_count) - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - # upload last set of convos - if convo_count > 0: - print("Uploading last set of conversations...") - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = self.data_prep_for_convo_map(final_df) - result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) - if result == "success": - last_id = int(final_df['id'].iloc[-1]) - project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} - project_response = self.sql.updateProjects(course_name, project_info) - print("Update response from supabase: ", project_response) - - # rebuild the map - self.rebuild_map(course_name, "conversation") - return "success" - - except Exception as e: - print(e) - self.sentry.capture_exception(e) - return "Error in logging to conversation map: {e}" - - - def log_to_existing_conversation(self, course_name: str, conversation): - """ - This function logs follow-up questions to existing conversations in the map. - """ - print(f"in log_to_existing_conversation() for course: {course_name}") - - try: - conversation_id = conversation['id'] - - # fetch id from supabase - incoming_id_response = self.sql.getConversation(course_name, key="convo_id", value=conversation_id) - - project_name = 'Conversation Map for ' + course_name - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - prev_id = incoming_id_response.data[0]['id'] - uploaded_data = project.get_data(ids=[prev_id]) # fetch data point from nomic - prev_convo = uploaded_data[0]['conversation'] - - # update conversation - messages = conversation['messages'] - messages_to_be_logged = messages[-2:] - - for message in messages_to_be_logged: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # create embeddings of first query - embeddings_model = OpenAIEmbeddings(openai_api_type="openai", - openai_api_base="https://api.openai.com/v1/", - openai_api_key=os.environ['VLADS_OPENAI_KEY'], - openai_api_version="2020-11-07") - embeddings = embeddings_model.embed_documents([uploaded_data[0]['first_query']]) - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - uploaded_data[0]['conversation'] = prev_convo - uploaded_data[0]['modified_at'] = current_time - - metadata = pd.DataFrame(uploaded_data) - embeddings = np.array(embeddings) - - print("Metadata shape:", metadata.shape) - print("Embeddings shape:", embeddings.shape) - - # deleting existing map - print("Deleting point from nomic:", project.delete_data([prev_id])) - - # re-build map to reflect deletion - project.rebuild_maps() - - # re-insert updated conversation - result = self.append_to_map(embeddings, metadata, project_name) - print("Result of appending to existing map:", result) - - return "success" - - except Exception as e: - print("Error in log_to_existing_conversation():", e) - self.sentry.capture_exception(e) - return "Error in logging to existing conversation: {e}" - - - def create_conversation_map(self, course_name: str): - """ - This function creates a conversation map for a given course from scratch. - """ - nomic.login(os.getenv('NOMIC_API_KEY')) - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - try: - # check if map exists - response = self.sql.getConvoMapFromProjects(course_name) - print("Response from supabase: ", response.data) - if response.data: - if response.data[0]['convo_map_id']: - return "Map already exists for this course." - - # if no, fetch total count of records - response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=0) - - # if <20, return message that map cannot be created - if not response.count: - return "No conversations found for this course." - elif response.count < 20: - return "Cannot create a map because there are less than 20 conversations in the course." - - # if >20, iteratively fetch records in batches of 100 - total_convo_count = response.count - print("Total number of conversations in Supabase: ", total_convo_count) - - first_id = response.data[0]['id'] - 1 - combined_dfs = [] - current_convo_count = 0 - convo_count = 0 - first_batch = True - project_name = NOMIC_MAP_NAME_PREFIX + course_name - - # iteratively query in batches of 50 - while current_convo_count < total_convo_count: - response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) - print("Response count: ", len(response.data)) - if len(response.data) == 0: - break - df = pd.DataFrame(response.data) - combined_dfs.append(df) - current_convo_count += len(response.data) - convo_count += len(response.data) - print(current_convo_count) - - if convo_count >= 500: - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - # prep data for nomic upload - embeddings, metadata = self.data_prep_for_convo_map(final_df) - - if first_batch: - # create a new map - print("Creating new map...") - index_name = course_name + "_convo_index" - topic_label_field = "first_query" - colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] - result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, - colorable_fields) - - if result == "success": - # update flag - first_batch = False - # log project info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - last_id = int(final_df['id'].iloc[-1]) - project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} - # if entry already exists, update it - projects_record = self.sql.getConvoMapFromProjects(course_name) - if projects_record.data: - project_response = self.sql.updateProjects(course_name, project_info) - else: - project_response = self.sql.insertProjectInfo(project_info) - print("Update response from supabase: ", project_response) - else: - # append to existing map - print("Appending data to existing map...") - project = AtlasProject(name=project_name, add_datums_if_exists=True) - result = self.append_to_map(embeddings, metadata, project_name) - if result == "success": - print("map append successful") - last_id = int(final_df['id'].iloc[-1]) - project_info = {'last_uploaded_convo_id': last_id} - project_response = self.sql.updateProjects(course_name, project_info) - print("Update response from supabase: ", project_response) - - # reset variables - combined_dfs = [] - convo_count = 0 - print("Records uploaded: ", current_convo_count) - - # set first_id for next iteration - try: - print("response: ", response.data[-1]['id']) - except: - print("response: ", response.data) - first_id = response.data[-1]['id'] + 1 - - print("Convo count: ", convo_count) - # upload last set of convos - if convo_count > 0: - print("Uploading last set of conversations...") - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = self.data_prep_for_convo_map(final_df) - if first_batch: - # create map - index_name = course_name + "_convo_index" - topic_label_field = "first_query" - colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] - result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - - else: - # append to map - print("in map append") - result = self.append_to_map(embeddings, metadata, project_name) - - if result == "success": - print("last map append successful") - last_id = int(final_df['id'].iloc[-1]) - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} - print("Project info: ", project_info) - # if entry already exists, update it - projects_record = self.sql.getConvoMapFromProjects(course_name) - if projects_record.data: - project_response = self.sql.updateProjects(course_name, project_info) - else: - project_response = self.sql.insertProjectInfo(project_info) - print("Response from supabase: ", project_response) - - - # rebuild the map - self.rebuild_map(course_name, "conversation") - return "success" - except Exception as e: - print(e) - self.sentry.capture_exception(e) - return "Error in creating conversation map:" + str(e) - - ## -------------------------------- SUPPLEMENTARY MAP FUNCTIONS --------------------------------- ## - - def rebuild_map(self, course_name: str, map_type: str): - """ - This function rebuilds a given map in Nomic. - """ - print("in rebuild_map()") - nomic.login(os.getenv('NOMIC_API_KEY')) - - if map_type.lower() == 'document': - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - else: - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - try: - # fetch project from Nomic - project_name = NOMIC_MAP_NAME_PREFIX + course_name - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - if project.is_accepting_data: - project.rebuild_maps() - return "success" - except Exception as e: - print(e) - self.sentry.capture_exception(e) - return "Error in rebuilding map: {e}" - - def create_map(self, embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): - """ - Generic function to create a Nomic map from given parameters. - Args: - embeddings: np.array of embeddings - metadata: pd.DataFrame of metadata - map_name: str - index_name: str - topic_label_field: str - colorable_fields: list of str - """ - nomic.login(os.environ['NOMIC_API_KEY']) - print("in create_map()") - try: - project = atlas.map_embeddings(embeddings=embeddings, - data=metadata, - id_field="id", - build_topic_model=True, - name=map_name, - topic_label_field=topic_label_field, - colorable_fields=colorable_fields, - add_datums_if_exists=True) - project.create_index(index_name, build_topic_model=True) - return "success" - except Exception as e: - print(e) - return "Error in creating map: {e}" - - def append_to_map(self, embeddings, metadata, map_name): - """ - Generic function to append new data to an existing Nomic map. - Args: - embeddings: np.array of embeddings - metadata: pd.DataFrame of Nomic upload metadata - map_name: str - """ - nomic.login(os.environ['NOMIC_API_KEY']) - try: - project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) - with project.wait_for_project_lock(): - project.add_embeddings(embeddings=embeddings, data=metadata) - return "success" - except Exception as e: - print(e) - return "Error in appending to map: {e}" - - def data_prep_for_convo_map(self, df: pd.DataFrame): - """ - This function prepares embeddings and metadata for nomic upload in conversation map creation. - Args: - df: pd.DataFrame - the dataframe of documents from Supabase - Returns: - embeddings: np.array of embeddings - metadata: pd.DataFrame of metadata - """ - print("in data_prep_for_convo_map()") - - try: - metadata = [] - embeddings = [] - user_queries = [] - - for _index, row in df.iterrows(): - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") - conversation_exists = False - conversation = "" - emoji = "" - - if row['user_email'] is None: - user_email = "" - else: - user_email = row['user_email'] - - messages = row['convo']['messages'] - - # some conversations include images, so the data structure is different - if isinstance(messages[0]['content'], list): - if 'text' in messages[0]['content'][0]: - first_message = messages[0]['content'][0]['text'] - #print("First message:", first_message) - else: - first_message = messages[0]['content'] - user_queries.append(first_message) - - # construct metadata for multi-turn conversation - for message in messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - - if 'text' in message['content'][0]: - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - meta_row = { - "course": row['course_name'], - "conversation": conversation, - "conversation_id": row['convo']['id'], - "id": row['id'], - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - } - #print("Metadata row:", meta_row) - metadata.append(meta_row) - - embeddings_model = OpenAIEmbeddings(openai_api_type="openai", - openai_api_base="https://api.openai.com/v1/", - openai_api_key=os.environ['VLADS_OPENAI_KEY'], - openai_api_version="2020-11-07") - embeddings = embeddings_model.embed_documents(user_queries) - - metadata = pd.DataFrame(metadata) - embeddings = np.array(embeddings) - print("Metadata shape:", metadata.shape) - print("Embeddings shape:", embeddings.shape) - return embeddings, metadata - - except Exception as e: - print("Error in data_prep_for_convo_map():", e) - self.sentry.capture_exception(e) - return None, None - - def delete_from_document_map(self, project_id: str, ids: list): - """ - This function is used to delete datapoints from a document map. - Currently used within the delete_data() function in vector_database.py - Args: - course_name: str - ids: list of str - """ - print("in delete_from_document_map()") - - try: - # fetch project from Nomic - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - - # delete the ids from Nomic - print("Deleting point from document map:", project.delete_data(ids)) - with project.wait_for_project_lock(): - project.rebuild_maps() - return "Successfully deleted from Nomic map" - except Exception as e: - print(e) - self.sentry.capture_exception(e) - return "Error in deleting from document map: {e}" +# import datetime +# import os +# import time +# from typing import Union + +# import backoff +# import nomic +# import numpy as np +# import pandas as pd +# from injector import inject +# from langchain.embeddings.openai import OpenAIEmbeddings +# # from nomic import AtlasProject, atlas + +# from ai_ta_backend.database.sql import SQLDatabase +# from ai_ta_backend.service.sentry_service import SentryService + +# LOCK_EXCEPTIONS = [ +# 'Project is locked for state access! Please wait until the project is unlocked to access embeddings.', +# 'Project is locked for state access! Please wait until the project is unlocked to access data.', +# 'Project is currently indexing and cannot ingest new datums. Try again later.' +# ] + +# class NomicService(): + +# @inject +# def __init__(self, sentry: SentryService, sql: SQLDatabase): +# nomic.login(os.environ['NOMIC_API_KEY']) +# self.sentry = sentry +# self.sql = sql + +# def get_nomic_map(self, course_name: str, type: str): +# """ +# Returns the variables necessary to construct an iframe of the Nomic map given a course name. +# We just need the ID and URL. +# Example values: +# map link: https://atlas.nomic.ai/map/ed222613-97d9-46a9-8755-12bbc8a06e3a/f4967ad7-ff37-4098-ad06-7e1e1a93dd93 +# map id: f4967ad7-ff37-4098-ad06-7e1e1a93dd93 +# """ +# # nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app +# if type.lower() == 'document': +# NOMIC_MAP_NAME_PREFIX = 'Document Map for ' +# else: +# NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + +# project_name = NOMIC_MAP_NAME_PREFIX + course_name +# start_time = time.monotonic() + +# try: +# project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) +# map = project.get_map(project_name) + +# print(f"⏰ Nomic Full Map Retrieval: {(time.monotonic() - start_time):.2f} seconds") +# return {"map_id": f"iframe{map.id}", "map_link": map.map_link} +# except Exception as e: +# # Error: ValueError: You must specify a unique_id_field when creating a new project. +# if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore +# print( +# "Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ", +# e) +# else: +# print("ERROR in get_nomic_map():", e) +# self.sentry.capture_exception(e) +# return {"map_id": None, "map_link": None} + +# def log_to_conversation_map(self, course_name: str, conversation): +# """ +# This function logs new conversations to existing nomic maps. +# 1. Check if nomic map exists +# 2. If no, create it +# 3. If yes, fetch all conversations since last upload and log it +# """ +# nomic.login(os.getenv('NOMIC_API_KEY')) +# NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' +# try: +# # check if map exists +# response = self.sql.getConvoMapFromProjects(course_name) +# print("Response from supabase: ", response.data) + +# # entry not present in projects table +# if not response.data: +# print("Map does not exist for this course. Redirecting to map creation...") +# return self.create_conversation_map(course_name) + +# # entry present for doc map, but not convo map +# elif not response.data[0]['convo_map_id']: +# print("Map does not exist for this course. Redirecting to map creation...") +# return self.create_conversation_map(course_name) + +# project_id = response.data[0]['convo_map_id'] +# last_uploaded_convo_id = response.data[0]['last_uploaded_convo_id'] + +# # check if project is accepting data +# project = AtlasProject(project_id=project_id, add_datums_if_exists=True) +# if not project.is_accepting_data: +# return "Project is currently indexing and cannot ingest new datums. Try again later." + +# # fetch count of conversations since last upload +# response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=last_uploaded_convo_id) +# total_convo_count = response.count +# print("Total number of unlogged conversations in Supabase: ", total_convo_count) + +# if total_convo_count == 0: +# # log to an existing conversation +# existing_convo = self.log_to_existing_conversation(course_name, conversation) +# return existing_convo + +# first_id = last_uploaded_convo_id +# combined_dfs = [] +# current_convo_count = 0 +# convo_count = 0 + +# while current_convo_count < total_convo_count: +# response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) +# print("Response count: ", len(response.data)) +# if len(response.data) == 0: +# break +# df = pd.DataFrame(response.data) +# combined_dfs.append(df) +# current_convo_count += len(response.data) +# convo_count += len(response.data) +# print(current_convo_count) + +# if convo_count >= 500: +# # concat all dfs from the combined_dfs list +# final_df = pd.concat(combined_dfs, ignore_index=True) +# # prep data for nomic upload +# embeddings, metadata = self.data_prep_for_convo_map(final_df) +# # append to existing map +# print("Appending data to existing map...") +# result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) +# if result == "success": +# last_id = int(final_df['id'].iloc[-1]) +# project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} +# project_response = self.sql.updateProjects(course_name, project_info) +# print("Update response from supabase: ", project_response) +# # reset variables +# combined_dfs = [] +# convo_count = 0 +# print("Records uploaded: ", current_convo_count) + +# # set first_id for next iteration +# first_id = response.data[-1]['id'] + 1 + +# # upload last set of convos +# if convo_count > 0: +# print("Uploading last set of conversations...") +# final_df = pd.concat(combined_dfs, ignore_index=True) +# embeddings, metadata = self.data_prep_for_convo_map(final_df) +# result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) +# if result == "success": +# last_id = int(final_df['id'].iloc[-1]) +# project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} +# project_response = self.sql.updateProjects(course_name, project_info) +# print("Update response from supabase: ", project_response) + +# # rebuild the map +# self.rebuild_map(course_name, "conversation") +# return "success" + +# except Exception as e: +# print(e) +# self.sentry.capture_exception(e) +# return "Error in logging to conversation map: {e}" + +# def log_to_existing_conversation(self, course_name: str, conversation): +# """ +# This function logs follow-up questions to existing conversations in the map. +# """ +# print(f"in log_to_existing_conversation() for course: {course_name}") + +# try: +# conversation_id = conversation['id'] + +# # fetch id from supabase +# incoming_id_response = self.sql.getConversation(course_name, key="convo_id", value=conversation_id) + +# project_name = 'Conversation Map for ' + course_name +# project = AtlasProject(name=project_name, add_datums_if_exists=True) + +# prev_id = incoming_id_response.data[0]['id'] +# uploaded_data = project.get_data(ids=[prev_id]) # fetch data point from nomic +# prev_convo = uploaded_data[0]['conversation'] + +# # update conversation +# messages = conversation['messages'] +# messages_to_be_logged = messages[-2:] + +# for message in messages_to_be_logged: +# if message['role'] == 'user': +# emoji = "🙋 " +# else: +# emoji = "🤖 " + +# if isinstance(message['content'], list): +# text = message['content'][0]['text'] +# else: +# text = message['content'] + +# prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + +# # create embeddings of first query +# embeddings_model = OpenAIEmbeddings(openai_api_type="openai", +# openai_api_base="https://api.openai.com/v1/", +# openai_api_key=os.environ['VLADS_OPENAI_KEY'], +# openai_api_version="2020-11-07") +# embeddings = embeddings_model.embed_documents([uploaded_data[0]['first_query']]) + +# # modified timestamp +# current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") +# uploaded_data[0]['conversation'] = prev_convo +# uploaded_data[0]['modified_at'] = current_time + +# metadata = pd.DataFrame(uploaded_data) +# embeddings = np.array(embeddings) + +# print("Metadata shape:", metadata.shape) +# print("Embeddings shape:", embeddings.shape) + +# # deleting existing map +# print("Deleting point from nomic:", project.delete_data([prev_id])) + +# # re-build map to reflect deletion +# project.rebuild_maps() + +# # re-insert updated conversation +# result = self.append_to_map(embeddings, metadata, project_name) +# print("Result of appending to existing map:", result) + +# return "success" + +# except Exception as e: +# print("Error in log_to_existing_conversation():", e) +# self.sentry.capture_exception(e) +# return "Error in logging to existing conversation: {e}" + +# def create_conversation_map(self, course_name: str): +# """ +# This function creates a conversation map for a given course from scratch. +# """ +# nomic.login(os.getenv('NOMIC_API_KEY')) +# NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' +# try: +# # check if map exists +# response = self.sql.getConvoMapFromProjects(course_name) +# print("Response from supabase: ", response.data) +# if response.data: +# if response.data[0]['convo_map_id']: +# return "Map already exists for this course." + +# # if no, fetch total count of records +# response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=0) + +# # if <20, return message that map cannot be created +# if not response.count: +# return "No conversations found for this course." +# elif response.count < 20: +# return "Cannot create a map because there are less than 20 conversations in the course." + +# # if >20, iteratively fetch records in batches of 100 +# total_convo_count = response.count +# print("Total number of conversations in Supabase: ", total_convo_count) + +# first_id = response.data[0]['id'] - 1 +# combined_dfs = [] +# current_convo_count = 0 +# convo_count = 0 +# first_batch = True +# project_name = NOMIC_MAP_NAME_PREFIX + course_name + +# # iteratively query in batches of 50 +# while current_convo_count < total_convo_count: +# response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) +# print("Response count: ", len(response.data)) +# if len(response.data) == 0: +# break +# df = pd.DataFrame(response.data) +# combined_dfs.append(df) +# current_convo_count += len(response.data) +# convo_count += len(response.data) +# print(current_convo_count) + +# if convo_count >= 500: +# # concat all dfs from the combined_dfs list +# final_df = pd.concat(combined_dfs, ignore_index=True) +# # prep data for nomic upload +# embeddings, metadata = self.data_prep_for_convo_map(final_df) + +# if first_batch: +# # create a new map +# print("Creating new map...") +# index_name = course_name + "_convo_index" +# topic_label_field = "first_query" +# colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] +# result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, +# colorable_fields) + +# if result == "success": +# # update flag +# first_batch = False +# # log project info to supabase +# project = AtlasProject(name=project_name, add_datums_if_exists=True) +# project_id = project.id +# last_id = int(final_df['id'].iloc[-1]) +# project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} +# # if entry already exists, update it +# projects_record = self.sql.getConvoMapFromProjects(course_name) +# if projects_record.data: +# project_response = self.sql.updateProjects(course_name, project_info) +# else: +# project_response = self.sql.insertProjectInfo(project_info) +# print("Update response from supabase: ", project_response) +# else: +# # append to existing map +# print("Appending data to existing map...") +# project = AtlasProject(name=project_name, add_datums_if_exists=True) +# result = self.append_to_map(embeddings, metadata, project_name) +# if result == "success": +# print("map append successful") +# last_id = int(final_df['id'].iloc[-1]) +# project_info = {'last_uploaded_convo_id': last_id} +# project_response = self.sql.updateProjects(course_name, project_info) +# print("Update response from supabase: ", project_response) + +# # reset variables +# combined_dfs = [] +# convo_count = 0 +# print("Records uploaded: ", current_convo_count) + +# # set first_id for next iteration +# try: +# print("response: ", response.data[-1]['id']) +# except: +# print("response: ", response.data) +# first_id = response.data[-1]['id'] + 1 + +# print("Convo count: ", convo_count) +# # upload last set of convos +# if convo_count > 0: +# print("Uploading last set of conversations...") +# final_df = pd.concat(combined_dfs, ignore_index=True) +# embeddings, metadata = self.data_prep_for_convo_map(final_df) +# if first_batch: +# # create map +# index_name = course_name + "_convo_index" +# topic_label_field = "first_query" +# colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] +# result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + +# else: +# # append to map +# print("in map append") +# result = self.append_to_map(embeddings, metadata, project_name) + +# if result == "success": +# print("last map append successful") +# last_id = int(final_df['id'].iloc[-1]) +# project = AtlasProject(name=project_name, add_datums_if_exists=True) +# project_id = project.id +# project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} +# print("Project info: ", project_info) +# # if entry already exists, update it +# projects_record = self.sql.getConvoMapFromProjects(course_name) +# if projects_record.data: +# project_response = self.sql.updateProjects(course_name, project_info) +# else: +# project_response = self.sql.insertProjectInfo(project_info) +# print("Response from supabase: ", project_response) + +# # rebuild the map +# self.rebuild_map(course_name, "conversation") +# return "success" +# except Exception as e: +# print(e) +# self.sentry.capture_exception(e) +# return "Error in creating conversation map:" + str(e) + +# ## -------------------------------- SUPPLEMENTARY MAP FUNCTIONS --------------------------------- ## + +# def rebuild_map(self, course_name: str, map_type: str): +# """ +# This function rebuilds a given map in Nomic. +# """ +# print("in rebuild_map()") +# nomic.login(os.getenv('NOMIC_API_KEY')) + +# if map_type.lower() == 'document': +# NOMIC_MAP_NAME_PREFIX = 'Document Map for ' +# else: +# NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + +# try: +# # fetch project from Nomic +# project_name = NOMIC_MAP_NAME_PREFIX + course_name +# project = AtlasProject(name=project_name, add_datums_if_exists=True) + +# if project.is_accepting_data: +# project.rebuild_maps() +# return "success" +# except Exception as e: +# print(e) +# self.sentry.capture_exception(e) +# return "Error in rebuilding map: {e}" + +# def create_map(self, embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): +# """ +# Generic function to create a Nomic map from given parameters. +# Args: +# embeddings: np.array of embeddings +# metadata: pd.DataFrame of metadata +# map_name: str +# index_name: str +# topic_label_field: str +# colorable_fields: list of str +# """ +# nomic.login(os.environ['NOMIC_API_KEY']) +# print("in create_map()") +# try: +# project = atlas.map_embeddings(embeddings=embeddings, +# data=metadata, +# id_field="id", +# build_topic_model=True, +# name=map_name, +# topic_label_field=topic_label_field, +# colorable_fields=colorable_fields, +# add_datums_if_exists=True) +# project.create_index(index_name, build_topic_model=True) +# return "success" +# except Exception as e: +# print(e) +# return "Error in creating map: {e}" + +# def append_to_map(self, embeddings, metadata, map_name): +# """ +# Generic function to append new data to an existing Nomic map. +# Args: +# embeddings: np.array of embeddings +# metadata: pd.DataFrame of Nomic upload metadata +# map_name: str +# """ +# nomic.login(os.environ['NOMIC_API_KEY']) +# try: +# project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) +# with project.wait_for_project_lock(): +# project.add_embeddings(embeddings=embeddings, data=metadata) +# return "success" +# except Exception as e: +# print(e) +# return "Error in appending to map: {e}" + +# def data_prep_for_convo_map(self, df: pd.DataFrame): +# """ +# This function prepares embeddings and metadata for nomic upload in conversation map creation. +# Args: +# df: pd.DataFrame - the dataframe of documents from Supabase +# Returns: +# embeddings: np.array of embeddings +# metadata: pd.DataFrame of metadata +# """ +# print("in data_prep_for_convo_map()") + +# try: +# metadata = [] +# embeddings = [] +# user_queries = [] + +# for _index, row in df.iterrows(): +# current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") +# created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") +# conversation_exists = False +# conversation = "" +# emoji = "" + +# if row['user_email'] is None: +# user_email = "" +# else: +# user_email = row['user_email'] + +# messages = row['convo']['messages'] + +# # some conversations include images, so the data structure is different +# if isinstance(messages[0]['content'], list): +# if 'text' in messages[0]['content'][0]: +# first_message = messages[0]['content'][0]['text'] +# #print("First message:", first_message) +# else: +# first_message = messages[0]['content'] +# user_queries.append(first_message) + +# # construct metadata for multi-turn conversation +# for message in messages: +# if message['role'] == 'user': +# emoji = "🙋 " +# else: +# emoji = "🤖 " + +# if isinstance(message['content'], list): + +# if 'text' in message['content'][0]: +# text = message['content'][0]['text'] +# else: +# text = message['content'] + +# conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + +# meta_row = { +# "course": row['course_name'], +# "conversation": conversation, +# "conversation_id": row['convo']['id'], +# "id": row['id'], +# "user_email": user_email, +# "first_query": first_message, +# "created_at": created_at, +# "modified_at": current_time +# } +# #print("Metadata row:", meta_row) +# metadata.append(meta_row) + +# embeddings_model = OpenAIEmbeddings(openai_api_type="openai", +# openai_api_base="https://api.openai.com/v1/", +# openai_api_key=os.environ['VLADS_OPENAI_KEY'], +# openai_api_version="2020-11-07") +# embeddings = embeddings_model.embed_documents(user_queries) + +# metadata = pd.DataFrame(metadata) +# embeddings = np.array(embeddings) +# print("Metadata shape:", metadata.shape) +# print("Embeddings shape:", embeddings.shape) +# return embeddings, metadata + +# except Exception as e: +# print("Error in data_prep_for_convo_map():", e) +# self.sentry.capture_exception(e) +# return None, None + +# def delete_from_document_map(self, project_id: str, ids: list): +# """ +# This function is used to delete datapoints from a document map. +# Currently used within the delete_data() function in vector_database.py +# Args: +# course_name: str +# ids: list of str +# """ +# print("in delete_from_document_map()") + +# try: +# # fetch project from Nomic +# project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + +# # delete the ids from Nomic +# print("Deleting point from document map:", project.delete_data(ids)) +# with project.wait_for_project_lock(): +# project.rebuild_maps() +# return "Successfully deleted from Nomic map" +# except Exception as e: +# print(e) +# self.sentry.capture_exception(e) +# return "Error in deleting from document map: {e}" diff --git a/ai_ta_backend/service/retrieval_service.py b/ai_ta_backend/service/retrieval_service.py index 87032c48..82afd7d0 100644 --- a/ai_ta_backend/service/retrieval_service.py +++ b/ai_ta_backend/service/retrieval_service.py @@ -8,9 +8,11 @@ import openai import pytz +import requests from dateutil import parser from injector import inject from langchain.chat_models import AzureChatOpenAI +from langchain.embeddings.ollama import OllamaEmbeddings from langchain.embeddings.openai import OpenAIEmbeddings from langchain.schema import Document @@ -18,7 +20,8 @@ from ai_ta_backend.database.sql import SQLDatabase from ai_ta_backend.database.vector import VectorDatabase from ai_ta_backend.executors.thread_pool_executor import ThreadPoolExecutorAdapter -from ai_ta_backend.service.nomic_service import NomicService + +# from ai_ta_backend.service.nomic_service import NomicService from ai_ta_backend.service.posthog_service import PosthogService from ai_ta_backend.service.sentry_service import SentryService @@ -30,13 +33,13 @@ class RetrievalService: @inject def __init__(self, vdb: VectorDatabase, sqlDb: SQLDatabase, aws: AWSStorage, posthog: PosthogService, - sentry: SentryService, nomicService: NomicService, thread_pool_executor: ThreadPoolExecutorAdapter): + sentry: SentryService, thread_pool_executor: ThreadPoolExecutorAdapter): self.vdb = vdb self.sqlDb = sqlDb self.aws = aws self.sentry = sentry self.posthog = posthog - self.nomicService = nomicService + # self.nomicService = nomicService self.thread_pool_executor = thread_pool_executor openai.api_key = os.environ["VLADS_OPENAI_KEY"] @@ -48,15 +51,7 @@ def __init__(self, vdb: VectorDatabase, sqlDb: SQLDatabase, aws: AWSStorage, pos # openai_api_type=os.environ['OPENAI_API_TYPE'], # openai_api_version=os.environ["OPENAI_API_VERSION"], ) - - # self.llm = AzureChatOpenAI( - # temperature=0, - # deployment_name=os.environ["AZURE_OPENAI_ENGINE"], - # openai_api_base=os.environ["AZURE_OPENAI_ENDPOINT"], - # openai_api_key=os.environ["AZURE_OPENAI_KEY"], - # openai_api_version=os.environ["OPENAI_API_VERSION"], - # openai_api_type=os.environ['OPENAI_API_TYPE'], - # ) + self.vyriad_embeddings = OllamaEmbeddings(base_url=os.environ['OLLAMA_SERVER_URL'], model='nomic-embed-text:v1.5') async def getTopContexts( self, @@ -76,6 +71,9 @@ async def getTopContexts( """ if doc_groups is None: doc_groups = [] + + vyriad_project_name = os.getenv('VYRIAD_PROJECT_NAME', '') + is_vyriad = (vyriad_project_name == course_name) try: start_time_overall = time.monotonic() # Improvement of performance by parallelizing independent operations: @@ -102,7 +100,7 @@ async def getTopContexts( tasks = [ loop.run_in_executor(executor, self.sqlDb.getDisabledDocGroups, course_name), loop.run_in_executor(executor, self.sqlDb.getPublicDocGroups, course_name), - loop.run_in_executor(executor, self._embed_query_and_measure_latency, search_query) + loop.run_in_executor(executor, self._embed_query_and_measure_latency, search_query, is_vyriad) ] disabled_doc_groups_response, public_doc_groups_response, user_query_embedding = await asyncio.gather(*tasks) @@ -119,7 +117,8 @@ async def getTopContexts( doc_groups=doc_groups, user_query_embedding=user_query_embedding, disabled_doc_groups=disabled_doc_groups, - public_doc_groups=public_doc_groups) + public_doc_groups=public_doc_groups, + is_vyriad=is_vyriad) time_to_retrieve_docs = time.monotonic() - start_time_vector_search @@ -378,7 +377,7 @@ def delete_from_nomic_and_supabase(self, course_name: str, identifier_key: str, self.sentry.capture_exception(e) def vector_search(self, search_query, course_name, doc_groups: List[str], user_query_embedding, disabled_doc_groups, - public_doc_groups): + public_doc_groups, is_vyriad): """ Search the vector database for a given query, course name, and document groups. """ @@ -392,7 +391,7 @@ def vector_search(self, search_query, course_name, doc_groups: List[str], user_q public_doc_groups = [] # Max number of search results to return - top_n = 60 + top_n = 120 # Capture the search invoked event to PostHog self._capture_search_invoked_event(search_query, course_name, doc_groups) @@ -400,8 +399,12 @@ def vector_search(self, search_query, course_name, doc_groups: List[str], user_q # Perform the vector search start_time_vector_search = time.monotonic() search_results = self._perform_vector_search(search_query, course_name, doc_groups, user_query_embedding, top_n, - disabled_doc_groups, public_doc_groups) + disabled_doc_groups, public_doc_groups, is_vyriad) time_for_vector_search = time.monotonic() - start_time_vector_search + # SPECIAL CASE FOR VYRIAD + if is_vyriad: + vyriad_search_results = self._vyriad_special_case(search_results) + print(f"Vyriad search results: {vyriad_search_results}") # Process the search results by extracting the page content and metadata start_time_process_search_results = time.monotonic() @@ -420,17 +423,20 @@ def vector_search(self, search_query, course_name, doc_groups: List[str], user_q return found_docs def _perform_vector_search(self, search_query, course_name, doc_groups, user_query_embedding, top_n, - disabled_doc_groups, public_doc_groups): + disabled_doc_groups, public_doc_groups, is_vyriad): qdrant_start_time = time.monotonic() search_results = self.vdb.vector_search(search_query, course_name, doc_groups, user_query_embedding, top_n, - disabled_doc_groups, public_doc_groups) + disabled_doc_groups, public_doc_groups, is_vyriad) self.qdrant_latency_sec = time.monotonic() - qdrant_start_time return search_results - def _embed_query_and_measure_latency(self, search_query): + def _embed_query_and_measure_latency(self, search_query, is_vyriad): openai_start_time = time.monotonic() - user_query_embedding = self.embeddings.embed_query(search_query) + embeddings = self.vyriad_embeddings if is_vyriad else self.embeddings + # print(f"Embeddings: {embeddings}") + user_query_embedding = embeddings.embed_query(search_query) self.openai_embedding_latency = time.monotonic() - openai_start_time + return user_query_embedding def _capture_search_invoked_event(self, search_query, course_name, doc_groups): @@ -443,6 +449,45 @@ def _capture_search_invoked_event(self, search_query, course_name, doc_groups): }, ) + def _vyriad_special_case(self, search_results): + """ + Special case for Vyriad search results - fetches page content from API in bulk. + """ + if not search_results: + return [] + + try: + # Get context IDs from search results + context_ids = [result.payload['context_id'] for result in search_results] + + # Call API to get text for all context IDs in bulk + api_url = "https://pubmed-db-query.ncsa.ai/getTextFromContextIDBulk" + response = requests.post(api_url, json={"ids": context_ids}, timeout=30) + + if not response.ok: + print(f"Error in bulk API request: {response.status_code}") + return [] + + # Create mapping of context_id to text from response + context_texts = response.json() + + # Update search results with texts from bulk response + updated_results = [] + for result in search_results: + context_id = result.payload['context_id'] + if context_id in context_texts: + result.payload['page_content'] = context_texts[context_id]['page_content'] + result.payload['readable_filename'] = context_texts[context_id]['readable_filename'] + result.payload['s3_path'] = str(result.payload['minio_path']).replace('pubmed/', '') # remove bucket name + updated_results.append(result) + + return updated_results + + except Exception as e: + print(f"Error in _vyriad_special_case: {e}") + # sentry.capture_exception(e) + return [] + def _process_search_results(self, search_results, course_name): found_docs: list[Document] = [] for d in search_results: @@ -502,18 +547,19 @@ def format_for_json(self, found_docs: List[Document]) -> List[Dict]: Returns: List[Dict]: _description_ """ - for found_doc in found_docs: - if "pagenumber" not in found_doc.metadata.keys(): - print("found no pagenumber") - found_doc.metadata["pagenumber"] = found_doc.metadata["pagenumber_or_timestamp"] + # for found_doc in found_docs: + # if "pagenumber" not in found_doc.metadata.keys(): + # print("found no pagenumber") + # found_doc.metadata["pagenumber"] = found_doc.metadata["pagenumber_or_timestamp"] contexts = [ { "text": doc.page_content, - "readable_filename": doc.metadata["readable_filename"], - "course_name ": doc.metadata["course_name"], - "s3_path": doc.metadata["s3_path"], - "pagenumber": doc.metadata["pagenumber"], # this because vector db schema is older... + "readable_filename": doc.metadata.get("readable_filename", ""), + "course_name ": doc.metadata.get("course_name", ""), + # "s3_path": doc.metadata["s3_path"], # TODO @Rohan: We should get this from qdrant directly. + "s3_path": doc.metadata.get("s3_path", ""), + "pagenumber": doc.metadata.get("pagenumber", None), # this because vector db schema is older... # OPTIONAL PARAMS... "url": doc.metadata.get("url"), # wouldn't this error out? "base_url": doc.metadata.get("base_url"),