diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index 7a0a5f19..2783a124 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -50,7 +50,6 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores import Qdrant - #from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map from OpenaiEmbeddings import OpenAIAPIProcessor from PIL import Image from posthog import Posthog @@ -91,7 +90,6 @@ "GitPython==3.1.40", "beautifulsoup4==4.12.2", "sentry-sdk==1.39.1", - "nomic==2.0.14", "pdfplumber==0.11.0", # PDF OCR, better performance than Fitz/PyMuPDF in my Gies PDF testing. ] @@ -253,8 +251,6 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content, # response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore # print(f"Supabase ingest failure response: {response}") else: - # Success case: rebuild nomic document map after all ingests are done - # rebuild_status = rebuild_map(str(course_name), map_type='document') pass # Success ingest! 
@@ -1198,11 +1194,6 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]], ** response = self.supabase_client.table( os.getenv('REFACTORED_MATERIALS_SUPABASE_TABLE')).insert(document).execute() # type: ignore - # add to Nomic document map - # if len(response.data) > 0: - # course_name = contexts[0].metadata.get('course_name') - # log_to_document_map(course_name) - # need to update Supabase tables with doc group info if len(response.data) > 0: # get groups from kwargs @@ -1386,22 +1377,6 @@ def delete_data(self, course_name: str, s3_path: str, source_url: str): else: print("Error in deleting file from Qdrant:", e) sentry_sdk.capture_exception(e) - # try: - # # delete from Nomic - # response = self.supabase_client.from_( - # os.environ['REFACTORED_MATERIALS_SUPABASE_TABLE']).select("id, s3_path, contexts").eq('s3_path', s3_path).eq( - # 'course_name', course_name).execute() - # data = response.data[0] #single record fetched - # nomic_ids_to_delete = [] - # context_count = len(data['contexts']) - # for i in range(1, context_count + 1): - # nomic_ids_to_delete.append(str(data['id']) + "_" + str(i)) - - # # delete from Nomic - # delete_from_document_map(course_name, nomic_ids_to_delete) - # except Exception as e: - # print("Error in deleting file from Nomic:", e) - # sentry_sdk.capture_exception(e) try: self.supabase_client.from_(os.environ['REFACTORED_MATERIALS_SUPABASE_TABLE']).delete().eq( @@ -1431,22 +1406,7 @@ def delete_data(self, course_name: str, s3_path: str, source_url: str): else: print("Error in deleting file from Qdrant:", e) sentry_sdk.capture_exception(e) - # try: - # # delete from Nomic - # response = self.supabase_client.from_(os.environ['REFACTORED_MATERIALS_SUPABASE_TABLE']).select("id, url, contexts").eq( - # 'url', source_url).eq('course_name', course_name).execute() - # data = response.data[0] #single record fetched - # nomic_ids_to_delete = [] - # context_count = len(data['contexts']) - # for i in range(1, context_count + 
1): - # nomic_ids_to_delete.append(str(data['id']) + "_" + str(i)) - - # # delete from Nomic - # delete_from_document_map(course_name, nomic_ids_to_delete) - # except Exception as e: - # print("Error in deleting file from Nomic:", e) - # sentry_sdk.capture_exception(e) - + try: # delete from Supabase self.supabase_client.from_(os.environ['REFACTORED_MATERIALS_SUPABASE_TABLE']).delete().eq( diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py deleted file mode 100644 index 92db8a62..00000000 --- a/ai_ta_backend/beam/nomic_logging.py +++ /dev/null @@ -1,438 +0,0 @@ -import datetime -import os - -import nomic -import numpy as np -import pandas as pd -import sentry_sdk -import supabase -from langchain.embeddings import OpenAIEmbeddings -from nomic import AtlasProject, atlas - -OPENAI_API_TYPE = "azure" - -SUPABASE_CLIENT = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - -NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - -## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## - -def create_document_map(course_name: str): - """ - This is a function which creates a document map for a given course from scratch - 1. Gets count of documents for the course - 2. If less than 20, returns a message that a map cannot be created - 3. If greater than 20, iteratively fetches documents in batches of 25 - 4. Prepares metadata and embeddings for nomic upload - 5. Creates a new map and uploads the data - - Args: - course_name: str - Returns: - str: success or failed - """ - print("in create_document_map()") - nomic.login(os.getenv('NOMIC_API_KEY')) - - try: - # check if map exists - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - if response.data: - if response.data[0]['doc_map_id']: - return "Map already exists for this course." 
- - # fetch relevant document data from Supabase - response = SUPABASE_CLIENT.table("documents").select("id", - count="exact").eq("course_name", - course_name).order('id', - desc=False).execute() - if not response.count: - return "No documents found for this course." - - total_doc_count = response.count - print("Total number of documents in Supabase: ", total_doc_count) - - # minimum 20 docs needed to create map - if total_doc_count < 20: - return "Cannot create a map because there are less than 20 documents in the course." - - first_id = response.data[0]['id'] - - combined_dfs = [] - curr_total_doc_count = 0 - doc_count = 0 - first_batch = True - - # iteratively query in batches of 25 - while curr_total_doc_count < total_doc_count: - - response = SUPABASE_CLIENT.table("documents").select( - "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( - 'id', first_id).order('id', desc=False).limit(25).execute() - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - curr_total_doc_count += len(response.data) - doc_count += len(response.data) - - if doc_count >= 1000: # upload to Nomic in batches of 1000 - - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - - # prep data for nomic upload - embeddings, metadata = data_prep_for_doc_map(final_df) - - if first_batch: - # create a new map - print("Creating new map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text", "base_url", "created_at"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - - if result == "success": - # update flag - first_batch = False - # log project info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - last_id = 
int(final_df['id'].iloc[-1]) - project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() - if project_response.data: - update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - else: - insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() - print("Insert Response from supabase: ", insert_response) - - - else: - # append to existing map - print("Appending data to existing map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - # add project lock logic here - result = append_to_map(embeddings, metadata, project_name) - if result == "success": - # update the last uploaded id in supabase - last_id = int(final_df['id'].iloc[-1]) - info = {'last_uploaded_doc_id': last_id} - update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - - # reset variables - combined_dfs = [] - doc_count = 0 - print("Records uploaded: ", curr_total_doc_count) - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - # upload last set of docs - if doc_count > 0: - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = data_prep_for_doc_map(final_df) - project_name = NOMIC_MAP_NAME_PREFIX + course_name - if first_batch: - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text", "base_url", "created_at"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - else: - result = append_to_map(embeddings, metadata, project_name) - - # update the last uploaded id in supabase - if result == "success": - # update the last uploaded id in supabase 
- last_id = int(final_df['id'].iloc[-1]) - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - print("project_info: ", project_info) - project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() - if project_response.data: - update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - else: - insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() - print("Insert Response from supabase: ", insert_response) - - - # rebuild the map - rebuild_map(course_name, "document") - - except Exception as e: - print(e) - sentry_sdk.capture_exception(e) - return "failed" - -def delete_from_document_map(course_name: str, ids: list): - """ - This function is used to delete datapoints from a document map. 
- Currently used within the delete_data() function in vector_database.py - Args: - course_name: str - ids: list of str - """ - print("in delete_from_document_map()") - - try: - # check if project exists - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - if response.data: - project_id = response.data[0]['doc_map_id'] - else: - return "No document map found for this course" - - # fetch project from Nomic - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - - # delete the ids from Nomic - print("Deleting point from document map:", project.delete_data(ids)) - with project.wait_for_project_lock(): - project.rebuild_maps() - return "success" - except Exception as e: - print(e) - sentry_sdk.capture_exception(e) - return "Error in deleting from document map: {e}" - - -def log_to_document_map(course_name: str): - """ - This is a function which appends new documents to an existing document map. It's called - at the end of split_and_upload() after inserting data to Supabase. - Args: - data: dict - the response data from Supabase insertion - """ - print("in add_to_document_map()") - - try: - # check if map exists - response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() - if response.data: - if response.data[0]['doc_map_id']: - project_id = response.data[0]['doc_map_id'] - last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] - else: - # entry present in supabase, but doc map not present - create_document_map(course_name) - return "Document map not present, triggering map creation." - - else: - # create a map - create_document_map(course_name) - return "Document map not present, triggering map creation." 
- - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - project_name = "Document Map for " + course_name - - # check if project is LOCKED, if yes -> skip logging - if not project.is_accepting_data: - return "Skipping Nomic logging because project is locked." - - # fetch count of records greater than last_uploaded_doc_id - print("last uploaded doc id: ", last_uploaded_doc_id) - response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() - print("Number of new documents: ", response.count) - - total_doc_count = response.count - current_doc_count = 0 - combined_dfs = [] - doc_count = 0 - first_id = last_uploaded_doc_id - while current_doc_count < total_doc_count: - # fetch all records from supabase greater than last_uploaded_doc_id - response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", first_id).limit(25).execute() - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - current_doc_count += len(response.data) - doc_count += len(response.data) - - if doc_count >= 1000: # upload to Nomic in batches of 1000 - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - # prep data for nomic upload - embeddings, metadata = data_prep_for_doc_map(final_df) - - # append to existing map - print("Appending data to existing map...") - - result = append_to_map(embeddings, metadata, project_name) - if result == "success": - # update the last uploaded id in supabase - last_id = int(final_df['id'].iloc[-1]) - info = {'last_uploaded_doc_id': last_id} - update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - - # reset variables - combined_dfs = [] - doc_count = 0 - print("Records uploaded: ", 
current_doc_count) - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - # upload last set of docs - if doc_count > 0: - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = data_prep_for_doc_map(final_df) - result = append_to_map(embeddings, metadata, project_name) - - # update the last uploaded id in supabase - if result == "success": - # update the last uploaded id in supabase - last_id = int(final_df['id'].iloc[-1]) - project_info = {'last_uploaded_doc_id': last_id} - update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", update_response) - - return "success" - except Exception as e: - print(e) - return "failed" - - -def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): - """ - Generic function to create a Nomic map from given parameters. - Args: - embeddings: np.array of embeddings - metadata: pd.DataFrame of metadata - map_name: str - index_name: str - topic_label_field: str - colorable_fields: list of str - """ - nomic.login(os.getenv('NOMIC_API_KEY')) - try: - project = atlas.map_embeddings(embeddings=embeddings, - data=metadata, - id_field="id", - build_topic_model=True, - topic_label_field=topic_label_field, - name=map_name, - colorable_fields=colorable_fields, - add_datums_if_exists=True) - project.create_index(name=index_name, build_topic_model=True) - return "success" - except Exception as e: - print(e) - return "Error in creating map: {e}" - -def append_to_map(embeddings, metadata, map_name): - """ - Generic function to append new data to an existing Nomic map. 
- Args: - embeddings: np.array of embeddings - metadata: pd.DataFrame of Nomic upload metadata - map_name: str - """ - - nomic.login(os.getenv('NOMIC_API_KEY')) - try: - project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) - with project.wait_for_project_lock(): - project.add_embeddings(embeddings=embeddings, data=metadata) - return "success" - except Exception as e: - print(e) - return "Error in appending to map: {e}" - -def data_prep_for_doc_map(df: pd.DataFrame): - """ - This function prepares embeddings and metadata for nomic upload in document map creation. - Args: - df: pd.DataFrame - the dataframe of documents from Supabase - Returns: - embeddings: np.array of embeddings - metadata: pd.DataFrame of metadata - """ - print("in data_prep_for_doc_map()") - - metadata = [] - embeddings = [] - - texts = [] - - for index, row in df.iterrows(): - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") - if row['url'] == None: - row['url'] = "" - if row['base_url'] == None: - row['base_url'] = "" - # iterate through all contexts and create separate entries for each - context_count = 0 - for context in row['contexts']: - context_count += 1 - text_row = context['text'] - embeddings_row = context['embedding'] - - meta_row = { - "id": str(row['id']) + "_" + str(context_count), - "created_at": created_at, - "s3_path": row['s3_path'], - "url": row['url'], - "base_url": row['base_url'], - "readable_filename": row['readable_filename'], - "modified_at": current_time, - "text": text_row - } - - embeddings.append(embeddings_row) - metadata.append(meta_row) - texts.append(text_row) - - embeddings_np = np.array(embeddings, dtype=object) - print("Shape of embeddings: ", embeddings_np.shape) - - # check dimension if embeddings_np is (n, 1536) - if len(embeddings_np.shape) < 2: - print("Creating new embeddings...") - - embeddings_model = 
OpenAIEmbeddings(openai_api_type="openai", - openai_api_base="https://api.openai.com/v1/", - openai_api_key=os.getenv('VLADS_OPENAI_KEY')) # type: ignore - embeddings = embeddings_model.embed_documents(texts) - - metadata = pd.DataFrame(metadata) - embeddings = np.array(embeddings) - - return embeddings, metadata - -def rebuild_map(course_name:str, map_type:str): - """ - This function rebuilds a given map in Nomic. - """ - print("in rebuild_map()") - nomic.login(os.getenv('NOMIC_API_KEY')) - if map_type.lower() == 'document': - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - else: - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - try: - # fetch project from Nomic - project_name = NOMIC_MAP_NAME_PREFIX + course_name - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - if project.is_accepting_data: # temporary fix - will skip rebuilding if project is locked - project.rebuild_maps() - return "success" - except Exception as e: - print(e) - sentry_sdk.capture_exception(e) - return "Error in rebuilding map: {e}" - - - -if __name__ == '__main__': - pass - diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index 9e154ab4..f63a7237 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -103,6 +103,14 @@ def getCountFromLLMConvoMonitor(self, course_name: str, last_id: int): return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq( "course_name", course_name).gt("id", last_id).order('id', desc=False).execute() + def getCountFromDocuments(self, course_name: str, last_id: int): + if last_id == 0: + return self.supabase_client.table("documents").select("id", count='exact').eq("course_name", + course_name).order('id', desc=False).execute() + else: + return self.supabase_client.table("documents").select("id", count='exact').eq("course_name", + course_name).gt("id", last_id).order('id', desc=False).execute() + def getDocMapFromProjects(self, course_name: str): return 
self.supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() @@ -219,4 +227,7 @@ def getProjectStats(self, project_name: str): # Return default values if anything fails return {"total_messages": 0, "total_conversations": 0, "unique_users": 0} - \ No newline at end of file + + + def getAllProjects(self): + return self.supabase_client.table("projects").select("course_name, doc_map_id, convo_map_id, last_uploaded_doc_id, last_uploaded_convo_id").execute() \ No newline at end of file diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index b20c7322..631a0b4b 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -19,7 +19,6 @@ from flask_injector import FlaskInjector, RequestScope from injector import Binder, SingletonScope -from ai_ta_backend.beam.nomic_logging import create_document_map from ai_ta_backend.database.aws import AWSStorage from ai_ta_backend.database.sql import SQLDatabase from ai_ta_backend.database.vector import VectorDatabase @@ -193,69 +192,52 @@ def nomic_map(service: NomicService): return response -# @app.route('/createDocumentMap', methods=['GET']) -# def createDocumentMap(service: NomicService): -# course_name: str = request.args.get('course_name', default='', type=str) +@app.route('/updateConversationMaps', methods=['GET']) +def updateConversationMaps(service: NomicService, flaskExecutor: ExecutorInterface): -# if course_name == '': -# # proper web error "400 Bad request" -# abort(400, description=f"Missing required parameter: 'course_name' must be provided. 
Course name: `{course_name}`") + response = flaskExecutor.submit(service.update_conversation_maps).result() -# map_id = create_document_map(course_name) + response = jsonify(response) + response.headers.add('Access-Control-Allow-Origin', '*') + return response -# response = jsonify(map_id) -# response.headers.add('Access-Control-Allow-Origin', '*') -# return response -# @app.route('/createConversationMap', methods=['GET']) -# def createConversationMap(service: NomicService): -# course_name: str = request.args.get('course_name', default='', type=str) +@app.route('/updateDocumentMaps', methods=['GET']) +def updateDocumentMaps(service: NomicService, flaskExecutor: ExecutorInterface): -# if course_name == '': -# # proper web error "400 Bad request" -# abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") + response = flaskExecutor.submit(service.update_document_maps).result() -# map_id = service.create_conversation_map(course_name) + response = jsonify(response) + response.headers.add('Access-Control-Allow-Origin', '*') + return response -# response = jsonify(map_id) -# response.headers.add('Access-Control-Allow-Origin', '*') -# return response -# @app.route('/logToConversationMap', methods=['GET']) -# def logToConversationMap(service: NomicService, flaskExecutor: ExecutorInterface): -# course_name: str = request.args.get('course_name', default='', type=str) +@app.route('/createDocumentMap', methods=['GET']) +def createDocumentMap(service: NomicService): + course_name: str = request.args.get('course_name', default='', type=str) -# if course_name == '': -# # proper web error "400 Bad request" -# abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") + if course_name == '': + # proper web error "400 Bad request" + abort(400, description=f"Missing required parameter: 'course_name' must be provided. 
Course name: `{course_name}`") -# #map_id = service.log_to_conversation_map(course_name) -# map_id = flaskExecutor.submit(service.log_to_conversation_map, course_name).result() + map_id = service.create_document_map(course_name) -# response = jsonify(map_id) -# response.headers.add('Access-Control-Allow-Origin', '*') -# return response + response = jsonify(map_id) + response.headers.add('Access-Control-Allow-Origin', '*') + return response -@app.route('/onResponseCompletion', methods=['POST']) -def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface): - data = request.get_json() - course_name = data['course_name'] - conversation = data['conversation'] +@app.route('/createConversationMap', methods=['GET']) +def createConversationMap(service: NomicService): + course_name: str = request.args.get('course_name', default='', type=str) - if course_name == '' or conversation == '': + if course_name == '': # proper web error "400 Bad request" - abort( - 400, - description= - f"Missing one or more required parameters: 'course_name' and 'conversation' must be provided. Course name: `{course_name}`, Conversation: `{conversation}`" - ) - print(f"In /onResponseCompletion for course: {course_name}") + abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") - # background execution of tasks!! 
- #response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) - #result = flaskExecutor.submit(service.log_to_conversation_map, course_name, conversation).result() - response = jsonify({'outcome': 'success'}) + map_id = service.create_conversation_map(course_name) + + response = jsonify(map_id) response.headers.add('Access-Control-Allow-Origin', '*') return response @@ -533,6 +515,7 @@ def get_conversation_stats(service: RetrievalService) -> Response: response.headers.add('Access-Control-Allow-Origin', '*') return response + @app.route('/run_flow', methods=['POST']) def run_flow(service: WorkflowService) -> Response: """ @@ -593,16 +576,16 @@ def createProject(service: ProjectService, flaskExecutor: ExecutorInterface) -> @app.route('/getProjectStats', methods=['GET']) def get_project_stats(service: RetrievalService) -> Response: - project_name = request.args.get('project_name', default='', type=str) + project_name = request.args.get('project_name', default='', type=str) - if project_name == '': - abort(400, description="Missing required parameter: 'project_name' must be provided.") + if project_name == '': + abort(400, description="Missing required parameter: 'project_name' must be provided.") - project_stats = service.getProjectStats(project_name) + project_stats = service.getProjectStats(project_name) - response = jsonify(project_stats) - response.headers.add('Access-Control-Allow-Origin', '*') - return response + response = jsonify(project_stats) + response.headers.add('Access-Control-Allow-Origin', '*') + return response def configure(binder: Binder) -> None: diff --git a/ai_ta_backend/service/nomic_service.py b/ai_ta_backend/service/nomic_service.py index 80ca86ca..45c65e67 100644 --- a/ai_ta_backend/service/nomic_service.py +++ b/ai_ta_backend/service/nomic_service.py @@ -1,25 +1,17 @@ import datetime import os +import re import time -from typing import Union -import backoff import nomic -import numpy as np import pandas as pd from 
injector import inject -from langchain.embeddings.openai import OpenAIEmbeddings -from nomic import AtlasProject, atlas +from nomic import AtlasDataset, atlas +from tenacity import retry, stop_after_attempt, wait_exponential from ai_ta_backend.database.sql import SQLDatabase from ai_ta_backend.service.sentry_service import SentryService -LOCK_EXCEPTIONS = [ - 'Project is locked for state access! Please wait until the project is unlocked to access embeddings.', - 'Project is locked for state access! Please wait until the project is unlocked to access data.', - 'Project is currently indexing and cannot ingest new datums. Try again later.' -] - class NomicService(): @@ -37,21 +29,28 @@ def get_nomic_map(self, course_name: str, type: str): map link: https://atlas.nomic.ai/map/ed222613-97d9-46a9-8755-12bbc8a06e3a/f4967ad7-ff37-4098-ad06-7e1e1a93dd93 map id: f4967ad7-ff37-4098-ad06-7e1e1a93dd93 """ - # nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - if type.lower() == 'document': - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - else: - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + if not course_name or not type: + raise ValueError("Course name and type are required") + if type.lower() not in ['document', 'conversation']: + raise ValueError("Invalid map type") - project_name = NOMIC_MAP_NAME_PREFIX + course_name start_time = time.monotonic() - try: - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - map = project.get_map(project_name) + if type.lower() == 'document': + map_prefix = 'Document Map for ' + index_suffix = "_doc_index" + else: + map_prefix = 'Conversation Map for ' + index_suffix = "_convo_index" + + project_name = map_prefix + course_name + project_name = project_name.replace(" ", "-").lower() # names are like this - conversation-map-for-cropwizard-15 + project = AtlasDataset(project_name) + map = project.get_map(course_name + index_suffix) print(f"⏰ Nomic Full Map Retrieval: {(time.monotonic() - 
def update_conversation_maps(self):
  """
  Updates all conversation maps in UIUC.Chat. To be called via a CRON job.

  For each project: creates a conversation map if none exists, otherwise
  fetches all conversations logged since the last upload and appends them
  to the existing map.

  Returns:
      str: 'success' or an error message.
  """
  try:
    projects = self.sql.getAllProjects().data

    for project in projects:
      course_name = project['course_name']
      print(f"Processing course: {course_name}")

      # Courses without a map yet get a fresh one instead of an update.
      if not project['convo_map_id']:
        print(f"Creating new conversation map for {course_name}")
        self.create_conversation_map(course_name)
        continue

      print(f"Updating existing conversation map for {course_name}")
      last_uploaded_id = project['last_uploaded_convo_id']
      total_convo_count = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=last_uploaded_id).count

      if total_convo_count == 0:
        print("No new conversations to log.")
        continue

      print(f"Found {total_convo_count} unlogged conversations")
      combined_dfs = []
      current_count = 0

      while current_count < total_convo_count:
        response = self.sql.getAllConversationsBetweenIds(course_name, last_uploaded_id, 0, 100)
        if not response.data:
          break

        combined_dfs.append(pd.DataFrame(response.data))
        current_count += len(response.data)
        # BUGFIX: advance the cursor past this batch. Previously
        # last_uploaded_id was never updated inside the loop, so the same
        # first 100 rows were re-fetched on every iteration, duplicating
        # conversations in the upload. The +1 matches the pagination used
        # by the document-map path (assumes an inclusive/gte fetch in
        # getAllConversationsBetweenIds — TODO confirm against the SQL layer).
        last_uploaded_id = response.data[-1]['id'] + 1

      if combined_dfs:
        final_df = pd.concat(combined_dfs, ignore_index=True)
        metadata = self.data_prep_for_convo_map(final_df)

        print("Appending data to existing map...")
        # Nomic identifiers are sanitized: alnum/space/hyphen only, lowercase.
        map_name = re.sub(r'[^a-zA-Z0-9\s-]', '',
                          f"Conversation Map for {course_name}".replace("_", "-")).replace(" ", "-").lower()

        result = self.append_to_map(metadata=metadata, map_name=map_name)

        if result == "success":
          last_id = int(final_df['id'].iloc[-1])
          self.sql.updateProjects(course_name, {'last_uploaded_convo_id': last_id})
          self.rebuild_map(course_name, "conversation")
        else:
          print(f"Error in updating conversation map: {result}")

    return "success"

  except Exception as e:
    error_msg = f"Error in updating conversation maps: {e}"
    print(error_msg)
    self.sentry.capture_exception(e)
    return error_msg
