diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index a9cb0a4c..9bc8b487 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -1029,8 +1029,8 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]): # add to Nomic document map if len(response.data) > 0: - inserted_data = response.data[0] - log_to_document_map(inserted_data) + course_name = contexts[0].metadata.get('course_name') + log_to_document_map(course_name) self.posthog.capture('distinct_id_of_the_user', event='split_and_upload_succeeded', diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index ce8235a2..30dcf301 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -1,9 +1,6 @@ import datetime -import json import os -import time -import backoff import nomic import numpy as np import pandas as pd @@ -18,393 +15,7 @@ supabase_url=os.getenv('SUPABASE_URL'), # type: ignore supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore -LOCK_EXCEPTIONS = [ - 'Project is locked for state access! Please wait until the project is unlocked to access embeddings.', - 'Project is locked for state access! Please wait until the project is unlocked to access data.', - 'Project is currently indexing and cannot ingest new datums. Try again later.' -] - - -def giveup_hdlr(e): - """ - Function to handle giveup conditions in backoff decorator - Args: - e: Exception raised by the decorated function - Returns: - True if we want to stop retrying, False otherwise - """ - (e_args,) = e.args - e_str = e_args['exception'] - - print("giveup_hdlr() called with exception:", e_str) - if e_str in LOCK_EXCEPTIONS: - return False - else: - sentry_sdk.capture_exception(e) - return True - - -def backoff_hdlr(details): - """ - Function to handle backup conditions in backoff decorator. - Currently just prints the details of the backoff. - """ - print( - "\nBacking off {wait:0.1f} seconds after {tries} tries, calling function {target} with args {args} and kwargs {kwargs}" - .format(**details)) - - -def backoff_strategy(): - """ - Function to define retry strategy. Is usualy defined in the decorator, - but passing parameters to it is giving errors. - """ - return backoff.expo(base=10, factor=1.5) - - -@backoff.on_exception(backoff_strategy, - Exception, - max_tries=5, - raise_on_giveup=False, - giveup=giveup_hdlr, - on_backoff=backoff_hdlr) -def log_convo_to_nomic(course_name: str, conversation) -> str: - nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - """ - Logs conversation to Nomic. - 1. Check if map exists for given course - 2. Check if conversation ID exists - - if yes, delete and add new data point - - if no, add new data point - 3. Keep current logic for map doesn't exist - update metadata - """ - - print(f"in log_convo_to_nomic() for course: {course_name}") - print("type of conversation:", type(conversation)) - #conversation = json.loads(conversation) - messages = conversation['conversation']['messages'] - if 'user_email' not in conversation['conversation']: - user_email = "NULL" - else: - user_email = conversation['conversation']['user_email'] - conversation_id = conversation['conversation']['id'] - - # we have to upload whole conversations - # check what the fetched data looks like - pandas df or pyarrow table - # check if conversation ID exists in Nomic, if yes fetch all data from it and delete it. - # will have current QA and historical QA from Nomic, append new data and add_embeddings() - - project_name = NOMIC_MAP_NAME_PREFIX + course_name - start_time = time.monotonic() - emoji = "" - - try: - # fetch project metadata and embbeddings - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - map_metadata_df = project.maps[1].data.df # type: ignore - map_embeddings_df = project.maps[1].embeddings.latent - # create a function which returns project, data and embeddings df here - map_metadata_df['id'] = map_metadata_df['id'].astype(int) - last_id = map_metadata_df['id'].max() - - if conversation_id in map_metadata_df.values: - # store that convo metadata locally - prev_data = map_metadata_df[map_metadata_df['conversation_id'] == conversation_id] - prev_index = prev_data.index.values[0] - embeddings = map_embeddings_df[prev_index - 1].reshape(1, 1536) - prev_convo = prev_data['conversation'].values[0] - prev_id = prev_data['id'].values[0] - created_at = pd.to_datetime(prev_data['created_at'].values[0]).strftime('%Y-%m-%d %H:%M:%S') - - # delete that convo data point from Nomic, and print result - print("Deleting point from nomic:", project.delete_data([str(prev_id)])) - - # prep for new point - first_message = prev_convo.split("\n")[1].split(": ")[1] - - # select the last 2 messages and append new convo to prev convo - messages_to_be_logged = messages[-2:] - for message in messages_to_be_logged: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # update metadata - metadata = [{ - "course": course_name, - "conversation": prev_convo, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - }] - else: - print("conversation_id does not exist") - - # add new data point - user_queries = [] - conversation_string = "" - - first_message = messages[0]['content'] - if isinstance(first_message, list): - first_message = first_message[0]['text'] - user_queries.append(first_message) - - for message in messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation_string += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - metadata = [{ - "course": course_name, - "conversation": conversation_string, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": current_time, - "modified_at": current_time - }] - - # create embeddings - embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE) # type: ignore - embeddings = embeddings_model.embed_documents(user_queries) - - # add embeddings to the project - create a new function for this - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - with project.wait_for_project_lock(): - project.add_embeddings(embeddings=np.array(embeddings), data=pd.DataFrame(metadata)) - project.rebuild_maps() - - print(f"⏰ Nomic logging runtime: {(time.monotonic() - start_time):.2f} seconds") - return f"Successfully logged for {course_name}" - - except Exception as e: - if str(e) == 'You must specify a unique_id_field when creating a new project.': - print("Attempting to create Nomic map...") - result = create_nomic_map(course_name, conversation) - print("result of create_nomic_map():", result) - else: - # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() - raise Exception({"exception": str(e)}) - - - -def get_nomic_map(course_name: str, type: str): - """ - Returns the variables necessary to construct an iframe of the Nomic map given a course name. - We just need the ID and URL. - Example values: - map link: https://atlas.nomic.ai/map/ed222613-97d9-46a9-8755-12bbc8a06e3a/f4967ad7-ff37-4098-ad06-7e1e1a93dd93 - map id: f4967ad7-ff37-4098-ad06-7e1e1a93dd93 - """ - nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - if type.lower() == 'document': - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - else: - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - project_name = NOMIC_MAP_NAME_PREFIX + course_name - start_time = time.monotonic() - - try: - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - map = project.get_map(project_name) - - print(f"⏰ Nomic Full Map Retrieval: {(time.monotonic() - start_time):.2f} seconds") - return {"map_id": f"iframe{map.id}", "map_link": map.map_link} - except Exception as e: - # Error: ValueError: You must specify a unique_id_field when creating a new project. - if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ", e) - else: - print("ERROR in get_nomic_map():", e) - sentry_sdk.capture_exception(e) - return {"map_id": None, "map_link": None} - - -def create_nomic_map(course_name: str, log_data: list): - """ - Creates a Nomic map for new courses and those which previously had < 20 queries. - 1. fetches supabase conversations for course - 2. appends current embeddings and metadata to it - 2. creates map if there are at least 20 queries - """ - nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - print(f"in create_nomic_map() for {course_name}") - # initialize supabase - supabase_client = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - - try: - # fetch all conversations with this new course (we expect <=20 conversations, because otherwise the map should be made already) - response = supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).execute() - data = response.data - df = pd.DataFrame(data) - - if len(data) < 19: - return None - else: - # get all queries for course and create metadata - user_queries = [] - metadata = [] - i = 1 - conversation_exists = False - - # current log details - log_messages = log_data['conversation']['messages'] # type: ignore - log_user_email = log_data['conversation']['user_email'] # type: ignore - log_conversation_id = log_data['conversation']['id'] # type: ignore - - for _index, row in df.iterrows(): - user_email = row['user_email'] - created_at = pd.to_datetime(row['created_at']).strftime('%Y-%m-%d %H:%M:%S') - convo = row['convo'] - messages = convo['messages'] - - first_message = messages[0]['content'] - if isinstance(first_message, list): - first_message = first_message[0]['text'] - - user_queries.append(first_message) - - # create metadata for multi-turn conversation - conversation = "" - for message in messages: - # string of role: content, role: content, ... - if message['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # append current chat to previous chat if convo already exists - if convo['id'] == log_conversation_id: - conversation_exists = True - - for m in log_messages: - if m['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(m['content'], list): - text = m['content'][0]['text'] - else: - text = m['content'] - conversation += "\n>>> " + emoji + m['role'] + ": " + text + "\n" - - # adding modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # add to metadata - metadata_row = { - "course": row['course_name'], - "conversation": conversation, - "conversation_id": convo['id'], - "id": i, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - } - metadata.append(metadata_row) - i += 1 - - # add current log as a new data point if convo doesn't exist - if not conversation_exists: - user_queries.append(log_messages[0]['content']) - conversation = "" - for message in log_messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # adding timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - metadata_row = { - "course": course_name, - "conversation": conversation, - "conversation_id": log_conversation_id, - "id": i, - "user_email": log_user_email, - "first_query": log_messages[0]['content'], - "created_at": current_time, - "modified_at": current_time - } - metadata.append(metadata_row) - - metadata = pd.DataFrame(metadata) - embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE) # type: ignore - embeddings = embeddings_model.embed_documents(user_queries) - - # create Atlas project - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_convo_index" - project = atlas.map_embeddings( - embeddings=np.array(embeddings), - data=metadata, # type: ignore - this is the correct type, the func signature from Nomic is incomplete - id_field='id', - build_topic_model=True, - topic_label_field='first_query', - name=project_name, - colorable_fields=['conversation_id', 'first_query']) - project.create_index(index_name, build_topic_model=True) - return f"Successfully created Nomic map for {course_name}" - except Exception as e: - # Error: ValueError: You must specify a unique_id_field when creating a new project. - if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - print("Nomic map does not exist yet, probably because you have less than 20 queries on your project: ", e) - else: - print("ERROR in create_nomic_map():", e) - sentry_sdk.capture_exception(e) - - return "failed" - - +NOMIC_MAP_NAME_PREFIX = 'Document Map for ' ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## @@ -424,21 +35,16 @@ def create_document_map(course_name: str): """ print("in create_document_map()") nomic.login(os.getenv('NOMIC_API_KEY')) - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - - # initialize supabase - supabase_client = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - + try: # check if map exists - response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() if response.data: - return "Map already exists for this course." + if response.data[0]['doc_map_id']: + return "Map already exists for this course." # fetch relevant document data from Supabase - response = supabase_client.table("documents").select("id", + response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).order('id', desc=False).execute() @@ -449,92 +55,121 @@ def create_document_map(course_name: str): print("Total number of documents in Supabase: ", total_doc_count) # minimum 20 docs needed to create map + if total_doc_count < 20: + return "Cannot create a map because there are less than 20 documents in the course." - if total_doc_count > 19: - first_id = response.data[0]['id'] - combined_dfs = [] - curr_total_doc_count = 0 - doc_count = 0 - first_batch = True - - # iteratively query in batches of 25 - while curr_total_doc_count < total_doc_count: - - response = supabase_client.table("documents").select( - "id, created_at, s3_path, url, readable_filename, contexts").eq("course_name", course_name).gte( + first_id = response.data[0]['id'] + + combined_dfs = [] + curr_total_doc_count = 0 + doc_count = 0 + first_batch = True + + # iteratively query in batches of 25 + while curr_total_doc_count < total_doc_count: + + response = SUPABASE_CLIENT.table("documents").select( + "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( 'id', first_id).order('id', desc=False).limit(25).execute() - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - curr_total_doc_count += len(response.data) - doc_count += len(response.data) - - - if doc_count >= 1000: # upload to Nomic every 1000 docs - - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - - # prep data for nomic upload - embeddings, metadata = data_prep_for_doc_map(final_df) - - if first_batch: - # create a new map - print("Creating new map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + df = pd.DataFrame(response.data) + combined_dfs.append(df) # list of dfs + + curr_total_doc_count += len(response.data) + doc_count += len(response.data) + + if doc_count >= 1000: # upload to Nomic in batches of 1000 + + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + + # prep data for nomic upload + embeddings, metadata = data_prep_for_doc_map(final_df) + + if first_batch: + # create a new map + print("Creating new map...") + project_name = NOMIC_MAP_NAME_PREFIX + course_name + index_name = course_name + "_doc_index" + topic_label_field = "text" + colorable_fields = ["readable_filename", "text", "base_url", "created_at"] + result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + + if result == "success": # update flag first_batch = False + # log project info to supabase + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} + project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() + if project_response.data: + update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + else: + insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() + print("Insert Response from supabase: ", insert_response) + - else: - # append to existing map - print("Appending data to existing map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - # add project lock logic here - result = append_to_map(embeddings, metadata, project_name) - - - # reset variables - combined_dfs = [] - doc_count = 0 - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - - # upload last set of docs + else: + # append to existing map + print("Appending data to existing map...") + project_name = NOMIC_MAP_NAME_PREFIX + course_name + # add project lock logic here + result = append_to_map(embeddings, metadata, project_name) + if result == "success": + # update the last uploaded id in supabase + last_id = int(final_df['id'].iloc[-1]) + info = {'last_uploaded_doc_id': last_id} + update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + + # reset variables + combined_dfs = [] + doc_count = 0 + print("Records uploaded: ", curr_total_doc_count) + + # set first_id for next iteration + first_id = response.data[-1]['id'] + 1 + + # upload last set of docs + if doc_count > 0: final_df = pd.concat(combined_dfs, ignore_index=True) embeddings, metadata = data_prep_for_doc_map(final_df) project_name = NOMIC_MAP_NAME_PREFIX + course_name if first_batch: index_name = course_name + "_doc_index" topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] + colorable_fields = ["readable_filename", "text", "base_url", "created_at"] result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) else: result = append_to_map(embeddings, metadata, project_name) - print("Atlas upload status: ", result) - # log info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project.rebuild_maps() - project_info = {'course_name': course_name, 'doc_map_id': project_id} - response = supabase_client.table("projects").insert(project_info).execute() - print("Response from supabase: ", response) - return "success" - else: - return "Cannot create a map because there are less than 20 documents in the course." + # update the last uploaded id in supabase + if result == "success": + # update the last uploaded id in supabase + last_id = int(final_df['id'].iloc[-1]) + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} + print("project_info: ", project_info) + project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() + if project_response.data: + update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + else: + insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() + print("Insert Response from supabase: ", insert_response) + + + # rebuild the map + rebuild_map(course_name, "document") + except Exception as e: print(e) sentry_sdk.capture_exception(e) - return "failed" - def delete_from_document_map(course_name: str, ids: list): """ This function is used to delete datapoints from a document map. @@ -560,14 +195,14 @@ def delete_from_document_map(course_name: str, ids: list): print("Deleting point from document map:", project.delete_data(ids)) with project.wait_for_project_lock(): project.rebuild_maps() - return "Successfully deleted from Nomic map" + return "success" except Exception as e: print(e) sentry_sdk.capture_exception(e) return "Error in deleting from document map: {e}" -def log_to_document_map(data: dict): +def log_to_document_map(course_name: str): """ This is a function which appends new documents to an existing document map. It's called at the end of split_and_upload() after inserting data to Supabase. @@ -577,11 +212,11 @@ def log_to_document_map(data: dict): print("in add_to_document_map()") try: - # check if map exists - course_name = data['course_name'] - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + # check if map exists + response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() if response.data: project_id = response.data[0]['doc_map_id'] + last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] else: # create a map map_creation_result = create_document_map(course_name) @@ -589,44 +224,80 @@ def log_to_document_map(data: dict): return "The project has less than 20 documents and a map cannot be created." else: # fetch project id - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() project_id = response.data[0]['doc_map_id'] + last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - #print("Inserted data: ", data) - - embeddings = [] - metadata = [] - context_count = 0 - # prep data for nomic upload - for row in data['contexts']: - context_count += 1 - embeddings.append(row['embedding']) - metadata.append({ - "id": str(data['id']) + "_" + str(context_count), - "doc_ingested_at": data['created_at'], - "s3_path": data['s3_path'], - "url": data['url'], - "readable_filename": data['readable_filename'], - "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "text": row['text'] - }) - embeddings = np.array(embeddings) - metadata = pd.DataFrame(metadata) - print("Shape of embeddings: ", embeddings.shape) - - # append to existing map project_name = "Document Map for " + course_name - result = append_to_map(embeddings, metadata, project_name) - - return result + # check if project is locked, if yes -> skip logging + if not project.is_accepting_data: + return "Skipping Nomic logging because project is locked." + + # fetch count of records greater than last_uploaded_doc_id + print("last uploaded doc id: ", last_uploaded_doc_id) + response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() + print("Number of new documents: ", response.count) + total_doc_count = response.count + current_doc_count = 0 + combined_dfs = [] + doc_count = 0 + first_id = last_uploaded_doc_id + while current_doc_count < total_doc_count: + # fetch all records from supabase greater than last_uploaded_doc_id + response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", first_id).limit(25).execute() + df = pd.DataFrame(response.data) + combined_dfs.append(df) # list of dfs + + current_doc_count += len(response.data) + doc_count += len(response.data) + + if doc_count >= 1000: # upload to Nomic in batches of 1000 + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + # prep data for nomic upload + embeddings, metadata = data_prep_for_doc_map(final_df) + + # append to existing map + print("Appending data to existing map...") + + result = append_to_map(embeddings, metadata, project_name) + if result == "success": + # update the last uploaded id in supabase + last_id = int(final_df['id'].iloc[-1]) + info = {'last_uploaded_doc_id': last_id} + update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + + # reset variables + combined_dfs = [] + doc_count = 0 + print("Records uploaded: ", current_doc_count) + + # set first_id for next iteration + first_id = response.data[-1]['id'] + 1 + + # upload last set of docs + if doc_count > 0: + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = data_prep_for_doc_map(final_df) + result = append_to_map(embeddings, metadata, project_name) + + # update the last uploaded id in supabase + if result == "success": + # update the last uploaded id in supabase + last_id = int(final_df['id'].iloc[-1]) + project_info = {'last_uploaded_doc_id': last_id} + update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + + return "success" except Exception as e: print(e) - sentry_sdk.capture_exception(e) - return "Error in appending to map: {e}" - - + return "failed" + + def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): """ Generic function to create a Nomic map from given parameters. @@ -644,11 +315,11 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co data=metadata, id_field="id", build_topic_model=True, - name=map_name, topic_label_field=topic_label_field, + name=map_name, colorable_fields=colorable_fields, add_datums_if_exists=True) - project.create_index(index_name, build_topic_model=True) + project.create_index(name=index_name, build_topic_model=True) return "success" except Exception as e: print(e) @@ -668,12 +339,11 @@ def append_to_map(embeddings, metadata, map_name): project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) with project.wait_for_project_lock(): project.add_embeddings(embeddings=embeddings, data=metadata) - return "Successfully appended to Nomic map" + return "success" except Exception as e: print(e) return "Error in appending to map: {e}" - def data_prep_for_doc_map(df: pd.DataFrame): """ This function prepares embeddings and metadata for nomic upload in document map creation. @@ -692,8 +362,11 @@ def data_prep_for_doc_map(df: pd.DataFrame): for index, row in df.iterrows(): current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") if row['url'] == None: row['url'] = "" + if row['base_url'] == None: + row['base_url'] = "" # iterate through all contexts and create separate entries for each context_count = 0 for context in row['contexts']: @@ -703,11 +376,12 @@ def data_prep_for_doc_map(df: pd.DataFrame): meta_row = { "id": str(row['id']) + "_" + str(context_count), - "doc_ingested_at": row['created_at'], + "created_at": created_at, "s3_path": row['s3_path'], "url": row['url'], + "base_url": row['base_url'], "readable_filename": row['readable_filename'], - "created_at": current_time, + "modified_at": current_time, "text": text_row } @@ -750,7 +424,7 @@ def rebuild_map(course_name:str, map_type:str): with project.wait_for_project_lock(): project.rebuild_maps() - return "Successfully rebuilt map" + return "success" except Exception as e: print(e) sentry_sdk.capture_exception(e) diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index 223bc386..ecd775d2 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -73,9 +73,14 @@ def getAllFromTableForDownloadType(self, course_name: str, download_type: str, f return response - def getAllConversationsBetweenIds(self, course_name: str, first_id: int, last_id: int): - return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gte( - 'id', first_id).lte('id', last_id).order('id', desc=False).limit(25).execute() + def getAllConversationsBetweenIds(self, course_name: str, first_id: int, last_id: int, limit: int = 50): + if last_id == 0: + return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gt( + 'id', first_id).order('id', desc=False).limit(limit).execute() + else: + return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gte( + 'id', first_id).lte('id', last_id).order('id', desc=False).limit(limit).execute() + def getDocsForIdsGte(self, course_name: str, first_id: int, fields: str = "*", limit: int = 100): return self.supabase_client.table("documents").select(fields).eq("course_name", course_name).gte( @@ -85,4 +90,20 @@ def insertProjectInfo(self, project_info): return self.supabase_client.table("projects").insert(project_info).execute() def getAllFromLLMConvoMonitor(self, course_name: str): - return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).execute() + return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).order('id', desc=False).execute() + + def getCountFromLLMConvoMonitor(self, course_name: str, last_id: int): + if last_id == 0: + return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).order('id', desc=False).execute() + else: + return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).gt("id", last_id).order('id', desc=False).execute() + + def getDocMapFromProjects(self, course_name: str): + return self.supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + + def getConvoMapFromProjects(self, course_name: str): + return self.supabase_client.table("projects").select("*").eq("course_name", course_name).execute() + + def updateProjects(self, course_name: str, data: dict): + return self.supabase_client.table("projects").update(data).eq("course_name", course_name).execute() + diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index 77bfeea5..452792ac 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -38,6 +38,8 @@ from ai_ta_backend.service.retrieval_service import RetrievalService from ai_ta_backend.service.sentry_service import SentryService +from ai_ta_backend.beam.nomic_logging import create_document_map + app = Flask(__name__) CORS(app) executor = Executor(app) @@ -191,7 +193,36 @@ def createDocumentMap(service: NomicService): # proper web error "400 Bad request" abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") - map_id = service.create_document_map(course_name) + map_id = create_document_map(course_name) + + response = jsonify(map_id) + response.headers.add('Access-Control-Allow-Origin', '*') + return response + +@app.route('/createConversationMap', methods=['GET']) +def createConversationMap(service: NomicService): + course_name: str = request.args.get('course_name', default='', type=str) + + if course_name == '': + # proper web error "400 Bad request" + abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") + + map_id = service.create_conversation_map(course_name) + + response = jsonify(map_id) + response.headers.add('Access-Control-Allow-Origin', '*') + return response + +@app.route('/logToConversationMap', methods=['GET']) +def logToConversationMap(service: NomicService, flaskExecutor: ExecutorInterface): + course_name: str = request.args.get('course_name', default='', type=str) + + if course_name == '': + # proper web error "400 Bad request" + abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") + + #map_id = service.log_to_conversation_map(course_name) + map_id = flaskExecutor.submit(service.log_to_conversation_map, course_name).result() response = jsonify(map_id) response.headers.add('Access-Control-Allow-Origin', '*') @@ -214,7 +245,8 @@ def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface): print(f"In /onResponseCompletion for course: {course_name}") # background execution of tasks!! - response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) + #response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) + result = flaskExecutor.submit(service.log_to_conversation_map, course_name).result() response = jsonify({'outcome': 'success'}) response.headers.add('Access-Control-Allow-Origin', '*') return response diff --git a/ai_ta_backend/service/nomic_service.py b/ai_ta_backend/service/nomic_service.py index aee724fa..fee6ee21 100644 --- a/ai_ta_backend/service/nomic_service.py +++ b/ai_ta_backend/service/nomic_service.py @@ -20,7 +20,6 @@ 'Project is currently indexing and cannot ingest new datums. Try again later.' ] - def giveup_hdlr(e): """ Function to handle giveup conditions in backoff decorator @@ -66,157 +65,157 @@ def __init__(self, sentry: SentryService, sql: SQLDatabase): self.sentry = sentry self.sql = sql - @backoff.on_exception(backoff_strategy, - Exception, - max_tries=5, - raise_on_giveup=False, - giveup=giveup_hdlr, - on_backoff=backoff_hdlr) - def log_convo_to_nomic(self, course_name: str, conversation) -> Union[str, None]: - # nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - """ - Logs conversation to Nomic. - 1. Check if map exists for given course - 2. Check if conversation ID exists - - if yes, delete and add new data point - - if no, add new data point - 3. Keep current logic for map doesn't exist - update metadata - """ - - print(f"in log_convo_to_nomic() for course: {course_name}") - print("type of conversation:", type(conversation)) - #conversation = json.loads(conversation) - messages = conversation['conversation']['messages'] - if 'user_email' not in conversation['conversation']: - user_email = "NULL" - else: - user_email = conversation['conversation']['user_email'] - conversation_id = conversation['conversation']['id'] - - # we have to upload whole conversations - # check what the fetched data looks like - pandas df or pyarrow table - # check if conversation ID exists in Nomic, if yes fetch all data from it and delete it. - # will have current QA and historical QA from Nomic, append new data and add_embeddings() - - project_name = NOMIC_MAP_NAME_PREFIX + course_name - start_time = time.monotonic() - emoji = "" - - try: - # fetch project metadata and embbeddings - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - map_metadata_df = project.maps[1].data.df # type: ignore - map_embeddings_df = project.maps[1].embeddings.latent - # create a function which returns project, data and embeddings df here - map_metadata_df['id'] = map_metadata_df['id'].astype(int) - last_id = map_metadata_df['id'].max() - - if conversation_id in map_metadata_df.values: - # store that convo metadata locally - prev_data = map_metadata_df[map_metadata_df['conversation_id'] == conversation_id] - prev_index = prev_data.index.values[0] - embeddings = map_embeddings_df[prev_index - 1].reshape(1, 1536) - prev_convo = prev_data['conversation'].values[0] - prev_id = prev_data['id'].values[0] - created_at = pd.to_datetime(prev_data['created_at'].values[0]).strftime('%Y-%m-%d %H:%M:%S') - - # delete that convo data point from Nomic, and print result - print("Deleting point from nomic:", project.delete_data([str(prev_id)])) - - # prep for new point - first_message = prev_convo.split("\n")[1].split(": ")[1] - - # select the last 2 messages and append new convo to prev convo - messages_to_be_logged = messages[-2:] - for message in messages_to_be_logged: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # update metadata - metadata = [{ - "course": course_name, - "conversation": prev_convo, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - }] - else: - print("conversation_id does not exist") - - # add new data point - user_queries = [] - conversation_string = "" - - first_message = messages[0]['content'] - if isinstance(first_message, list): - first_message = first_message[0]['text'] - user_queries.append(first_message) - - for message in messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation_string += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - metadata = [{ - "course": course_name, - "conversation": conversation_string, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": current_time, - "modified_at": current_time - }] - - # create embeddings - embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) - embeddings = embeddings_model.embed_documents(user_queries) - - # add embeddings to the project - create a new function for this - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - with project.wait_for_project_lock(): - project.add_embeddings(embeddings=np.array(embeddings), data=pd.DataFrame(metadata)) - project.rebuild_maps() - - print(f"⏰ Nomic logging runtime: {(time.monotonic() - start_time):.2f} seconds") - return f"Successfully logged for {course_name}" - - except Exception as e: - if str(e) == 'You must specify a unique_id_field when creating a new project.': - print("Attempting to create Nomic map...") - result = self.create_nomic_map(course_name, conversation) - print("result of create_nomic_map():", result) - else: - # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() - raise Exception({"exception": str(e)}) + # @backoff.on_exception(backoff_strategy, + # Exception, + # max_tries=5, + # raise_on_giveup=False, + # giveup=giveup_hdlr, + # on_backoff=backoff_hdlr) + # def log_convo_to_nomic(self, course_name: str, conversation) -> Union[str, None]: + # # nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app + # NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + # """ + # Logs conversation to Nomic. + # 1. Check if map exists for given course + # 2. Check if conversation ID exists + # - if yes, delete and add new data point + # - if no, add new data point + # 3. Keep current logic for map doesn't exist - update metadata + # """ + + # print(f"in log_convo_to_nomic() for course: {course_name}") + # print("type of conversation:", type(conversation)) + # #conversation = json.loads(conversation) + # messages = conversation['conversation']['messages'] + # if 'user_email' not in conversation['conversation']: + # user_email = "NULL" + # else: + # user_email = conversation['conversation']['user_email'] + # conversation_id = conversation['conversation']['id'] + + # # we have to upload whole conversations + # # check what the fetched data looks like - pandas df or pyarrow table + # # check if conversation ID exists in Nomic, if yes fetch all data from it and delete it. + # # will have current QA and historical QA from Nomic, append new data and add_embeddings() + + # project_name = NOMIC_MAP_NAME_PREFIX + course_name + # start_time = time.monotonic() + # emoji = "" + + # try: + # # fetch project metadata and embbeddings + # project = AtlasProject(name=project_name, add_datums_if_exists=True) + + # map_metadata_df = project.maps[1].data.df # type: ignore + # map_embeddings_df = project.maps[1].embeddings.latent + # # create a function which returns project, data and embeddings df here + # map_metadata_df['id'] = map_metadata_df['id'].astype(int) + # last_id = map_metadata_df['id'].max() + + # if conversation_id in map_metadata_df.values: + # # store that convo metadata locally + # prev_data = map_metadata_df[map_metadata_df['conversation_id'] == conversation_id] + # prev_index = prev_data.index.values[0] + # embeddings = map_embeddings_df[prev_index - 1].reshape(1, 1536) + # prev_convo = prev_data['conversation'].values[0] + # prev_id = prev_data['id'].values[0] + # created_at = pd.to_datetime(prev_data['created_at'].values[0]).strftime('%Y-%m-%d %H:%M:%S') + + # # delete that convo data point from Nomic, and print result + # print("Deleting point from nomic:", project.delete_data([str(prev_id)])) + + # # prep for new point + # first_message = prev_convo.split("\n")[1].split(": ")[1] + + # # select the last 2 messages and append new convo to prev convo + # messages_to_be_logged = messages[-2:] + # for message in messages_to_be_logged: + # if message['role'] == 'user': + # emoji = "🙋 " + # else: + # emoji = "🤖 " + + # if isinstance(message['content'], list): + # text = message['content'][0]['text'] + # else: + # text = message['content'] + + # prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + # # modified timestamp + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # # update metadata + # metadata = [{ + # "course": course_name, + # "conversation": prev_convo, + # "conversation_id": conversation_id, + # "id": last_id + 1, + # "user_email": user_email, + # "first_query": first_message, + # "created_at": created_at, + # "modified_at": current_time + # }] + # else: + # print("conversation_id does not exist") + + # # add new data point + # user_queries = [] + # conversation_string = "" + + # first_message = messages[0]['content'] + # if isinstance(first_message, list): + # first_message = first_message[0]['text'] + # user_queries.append(first_message) + + # for message in messages: + # if message['role'] == 'user': + # emoji = "🙋 " + # else: + # emoji = "🤖 " + + # if isinstance(message['content'], list): + # text = message['content'][0]['text'] + # else: + # text = message['content'] + + # conversation_string += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + # # modified timestamp + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # metadata = [{ + # "course": course_name, + # "conversation": conversation_string, + # "conversation_id": conversation_id, + # "id": last_id + 1, + # "user_email": user_email, + # "first_query": first_message, + # "created_at": current_time, + # "modified_at": current_time + # }] + + # # create embeddings + # embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) + # embeddings = embeddings_model.embed_documents(user_queries) + + # # add embeddings to the project - create a new function for this + # project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) + # with project.wait_for_project_lock(): + # project.add_embeddings(embeddings=np.array(embeddings), data=pd.DataFrame(metadata)) + # project.rebuild_maps() + + # print(f"⏰ Nomic logging runtime: {(time.monotonic() - start_time):.2f} seconds") + # return f"Successfully logged for {course_name}" + + # except Exception as e: + # if str(e) == 'You must specify a unique_id_field when creating a new project.': + # print("Attempting to create Nomic map...") + # result = self.create_nomic_map(course_name, conversation) + # print("result of create_nomic_map():", result) + # else: + # # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() + # raise Exception({"exception": str(e)}) def get_nomic_map(self, course_name: str, type: str): """ @@ -252,367 +251,264 @@ def get_nomic_map(self, course_name: str, type: str): self.sentry.capture_exception(e) return {"map_id": None, "map_link": None} - def create_nomic_map(self, course_name: str, log_data: list): + def log_to_conversation_map(self, course_name: str): """ - Creates a Nomic map for new courses and those which previously had < 20 queries. - 1. fetches supabase conversations for course - 2. appends current embeddings and metadata to it - 2. creates map if there are at least 20 queries - """ - nomic.login(os.environ['NOMIC_API_KEY']) # login during start of flask app + This function logs new conversations to existing nomic maps. + 1. Check if nomic map exists + 2. If no, create it + 3. If yes, fetch all conversations since last upload and log it + """ + nomic.login(os.getenv('NOMIC_API_KEY')) NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - print(f"in create_nomic_map() for {course_name}") - - try: - # fetch all conversations with this new course (we expect <=20 conversations, because otherwise the map should be made already) - - response = self.sql.getAllFromLLMConvoMonitor(course_name) - data = response.data - df = pd.DataFrame(data) + # check if map exists + response = self.sql.getConvoMapFromProjects(course_name) + print("Response from supabase: ", response.data) + + if not response.data[0]['convo_map_id']: + print("Map does not exist for this course. Redirecting to map creation...") + return self.create_conversation_map(course_name) + + project_id = response.data[0]['convo_map_id'] + last_uploaded_convo_id = response.data[0]['last_uploaded_convo_id'] + + # check if project is accepting data + project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + if not project.is_accepting_data: + return "Project is currently indexing and cannot ingest new datums. Try again later." + + # fetch count of conversations since last upload + response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=last_uploaded_convo_id) + total_convo_count = response.count + print("Total number of unlogged conversations in Supabase: ", total_convo_count) + + if total_convo_count == 0: + return "No new conversations to log." + + first_id = last_uploaded_convo_id + combined_dfs = [] + current_convo_count = 0 + convo_count = 0 + + while current_convo_count < total_convo_count: + response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) + print("Response count: ", len(response.data)) + if len(response.data) == 0: + break + df = pd.DataFrame(response.data) + combined_dfs.append(df) + current_convo_count += len(response.data) + convo_count += len(response.data) + print(current_convo_count) + + if convo_count >= 500: + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + # prep data for nomic upload + embeddings, metadata = self.data_prep_for_convo_map(final_df) + # append to existing map + print("Appending data to existing map...") + result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) + if result == "success": + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + project_response = self.sql.updateProjects(course_name, project_info) + print("Update response from supabase: ", project_response) + # reset variables + combined_dfs = [] + convo_count = 0 + print("Records uploaded: ", current_convo_count) + + # set first_id for next iteration + first_id = response.data[-1]['id'] + 1 - if len(data) < 19: - return None - else: - # get all queries for course and create metadata - user_queries = [] - metadata = [] - i = 1 - conversation_exists = False - - # current log details - log_messages = log_data['conversation']['messages'] # type: ignore - log_user_email = log_data['conversation']['user_email'] # type: ignore - log_conversation_id = log_data['conversation']['id'] # type: ignore - - for _index, row in df.iterrows(): - user_email = row['user_email'] - created_at = pd.to_datetime(row['created_at']).strftime('%Y-%m-%d %H:%M:%S') - convo = row['convo'] - messages = convo['messages'] - - first_message = messages[0]['content'] - if isinstance(first_message, list): - first_message = first_message[0]['text'] - - user_queries.append(first_message) - - # create metadata for multi-turn conversation - conversation = "" - for message in messages: - # string of role: content, role: content, ... - if message['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # append current chat to previous chat if convo already exists - if convo['id'] == log_conversation_id: - conversation_exists = True - - for m in log_messages: - if m['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " + # upload last set of convos + if convo_count > 0: + print("Uploading last set of conversations...") + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = self.data_prep_for_convo_map(final_df) + result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) + if result == "success": + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + project_response = self.sql.updateProjects(course_name, project_info) + print("Update response from supabase: ", project_response) - if isinstance(m['content'], list): - text = m['content'][0]['text'] - else: - text = m['content'] - conversation += "\n>>> " + emoji + m['role'] + ": " + text + "\n" - - # adding modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # add to metadata - metadata_row = { - "course": row['course_name'], - "conversation": conversation, - "conversation_id": convo['id'], - "id": i, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - } - metadata.append(metadata_row) - i += 1 - - # add current log as a new data point if convo doesn't exist - if not conversation_exists: - user_queries.append(log_messages[0]['content']) - conversation = "" - for message in log_messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # adding timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - metadata_row = { - "course": course_name, - "conversation": conversation, - "conversation_id": log_conversation_id, - "id": i, - "user_email": log_user_email, - "first_query": log_messages[0]['content'], - "created_at": current_time, - "modified_at": current_time - } - metadata.append(metadata_row) - - metadata = pd.DataFrame(metadata) - embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) - embeddings = embeddings_model.embed_documents(user_queries) - - # create Atlas project - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_convo_index" - project = atlas.map_embeddings( - embeddings=np.array(embeddings), - data=metadata, # type: ignore - this is the correct type, the func signature from Nomic is incomplete - id_field='id', - build_topic_model=True, - topic_label_field='first_query', - name=project_name, - colorable_fields=['conversation_id', 'first_query']) - project.create_index(index_name, build_topic_model=True) - return f"Successfully created Nomic map for {course_name}" - except Exception as e: - # Error: ValueError: You must specify a unique_id_field when creating a new project. - if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - print("Nomic map does not exist yet, probably because you have less than 20 queries on your project: ", e) - else: - print("ERROR in create_nomic_map():", e) - self.sentry.capture_exception(e) + return "success" - return "failed" - ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## - def create_document_map(self, course_name: str): + def create_conversation_map(self, course_name: str): """ - This is a function which creates a document map for a given course from scratch - 1. Gets count of documents for the course - 2. If less than 20, returns a message that a map cannot be created - 3. If greater than 20, iteratively fetches documents in batches of 25 - 4. Prepares metadata and embeddings for nomic upload - 5. Creates a new map and uploads the data - - Args: - course_name: str - Returns: - str: success or failed - """ - print("in create_document_map()") - # nomic.login(os.getenv('NOMIC_API_KEY')) - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - + This function creates a conversation map for a given course from scratch. + """ + nomic.login(os.getenv('NOMIC_API_KEY')) + NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' try: # check if map exists - - response = self.sql.getProjectsMapForCourse(course_name) + response = self.sql.getConvoMapFromProjects(course_name) + print("Response from supabase: ", response.data) if response.data: - return "Map already exists for this course." + if response.data[0]['convo_map_id']: + return "Map already exists for this course." - # fetch relevant document data from Supabase - response = self.sql.getDocumentsBetweenDates(course_name, '', '', "documents") + # if no, fetch total count of records + response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=0) + # if <20, return message that map cannot be created if not response.count: - return "No documents found for this course." - - total_doc_count = response.count - print("Total number of documents in Supabase: ", total_doc_count) - - # minimum 20 docs needed to create map - if total_doc_count > 19: - - first_id = response.data[0]['id'] - combined_dfs = [] - curr_total_doc_count = 0 - doc_count = 0 - first_batch = True - - # iteratively query in batches of 25 - while curr_total_doc_count < total_doc_count: - - response = self.sql.getDocsForIdsGte(course_name, first_id, - "id, created_at, s3_path, url, readable_filename, contexts", 25) - - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - curr_total_doc_count += len(response.data) - doc_count += len(response.data) - - if doc_count >= 1000: # upload to Nomic every 1000 docs - - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - - # prep data for nomic upload - embeddings, metadata = self.data_prep_for_doc_map(final_df) - - if first_batch: - # create a new map - print("Creating new map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] - result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, - colorable_fields) + return "No conversations found for this course." + elif response.count < 20: + return "Cannot create a map because there are less than 20 conversations in the course." + + # if >20, iteratively fetch records in batches of 100 + total_convo_count = response.count + print("Total number of conversations in Supabase: ", total_convo_count) + + first_id = response.data[0]['id'] - 1 + combined_dfs = [] + current_convo_count = 0 + convo_count = 0 + first_batch = True + project_name = NOMIC_MAP_NAME_PREFIX + course_name + + # iteratively query in batches of 50 + while current_convo_count < total_convo_count: + response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) + print("Response count: ", len(response.data)) + if len(response.data) == 0: + break + df = pd.DataFrame(response.data) + combined_dfs.append(df) + current_convo_count += len(response.data) + convo_count += len(response.data) + print(current_convo_count) + + if convo_count >= 500: + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + # prep data for nomic upload + embeddings, metadata = self.data_prep_for_convo_map(final_df) + + if first_batch: + # create a new map + print("Creating new map...") + index_name = course_name + "_convo_index" + topic_label_field = "first_query" + colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] + result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + + if result == "success": # update flag first_batch = False - - else: - # append to existing map - print("Appending data to existing map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - # add project lock logic here - result = self.append_to_map(embeddings, metadata, project_name) - - # reset variables - combined_dfs = [] - doc_count = 0 - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - # upload last set of docs + # log project info to supabase + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + # if entry already exists, update it + projects_record = self.sql.getConvoMapFromProjects(course_name) + if projects_record.data: + project_response = self.sql.updateProjects(course_name, project_info) + else: + project_response = self.sql.insertProjectInfo(project_info) + print("Update response from supabase: ", project_response) + else: + # append to existing map + print("Appending data to existing map...") + project = AtlasProject(name=project_name, add_datums_if_exists=True) + result = self.append_to_map(embeddings, metadata, project_name) + if result == "success": + print("map append successful") + last_id = int(final_df['id'].iloc[-1]) + project_info = {'last_uploaded_convo_id': last_id} + project_response = self.sql.updateProjects(course_name, project_info) + print("Update response from supabase: ", project_response) + + # reset variables + combined_dfs = [] + convo_count = 0 + print("Records uploaded: ", current_convo_count) + + # set first_id for next iteration + try: + print("response: ", response.data[-1]['id']) + except: + print("response: ", response.data) + first_id = response.data[-1]['id'] + 1 + + print("Convo count: ", convo_count) + # upload last set of convos + if convo_count > 0: + print("Uploading last set of conversations...") final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = self.data_prep_for_doc_map(final_df) - project_name = NOMIC_MAP_NAME_PREFIX + course_name + embeddings, metadata = self.data_prep_for_convo_map(final_df) if first_batch: - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] + # create map + index_name = course_name + "_convo_index" + topic_label_field = "first_query" + colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + else: + # append to map + print("in map append") result = self.append_to_map(embeddings, metadata, project_name) - print("Atlas upload status: ", result) - - # log info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project.rebuild_maps() - project_info = {'course_name': course_name, 'doc_map_id': project_id} - response = self.sql.insertProjectInfo(project_info) - print("Response from supabase: ", response) - return "success" - else: - return "Cannot create a map because there are less than 20 documents in the course." + + if result == "success": + print("last map append successful") + last_id = int(final_df['id'].iloc[-1]) + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + print("Project info: ", project_info) + # if entry already exists, update it + projects_record = self.sql.getConvoMapFromProjects(course_name) + if projects_record.data: + project_response = self.sql.updateProjects(course_name, project_info) + else: + project_response = self.sql.insertProjectInfo(project_info) + print("Response from supabase: ", project_response) + + # rebuild the map + self.rebuild_map(course_name, "conversation") + return "success" except Exception as e: print(e) self.sentry.capture_exception(e) - return "failed" + return "Error in creating conversation map:" + str(e) - def delete_from_document_map(self, project_id: str, ids: list): + + + + + ## -------------------------------- SUPPLEMENTARY MAP FUNCTIONS --------------------------------- ## + + def rebuild_map(self, course_name:str, map_type:str): """ - This function is used to delete datapoints from a document map. - Currently used within the delete_data() function in vector_database.py - Args: - course_name: str - ids: list of str - """ - print("in delete_from_document_map()") + This function rebuilds a given map in Nomic. + """ + print("in rebuild_map()") + nomic.login(os.getenv('NOMIC_API_KEY')) + + if map_type.lower() == 'document': + NOMIC_MAP_NAME_PREFIX = 'Document Map for ' + else: + NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' try: # fetch project from Nomic - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + project_name = NOMIC_MAP_NAME_PREFIX + course_name + project = AtlasProject(name=project_name, add_datums_if_exists=True) - # delete the ids from Nomic - print("Deleting point from document map:", project.delete_data(ids)) - with project.wait_for_project_lock(): + if project.is_accepting_data: project.rebuild_maps() - return "Successfully deleted from Nomic map" + return "success" except Exception as e: print(e) self.sentry.capture_exception(e) - return "Error in deleting from document map: {e}" - - # If this needs to be uncommented, make sure to move the supabase call to the respective service - # def log_to_document_map(self, data: dict): - # """ - # This is a function which appends new documents to an existing document map. It's called - # at the end of split_and_upload() after inserting data to Supabase. - # Args: - # data: dict - the response data from Supabase insertion - # """ - # print("in add_to_document_map()") - - # try: - # # check if map exists - # course_name = data['course_name'] - # response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - # if response.data: - # project_id = response.data[0]['doc_map_id'] - # else: - # # create a map - # map_creation_result = self.create_document_map(course_name) - # if map_creation_result != "success": - # return "The project has less than 20 documents and a map cannot be created." - # else: - # # fetch project id - # response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - # project_id = response.data[0]['doc_map_id'] - - # project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - # #print("Inserted data: ", data) - - # embeddings = [] - # metadata = [] - # context_count = 0 - # # prep data for nomic upload - # for row in data['contexts']: - # context_count += 1 - # embeddings.append(row['embedding']) - # metadata.append({ - # "id": str(data['id']) + "_" + str(context_count), - # "doc_ingested_at": data['created_at'], - # "s3_path": data['s3_path'], - # "url": data['url'], - # "readable_filename": data['readable_filename'], - # "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - # "text": row['text'] - # }) - # embeddings = np.array(embeddings) - # metadata = pd.DataFrame(metadata) - # print("Shape of embeddings: ", embeddings.shape) - - # # append to existing map - # project_name = "Document Map for " + course_name - # result = self.append_to_map(embeddings, metadata, project_name) - - # # check if project is accepting new datums - # if project.is_accepting_data: - # with project.wait_for_project_lock(): - # project.rebuild_maps() - - # # with project.wait_for_project_lock(): - # # project.rebuild_maps() - # return result - - # except Exception as e: - # print(e) - # self.sentry.capture_exception(e) - # return "Error in appending to map: {e}" + return "Error in rebuilding map: {e}" + def create_map(self, embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): """ @@ -626,7 +522,7 @@ def create_map(self, embeddings, metadata, map_name, index_name, topic_label_fie colorable_fields: list of str """ nomic.login(os.environ['NOMIC_API_KEY']) - + print("in create_map()") try: project = atlas.map_embeddings(embeddings=embeddings, data=metadata, @@ -655,67 +551,115 @@ def append_to_map(self, embeddings, metadata, map_name): project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) with project.wait_for_project_lock(): project.add_embeddings(embeddings=embeddings, data=metadata) - return "Successfully appended to Nomic map" + return "success" except Exception as e: print(e) return "Error in appending to map: {e}" + - def data_prep_for_doc_map(self, df: pd.DataFrame): + def data_prep_for_convo_map(self, df: pd.DataFrame): """ - This function prepares embeddings and metadata for nomic upload in document map creation. + This function prepares embeddings and metadata for nomic upload in conversation map creation. Args: df: pd.DataFrame - the dataframe of documents from Supabase Returns: embeddings: np.array of embeddings metadata: pd.DataFrame of metadata """ - print("in data_prep_for_doc_map()") - + print("in data_prep_for_convo_map()") + metadata = [] embeddings = [] - texts = [] + user_queries = [] for _index, row in df.iterrows(): - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if row['url'] is None: - row['url'] = "" - # iterate through all contexts and create separate entries for each - context_count = 0 - for context in row['contexts']: - context_count += 1 - text_row = context['text'] - embeddings_row = context['embedding'] - - meta_row = { - "id": str(row['id']) + "_" + str(context_count), - "doc_ingested_at": row['created_at'], - "s3_path": row['s3_path'], - "url": row['url'], - "readable_filename": row['readable_filename'], - "created_at": current_time, - "text": text_row - } - - embeddings.append(embeddings_row) - metadata.append(meta_row) - texts.append(text_row) - - embeddings_np = np.array(embeddings, dtype=object) - print("Shape of embeddings: ", embeddings_np.shape) - - # check dimension if embeddings_np is (n, 1536) - if len(embeddings_np.shape) < 2: - print("Creating new embeddings...") - # embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE, - # openai_api_base=os.getenv('AZURE_OPENAI_BASE'), - # openai_api_key=os.getenv('AZURE_OPENAI_KEY')) # type: ignore - embeddings_model = OpenAIEmbeddings(openai_api_type="openai", - openai_api_base="https://api.openai.com/v1/", - openai_api_key=os.environ['VLADS_OPENAI_KEY']) - embeddings = embeddings_model.embed_documents(texts) + created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") + conversation_exists = False + conversation = "" + emoji = "" + + if row['user_email'] is None: + user_email = "" + else: + user_email = row['user_email'] + + messages = row['convo']['messages'] + + # some conversations include images, so the data structure is different + if isinstance(messages[0]['content'], list): + if 'text' in messages[0]['content'][0]: + first_message = messages[0]['content'][0]['text'] + #print("First message:", first_message) + else: + first_message = messages[0]['content'] + user_queries.append(first_message) + + # construct metadata for multi-turn conversation + for message in messages: + if message['role'] == 'user': + emoji = "🙋 " + else: + emoji = "🤖 " + if isinstance(message['content'], list): + + if 'text' in message['content'][0]: + text = message['content'][0]['text'] + else: + text = message['content'] + + conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + meta_row = { + "course": row['course_name'], + "conversation": conversation, + "conversation_id": row['convo']['id'], + "id": row['id'], + "user_email": user_email, + "first_query": first_message, + "created_at": created_at, + "modified_at": current_time + } + #print("Metadata row:", meta_row) + metadata.append(meta_row) + + embeddings_model = OpenAIEmbeddings(openai_api_type="openai", + openai_api_base="https://api.openai.com/v1/", + openai_api_key=os.environ['VLADS_OPENAI_KEY']) + embeddings = embeddings_model.embed_documents(user_queries) + metadata = pd.DataFrame(metadata) embeddings = np.array(embeddings) - + print("Metadata shape:", metadata.shape) + print("Embeddings shape:", embeddings.shape) return embeddings, metadata + + # except Exception as e: + # print("Error in data_prep_for_convo_map():", e) + # self.sentry.capture_exception(e) + # return None, None + + def delete_from_document_map(self, project_id: str, ids: list): + """ + This function is used to delete datapoints from a document map. + Currently used within the delete_data() function in vector_database.py + Args: + course_name: str + ids: list of str + """ + print("in delete_from_document_map()") + + try: + # fetch project from Nomic + project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + + # delete the ids from Nomic + print("Deleting point from document map:", project.delete_data(ids)) + with project.wait_for_project_lock(): + project.rebuild_maps() + return "Successfully deleted from Nomic map" + except Exception as e: + print(e) + self.sentry.capture_exception(e) + return "Error in deleting from document map: {e}" \ No newline at end of file