def update_document_maps(self):
  """
  Updates all document maps in UIUC.Chat by processing and uploading documents in batches.

  For each project: creates a document map if none exists, otherwise fetches
  documents added since the last upload in pages of BATCH_SIZE and appends
  them to the existing map once UPLOAD_THRESHOLD rows are buffered.

  Returns:
      str: 'success' or an error message.
  """
  DOCUMENT_MAP_PREFIX = "Document Map for "
  BATCH_SIZE = 100
  UPLOAD_THRESHOLD = 500

  try:
    # Fetch all projects
    projects = self.sql.getAllProjects().data

    for project in projects:
      try:
        course_name = project['course_name']

        # No existing map: create one from scratch instead of updating.
        if not project.get('doc_map_id'):
          print(f"Creating new document map for course: {course_name}")
          status = self.create_document_map(course_name)
          print(f"Status of document map creation: {status}")
          continue

        # Check for new documents since the last upload.
        last_uploaded_doc_id = project['last_uploaded_doc_id']
        response = self.sql.getCountFromDocuments(course_name, last_id=last_uploaded_doc_id)

        if not response.count:
          print("No new documents to log.")
          continue

        total_doc_count = response.count
        print(f"Total unlogged documents in Supabase: {total_doc_count}")

        # Nomic identifiers are sanitized: alnum/space/hyphen only, lowercase.
        project_name = re.sub(r'[^a-zA-Z0-9\s-]', '',
                              f"{DOCUMENT_MAP_PREFIX}{course_name}".replace(" ", "-").replace("_", "-").lower())
        first_id = last_uploaded_doc_id

        combined_dfs = []
        current_doc_count = 0
        doc_count = 0
        batch_number = 0

        while current_doc_count < total_doc_count:
          # Fetch documents in batches.
          response = self.sql.getDocsForIdsGte(course_name=course_name, first_id=first_id, limit=BATCH_SIZE)
          if not response.data:
            break

          df = pd.DataFrame(response.data)
          combined_dfs.append(df)
          current_doc_count += len(response.data)
          doc_count += len(response.data)

          # Upload when enough rows are buffered, or we've reached the end.
          if doc_count >= UPLOAD_THRESHOLD or current_doc_count >= total_doc_count:
            batch_number += 1
            print(f"\nProcessing batch #{batch_number}")

            final_df = pd.concat(combined_dfs, ignore_index=True)
            metadata = self.data_prep_for_doc_map(final_df)

            # Upload to map
            result = self.append_to_map(metadata=metadata, map_name=project_name)

            if result == "success":
              last_id = int(final_df['id'].iloc[-1])
              self.sql.updateProjects(course_name, {'last_uploaded_doc_id': last_id})
              print(f"Completed batch #{batch_number}. "
                    f"Documents processed: {current_doc_count}/{total_doc_count}")
            else:
              print(f"Error in uploading batch for {course_name}: {result}")
              raise Exception(f"Batch upload failed: {result}")

            # Reset buffers for the next batch.
            combined_dfs = []
            doc_count = 0

          # getDocsForIdsGte is inclusive, so start the next page one past
          # the last id already buffered.
          first_id = response.data[-1]['id'] + 1

          # Exit condition to prevent infinite loop.
          if current_doc_count >= total_doc_count:
            break

        # Rebuild map after all documents are processed.
        self.rebuild_map(course_name, "document")
        print(f"\nSuccessfully processed all documents for {course_name}")
        print(f"Total batches processed: {batch_number}")

      except Exception as e:
        # Per-project failures are logged but must not stop the CRON sweep.
        print(f"Error in updating document map for {course_name}: {e}")
        self.sentry.capture_exception(e)
        continue

    # BUGFIX: previously the success path fell off the end and returned
    # None, although the docstring promises a status string.
    return "success"

  except Exception as e:
    print(f"Error in update_document_maps: {e}")
    self.sentry.capture_exception(e)
    return f"Error in update_document_maps: {e}"
def create_conversation_map(self, course_name: str):
  """
  Creates a conversation map for a given course from conversations in the database.

  Args:
      course_name (str): Name of the course to create a conversation map for.

  Returns:
      str: Status of map creation process.
  """
  NOMIC_MAP_NAME_PREFIX = 'Conversation Map for '
  BATCH_SIZE = 100
  MIN_CONVERSATIONS = 20
  UPLOAD_THRESHOLD = 500

  try:
    # Check if map already exists
    existing_map = self.sql.getConvoMapFromProjects(course_name)
    if existing_map.data and existing_map.data[0]['convo_map_id']:
      return "Map already exists for this course."

    # Validate conversation count before doing any work.
    response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=0)
    if not response.count or response.count < MIN_CONVERSATIONS:
      return f"Cannot create map: {'No new convos present' if not response.count else 'Less than 20 conversations'}"

    total_convo_count = response.count
    print(f"Total conversations in Supabase: {total_convo_count}")

    # Nomic identifiers are sanitized: alnum/space/hyphen only, lowercase.
    project_name = re.sub(r'[^a-zA-Z0-9\s-]', '',
                          (NOMIC_MAP_NAME_PREFIX + course_name).replace(" ", "-").replace("_", "-").lower())
    first_id = response.data[0]['id'] - 1

    combined_dfs = []
    current_convo_count = 0
    convo_count = 0
    first_batch = True

    while current_convo_count < total_convo_count:
      # Fetch conversations in batches
      response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, BATCH_SIZE)
      if not response.data:
        break

      df = pd.DataFrame(response.data)
      combined_dfs.append(df)
      current_convo_count += len(response.data)
      convo_count += len(response.data)
      print(f"Conversations processed: {convo_count}")
      print(f"Current conversation count: {current_convo_count}")

      # Process and upload batches when threshold is reached or we're done.
      if convo_count >= UPLOAD_THRESHOLD or current_convo_count >= total_convo_count:
        print("Processing batch...")
        final_df = pd.concat(combined_dfs, ignore_index=True)
        print(f"length of final_df: {len(final_df)}")
        metadata = self.data_prep_for_convo_map(final_df)

        # Create the map with the first batch, append afterwards.
        if first_batch:
          print("in first batch")
          index_name = f"{course_name}_convo_index"
          map_title = f"{NOMIC_MAP_NAME_PREFIX}{course_name}"
          result = self.create_map(metadata, map_title, index_name, index_field="first_query")
        else:
          result = self.append_to_map(metadata=metadata, map_name=project_name)

        if result == "success":
          # NOTE(review): assumes AtlasDataset(project_name) resolves to the
          # dataset created under map_title, i.e. nomic sanitizes the
          # identifier the same way we do — confirm against nomic v3 docs.
          project = AtlasDataset(project_name)
          last_id = int(final_df['id'].iloc[-1])
          project_info = {'course_name': course_name, 'convo_map_id': project.id, 'last_uploaded_convo_id': last_id}
          print("project_info", project_info)
          # BUGFIX: `existing_map` is stale after the first insert; checking
          # only it on every batch inserted a duplicate projects row per
          # batch. Insert once (first successful batch), update afterwards.
          if existing_map.data or not first_batch:
            self.sql.updateProjects(course_name, project_info)
          else:
            self.sql.insertProjectInfo(project_info)

          # Reset buffers for the next batch.
          combined_dfs = []
          convo_count = 0
          first_batch = False

      # BUGFIX: advance one past the last fetched id (matching the
      # document-map path); previously the boundary row was re-fetched and
      # uploaded again in every subsequent batch.
      first_id = response.data[-1]['id'] + 1

    # Rebuild map
    self.rebuild_map(course_name, "conversation")
    return "success"

  except Exception as e:
    print(e)
    self.sentry.capture_exception(e)
    return f"Error in creating conversation map: {str(e)}"
def create_document_map(self, course_name: str):
  """
  Creates a document map for a given course from documents in the database.

  Args:
      course_name (str): Name of the course to create a document map for.

  Returns:
      str: Status of map creation process.
  """
  DOCUMENT_MAP_PREFIX = "Document Map for "
  BATCH_SIZE = 100
  UPLOAD_THRESHOLD = 500
  MIN_DOCUMENTS = 20

  try:
    # Check if document map already exists
    project_name = DOCUMENT_MAP_PREFIX + course_name
    existing_map = self.sql.getDocMapFromProjects(course_name=course_name)
    if existing_map.data and existing_map.data[0]['doc_map_id']:
      return "Map already exists."

    # Validate document count before doing any work.
    response = self.sql.getCountFromDocuments(course_name, last_id=0)
    if not response.count or response.count < MIN_DOCUMENTS:
      return f"Cannot create map: {'No new docs present' if not response.count else 'Less than 20 documents'}"

    total_doc_count = response.count
    print(f"Total documents in Supabase: {total_doc_count}")

    # Nomic identifiers are sanitized: alnum/space/hyphen only, lowercase.
    project_name = re.sub(r'[^a-zA-Z0-9\s-]', '', project_name.replace(" ", "-").replace("_", "-").lower())
    first_id = response.data[0]['id'] - 1

    combined_dfs = []
    current_doc_count = 0
    doc_count = 0
    first_batch = True

    while current_doc_count < total_doc_count:
      # Fetch documents in batches
      response = self.sql.getDocsForIdsGte(course_name=course_name, first_id=first_id, limit=BATCH_SIZE)
      if not response.data:
        break

      df = pd.DataFrame(response.data)
      combined_dfs.append(df)
      current_doc_count += len(response.data)
      doc_count += len(response.data)

      # Upload when enough rows are buffered, or we've reached the end.
      # (Dropped the redundant `current_doc_count == total_doc_count`
      # clause — it is implied by `>=`.)
      if doc_count >= UPLOAD_THRESHOLD or current_doc_count >= total_doc_count:
        final_df = pd.concat(combined_dfs, ignore_index=True)
        metadata = self.data_prep_for_doc_map(final_df)

        # Create the map with the first batch, append afterwards.
        index_name = f"{course_name}_doc_index"
        if first_batch:
          map_title = f"{DOCUMENT_MAP_PREFIX}{course_name}"
          result = self.create_map(metadata, map_title, index_name, index_field="text")
        else:
          result = self.append_to_map(metadata=metadata, map_name=project_name)

        if result == "success":
          project = AtlasDataset(project_name)
          last_id = int(final_df['id'].iloc[-1])
          project_info = {'course_name': course_name, 'doc_map_id': project.id, 'last_uploaded_doc_id': last_id}

          # BUGFIX: `existing_map` is stale after the first insert; checking
          # only it on every batch inserted a duplicate projects row per
          # batch. Insert once (first successful batch), update afterwards.
          if existing_map.data or not first_batch:
            self.sql.updateProjects(course_name, project_info)
          else:
            self.sql.insertProjectInfo(project_info)

          # Reset buffers for the next batch.
          combined_dfs = []
          doc_count = 0
          first_batch = False

      # Prepare for next iteration
      first_id = response.data[-1]['id'] + 1

      # Exit condition to prevent infinite loop
      if current_doc_count >= total_doc_count:
        break

    # Rebuild the map
    self.rebuild_map(course_name, "document")
    return "success"

  except Exception as e:
    print(e)
    self.sentry.capture_exception(e)
    return f"Error in creating document map: {str(e)}"
def rebuild_map(self, course_name: str, map_type: str):
  """
  Rebuilds a given map in Nomic.

  Args:
      course_name (str): Name of the course
      map_type (str): Type of map ('document' or 'conversation')

  Returns:
      str: Status of map rebuilding process
  """
  try:
    # Resolve the map-name prefix; unknown types fall back to no prefix.
    kind = map_type.lower()
    if kind == 'document':
      prefix = 'Document Map for '
    elif kind == 'conversation':
      prefix = 'Conversation Map for '
    else:
      prefix = ''

    # Nomic identifiers are sanitized: alnum/space/hyphen only, lowercase.
    raw_name = (prefix + course_name).replace(" ", "-").replace("_", "-").lower()
    dataset_name = re.sub(r'[^a-zA-Z0-9\s-]', '', raw_name)
    print(f"Rebuilding map: {dataset_name}")

    dataset = AtlasDataset(dataset_name)
    # Skip the rebuild while the dataset is busy indexing; still report
    # success so callers don't treat a deferred rebuild as a failure.
    if dataset.is_accepting_data:
      dataset.update_indices(rebuild_topic_models=True)
    return "success"

  except Exception as e:
    print(e)
    self.sentry.capture_exception(e)
    return f"Error in rebuilding map: {e}"
def create_map(self, metadata, map_name, index_name, index_field):
  """
  Creates a Nomic map with topic modeling and duplicate detection.

  Args:
      metadata (pd.DataFrame): Metadata for the map
      map_name (str): Name of the map to create
      index_name (str): Name of the index to create
      index_field (str): Field to be indexed

  Returns:
      str: 'success' or error message
  """
  print(f"Creating map: {map_name}")

  try:
    # Both the initial map and the extra index get topic modeling and
    # duplicate detection enabled.
    index_options = {"topic_model": True, "duplicate_detection": True}

    dataset = atlas.map_data(data=metadata,
                             identifier=map_name,
                             id_field="id",
                             indexed_field=index_field,
                             **index_options)
    dataset.create_index(name=index_name, indexed_field=index_field, **index_options)
    return "success"

  except Exception as e:
    print(e)
    return f"Error in creating map: {e}"
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=10, max=600))
def append_to_map(self, metadata, map_name):
  """
  Appends new data to an existing Nomic map.

  Args:
      metadata (pd.DataFrame): Metadata for the map update
      map_name (str): Name of the target map

  Returns:
      str: 'success' or error message

  NOTE(review): all exceptions are swallowed below and converted into a
  string return value, so the @retry decorator never observes a failure
  and therefore never retries — confirm whether transient errors should
  re-raise instead.
  """
  print(f"Appending to map: {map_name}")
  try:
    dataset = AtlasDataset(map_name)
    # Block until the dataset is free before pushing new datums.
    with dataset.wait_for_dataset_lock():
      dataset.add_data(data=metadata)
  except Exception as err:
    print(err)
    return f"Error in appending to map: {err}"
  return "success"
def data_prep_for_convo_map(self, df: pd.DataFrame) -> pd.DataFrame:
  """
  Prepares conversation data from Supabase for Nomic map upload.

  Args:
      df (pd.DataFrame): Dataframe of conversation rows from Supabase. Each
          row is expected to carry 'course_name', 'created_at', 'user_email',
          'id' and a 'convo' dict with 'id' and 'messages'.

  Returns:
      pd.DataFrame: One metadata row per conversation, ready for Nomic
          upload. An empty DataFrame if a fatal error occurs.
  """
  print("Preparing conversation data for map")
  try:
    metadata = []
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for _, row in df.iterrows():
      created_at = datetime.datetime.strptime(row['created_at'],
                                              "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S")
      messages = row['convo']['messages']
      # BUGFIX: a conversation with no messages previously raised
      # IndexError on messages[0] and lost the entire batch; skip it.
      if not messages:
        continue

      # Conversations with images store content as a list of parts.
      first_message = messages[0]['content']
      if isinstance(first_message, list):
        first_message = first_message[0].get('text', '')

      # Flatten the multi-turn conversation into a single annotated string.
      conversation = []
      for message in messages:
        emoji = "🙋 " if message['role'] == 'user' else "🤖 "
        content = message['content']
        text = content[0].get('text', '') if isinstance(content, list) else content
        conversation.append(f"\n>>> {emoji}{message['role']}: {text}\n")

      metadata.append({
          "course": row['course_name'],
          "conversation": ''.join(conversation),
          "conversation_id": row['convo']['id'],
          "id": row['id'],
          "user_email": row['user_email'] or "",
          "first_query": first_message,
          "created_at": created_at,
          "modified_at": current_time
      })

    result = pd.DataFrame(metadata)
    print(f"Metadata shape: {result.shape}")
    return result

  except Exception as e:
    print(f"Error in data preparation: {e}")
    self.sentry.capture_exception(e)
    return pd.DataFrame()
def data_prep_for_doc_map(self, df: pd.DataFrame) -> pd.DataFrame:
  """
  Prepares document metadata for Nomic map upload.

  Args:
      df (pd.DataFrame): Source documents dataframe. Each row is expected to
          carry 'id', 'created_at', 's3_path', 'url', 'base_url',
          'readable_filename' and a list of 'contexts' dicts with 'text'.

  Returns:
      pd.DataFrame: One metadata row per context chunk, with ids formatted
          "<doc_id>_<n>". An empty DataFrame if an error occurs.
  """
  # BUGFIX: the `except` below previously had no matching `try:` (it was
  # deleted along with the old body), which was a SyntaxError that
  # prevented the module from even being imported.
  try:
    metadata = []
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for _, row in df.iterrows():
      created_at = datetime.datetime.strptime(row['created_at'],
                                              "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S")

      # One Nomic datum per context chunk of the document.
      for idx, context in enumerate(row['contexts'], 1):
        metadata.append({
            "id": f"{row['id']}_{idx}",
            "created_at": created_at,
            "s3_path": row['s3_path'],
            "url": row['url'] or "",
            "base_url": row['base_url'] or "",
            "readable_filename": row['readable_filename'],
            "modified_at": current_time,
            "text": context['text']
        })

    return pd.DataFrame(metadata)

  except Exception as e:
    print(f"Error in document data preparation: {e}")
    self.sentry.capture_exception(e)
    return pd.DataFrame()
XlsxWriter==3.2.0 # AI & core services -nomic==2.0.14 +nomic==3.3.0 openai==0.28.1 langchain==0.0.331 langchainhub==0.1.14