diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index 8ac8d06d..4a6823b8 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -4,6 +4,7 @@ """ import asyncio import inspect +import json import logging import mimetypes import os @@ -13,7 +14,7 @@ import uuid from pathlib import Path from tempfile import NamedTemporaryFile -from typing import Any, Callable, Dict, List, Union +from typing import Any, Callable, Dict, List, Optional, Union import beam import boto3 @@ -39,19 +40,15 @@ from langchain.schema import Document from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores import Qdrant -from ai_ta_backend.beam.OpenaiEmbeddings import OpenAIAPIProcessor + +from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map +from OpenaiEmbeddings import OpenAIAPIProcessor from PIL import Image from posthog import Posthog from pydub import AudioSegment from qdrant_client import QdrantClient, models from qdrant_client.models import PointStruct -from ai_ta_backend.beam.nomic_logging import ( - delete_from_document_map, - log_to_document_map, - rebuild_map -) - # from langchain.schema.output_parser import StrOutputParser # from langchain.chat_models import AzureChatOpenAI @@ -149,7 +146,14 @@ def loader(): # Triggers determine how your app is deployed -@app.rest_api(workers=2, max_pending_tasks=15_000, max_retries=3, timeout=-1, loader=loader, autoscaler=autoscaler) +@app.rest_api( + workers=4, + # callback_url='https://uiuc-chat-git-refactoringesttobeamserverless-kastanday.vercel.app/api/UIUC-api/ingestCallback', + max_pending_tasks=15_000, + max_retries=3, + timeout=-1, + loader=loader, + autoscaler=autoscaler) def ingest(**inputs: Dict[str, Any]): qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"] @@ -159,8 +163,7 @@ def ingest(**inputs: Dict[str, Any]): url: List[str] | str | None = inputs.get('url', None) base_url: List[str] | str | None = inputs.get('base_url', None) readable_filename: List[str] | str = inputs.get('readable_filename', '') - content: str | None = inputs.get('content', None) # is webtext - # is_webtext: bool | None = inputs.get('url', False) + content: str | None = inputs.get('content', None) # is webtext if content exists print( f"In top of /ingest route. course: {course_name}, s3paths: {s3_paths}, readable_filename: {readable_filename}, base_url: {base_url}, url: {url}, content: {content}" @@ -182,7 +185,8 @@ def ingest(**inputs: Dict[str, Any]): # rebuild nomic document map after all ingests are done rebuild_status = rebuild_map(course_name, map_type='document') - return success_fail_dict + + return json.dumps(success_fail_dict) class Ingest(): @@ -247,7 +251,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs): with NamedTemporaryFile(suffix=file_extension) as tmpfile: self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile) mime_type = str(mimetypes.guess_type(tmpfile.name, strict=False)[0]) - mime_category, mime_subcategory = mime_type.split('/') + mime_category = mime_type.split('/')[0] if '/' in mime_type else mime_type if file_extension in file_ingest_methods: # Use specialized functions when possible, fallback to mimetype. Else raise error. 
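The reworked _ingest_single dispatch tries an extension-specific ingest method first, then falls back to the top-level MIME category (the new split('/')[0] guard keeps a None result from mimetypes.guess_type from breaking the old two-value unpack), and finally attempts a plain UTF-8 text ingest; the route itself now returns the success/failure dict serialized with json.dumps. A minimal sketch of that dispatch order, assuming hypothetical handler tables in place of the class's real method maps:

import json
import mimetypes
from pathlib import Path


def route_ingest(s3_path, file_ingest_methods, mime_ingest_methods, ingest_utf8_text):
    """Pick an ingest handler by file extension, fall back to MIME category, then raw UTF-8 text."""
    success_fail = {'success_ingest': [], 'failure_ingest': []}
    file_extension = Path(s3_path).suffix
    # guess_type() may return None; str() turns that into "None", which has no '/',
    # so the conditional split mirrors the patched guard
    mime_type = str(mimetypes.guess_type(s3_path, strict=False)[0])
    mime_category = mime_type.split('/')[0] if '/' in mime_type else mime_type
    try:
        if file_extension in file_ingest_methods:
            # hypothetical handler tables stand in for the class's _ingest_single_* methods
            file_ingest_methods[file_extension](s3_path)
        elif mime_category in mime_ingest_methods:
            mime_ingest_methods[mime_category](s3_path)
        else:
            # last-ditch effort: treat the object as UTF-8 text
            ingest_utf8_text(s3_path)
        success_fail['success_ingest'].append(s3_path)
    except Exception as err:
        success_fail['failure_ingest'].append(f"{s3_path}: {err}")
    # the Beam endpoint now serializes this dict before returning it
    return json.dumps(success_fail)

Serializing the dict on the way out keeps the REST response a plain JSON string rather than leaving the encoding to the framework.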
@@ -263,7 +267,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs): try: self._ingest_single_txt(s3_path, course_name) success_status['success_ingest'].append(s3_path) - print("✅ FALLBACK TO UTF-8 INGEST WAS SUCCESSFUL :) ") + print(f"No ingest methods -- Falling back to UTF-8 INGEST... s3_path = {s3_path}") except Exception as e: print( f"We don't have a ingest method for this filetype: {file_extension}. As a last-ditch effort, we tried to ingest the file as utf-8 text, but that failed too. File is unsupported: {s3_path}. UTF-8 ingest error: {e}" ) self.posthog.capture( 'distinct_id_of_the_user', - event='Ingest Failure', + event='ingest_failure', properties={ 'course_name': course_name, @@ -291,7 +295,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs): success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {err}") self.posthog.capture('distinct_id_of_the_user', - event='Ingest Failure', + event='ingest_failure', properties={ 'course_name': course_name, 's3_path': s3_paths, @@ -753,13 +757,12 @@ def _ingest_single_txt(self, s3_path: str, course_name: str, **kwargs) -> str: Returns: str: "Success" or an error message """ - print("In text ingest") + print("In text ingest, UTF-8") try: # NOTE: slightly different method for .txt files, no need for download. It's part of the 'body' response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path) - print("s3 Resonse:", response) text = response['Body'].read().decode('utf-8') - print("Text from s3:", text) + print("UTF-8 text to ingest (from s3):", text) text = [text] metadatas: List[Dict[str, Any]] = [{ diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index e8189e10..ce8235a2 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -1,27 +1,29 @@ import datetime +import json import os import time +import backoff import nomic import numpy as np import pandas as pd +import sentry_sdk import supabase from langchain.embeddings import OpenAIEmbeddings from nomic import AtlasProject, atlas -import sentry_sdk -import backoff -import json OPENAI_API_TYPE = "azure" - SUPABASE_CLIENT = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore + supabase_url=os.getenv('SUPABASE_URL'),  # type: ignore + supabase_key=os.getenv('SUPABASE_API_KEY'))  # type: ignore + +LOCK_EXCEPTIONS = [ 'Project is locked for state access! Please wait until the project is unlocked to access embeddings.', 'Project is locked for state access! Please wait until the project is unlocked to access data.', 'Project is currently indexing and cannot ingest new datums. Try again later.' ] -LOCK_EXCEPTIONS = ['Project is locked for state access! Please wait until the project is unlocked to access embeddings.', 'Project is locked for state access! Please wait until the project is unlocked to access data.', 'Project is currently indexing and cannot ingest new datums. 
Try again later.'] def giveup_hdlr(e): """ @@ -212,9 +214,9 @@ def log_convo_to_nomic(course_name: str, conversation) -> str: # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() raise Exception({"exception": str(e)}) - -def get_nomic_map(course_name: str, type: str): + +def get_nomic_map(course_name: str, type: str): """ Returns the variables necessary to construct an iframe of the Nomic map given a course name. We just need the ID and URL. @@ -402,6 +404,8 @@ def create_nomic_map(course_name: str, log_data: list): return "failed" + + ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## def create_document_map(course_name: str): @@ -426,15 +430,18 @@ def create_document_map(course_name: str): supabase_client = supabase.create_client( # type: ignore supabase_url=os.getenv('SUPABASE_URL'), # type: ignore supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - + try: # check if map exists response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() if response.data: return "Map already exists for this course." - + # fetch relevant document data from Supabase - response = supabase_client.table("documents").select("id", count="exact").eq("course_name", course_name).order('id', desc=False).execute() + response = supabase_client.table("documents").select("id", + count="exact").eq("course_name", + course_name).order('id', + desc=False).execute() if not response.count: return "No documents found for this course." @@ -442,27 +449,29 @@ def create_document_map(course_name: str): print("Total number of documents in Supabase: ", total_doc_count) # minimum 20 docs needed to create map - if total_doc_count > 19: - + + if total_doc_count > 19: first_id = response.data[0]['id'] combined_dfs = [] curr_total_doc_count = 0 doc_count = 0 first_batch = True - + # iteratively query in batches of 25 while curr_total_doc_count < total_doc_count: - response = supabase_client.table("documents").select("id, created_at, s3_path, url, readable_filename, contexts").eq("course_name", course_name).gte( - 'id', first_id).order('id', desc=False).limit(25).execute() + response = supabase_client.table("documents").select( + "id, created_at, s3_path, url, readable_filename, contexts").eq("course_name", course_name).gte( + 'id', first_id).order('id', desc=False).limit(25).execute() df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs + combined_dfs.append(df) # list of dfs curr_total_doc_count += len(response.data) doc_count += len(response.data) - if doc_count >= 1000: # upload to Nomic every 1000 docs - + + if doc_count >= 1000: # upload to Nomic every 1000 docs + # concat all dfs from the combined_dfs list final_df = pd.concat(combined_dfs, ignore_index=True) @@ -486,14 +495,16 @@ def create_document_map(course_name: str): project_name = NOMIC_MAP_NAME_PREFIX + course_name # add project lock logic here result = append_to_map(embeddings, metadata, project_name) - + + # reset variables combined_dfs = [] doc_count = 0 # set first_id for next iteration first_id = response.data[-1]['id'] + 1 - + + # upload last set of docs final_df = pd.concat(combined_dfs, ignore_index=True) embeddings, metadata = data_prep_for_doc_map(final_df) @@ -520,8 +531,9 @@ def create_document_map(course_name: str): except Exception as e: print(e) sentry_sdk.capture_exception(e) - return "failed" - + + return "failed" + def delete_from_document_map(course_name: str, ids: list): """ @@ -532,7 +544,7 
@@ def delete_from_document_map(course_name: str, ids: list): ids: list of str """ print("in delete_from_document_map()") - + try: # check if project exists response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() @@ -540,7 +552,7 @@ def delete_from_document_map(course_name: str, ids: list): project_id = response.data[0]['doc_map_id'] else: return "No document map found for this course" - + # fetch project from Nomic project = AtlasProject(project_id=project_id, add_datums_if_exists=True) @@ -554,6 +566,7 @@ def delete_from_document_map(course_name: str, ids: list): sentry_sdk.capture_exception(e) return "Error in deleting from document map: {e}" + def log_to_document_map(data: dict): """ This is a function which appends new documents to an existing document map. It's called @@ -562,7 +575,7 @@ def log_to_document_map(data: dict): data: dict - the response data from Supabase insertion """ print("in add_to_document_map()") - + try: # check if map exists course_name = data['course_name'] @@ -579,10 +592,9 @@ def log_to_document_map(data: dict): response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() project_id = response.data[0]['doc_map_id'] - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - #print("Inserted data: ", data) - + #print("Inserted data: ", data) + embeddings = [] metadata = [] context_count = 0 @@ -591,13 +603,13 @@ def log_to_document_map(data: dict): context_count += 1 embeddings.append(row['embedding']) metadata.append({ - "id": str(data['id']) + "_" + str(context_count), - "doc_ingested_at": data['created_at'], - "s3_path": data['s3_path'], - "url": data['url'], - "readable_filename": data['readable_filename'], - "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "text": row['text'] + "id": str(data['id']) + "_" + str(context_count), + "doc_ingested_at": data['created_at'], + "s3_path": data['s3_path'], + "url": data['url'], + "readable_filename": data['readable_filename'], + "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "text": row['text'] }) embeddings = np.array(embeddings) metadata = pd.DataFrame(metadata) @@ -606,13 +618,14 @@ def log_to_document_map(data: dict): # append to existing map project_name = "Document Map for " + course_name result = append_to_map(embeddings, metadata, project_name) + return result except Exception as e: print(e) sentry_sdk.capture_exception(e) return "Error in appending to map: {e}" - + def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): """ @@ -626,25 +639,21 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co colorable_fields: list of str """ nomic.login(os.getenv('NOMIC_API_KEY')) - try: - project = atlas.map_embeddings( - embeddings=embeddings, - data=metadata, - id_field="id", - build_topic_model=True, - name=map_name, - topic_label_field=topic_label_field, - colorable_fields=colorable_fields, - add_datums_if_exists=True - ) + project = atlas.map_embeddings(embeddings=embeddings, + data=metadata, + id_field="id", + build_topic_model=True, + name=map_name, + topic_label_field=topic_label_field, + colorable_fields=colorable_fields, + add_datums_if_exists=True) project.create_index(index_name, build_topic_model=True) return "success" except Exception as e: print(e) return "Error in creating map: {e}" - - + def append_to_map(embeddings, metadata, map_name): """ Generic function to append new 
data to an existing Nomic map. @@ -653,7 +662,8 @@ def append_to_map(embeddings, metadata, map_name): metadata: pd.DataFrame of Nomic upload metadata map_name: str """ - nomic.login(os.getenv('NOMIC_API_KEY')) + + nomic.login(os.getenv('NOMIC_API_KEY')) try: project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) with project.wait_for_project_lock(): @@ -662,7 +672,7 @@ def append_to_map(embeddings, metadata, map_name): except Exception as e: print(e) return "Error in appending to map: {e}" - + def data_prep_for_doc_map(df: pd.DataFrame): """ @@ -677,11 +687,11 @@ def data_prep_for_doc_map(df: pd.DataFrame): metadata = [] embeddings = [] - texts = [] + + texts = [] for index, row in df.iterrows(): - - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") if row['url'] == None: row['url'] = "" # iterate through all contexts and create separate entries for each @@ -690,17 +700,17 @@ def data_prep_for_doc_map(df: pd.DataFrame): context_count += 1 text_row = context['text'] embeddings_row = context['embedding'] - + meta_row = { - "id": str(row['id']) + "_" + str(context_count), - "doc_ingested_at": row['created_at'], - "s3_path": row['s3_path'], - "url": row['url'], - "readable_filename": row['readable_filename'], - "created_at": current_time, - "text": text_row + "id": str(row['id']) + "_" + str(context_count), + "doc_ingested_at": row['created_at'], + "s3_path": row['s3_path'], + "url": row['url'], + "readable_filename": row['readable_filename'], + "created_at": current_time, + "text": text_row } - + embeddings.append(embeddings_row) metadata.append(meta_row) texts.append(text_row) @@ -711,6 +721,7 @@ def data_prep_for_doc_map(df: pd.DataFrame): # check dimension if embeddings_np is (n, 1536) if len(embeddings_np.shape) < 2: print("Creating new embeddings...") + embeddings_model = OpenAIEmbeddings(openai_api_type="openai", openai_api_base="https://api.openai.com/v1/", openai_api_key=os.getenv('VLADS_OPENAI_KEY')) # type: ignore @@ -748,4 +759,5 @@ def rebuild_map(course_name:str, map_type:str): if __name__ == '__main__': - pass \ No newline at end of file + pass + diff --git a/ai_ta_backend/service/retrieval_service.py b/ai_ta_backend/service/retrieval_service.py index 3d73a82b..af425218 100644 --- a/ai_ta_backend/service/retrieval_service.py +++ b/ai_ta_backend/service/retrieval_service.py @@ -315,7 +315,7 @@ def format_for_json_mqr(self, found_docs) -> List[Dict]: def delete_from_nomic_and_supabase(self, course_name: str, identifier_key: str, identifier_value: str): try: - print(f"Deleting from Nomic and Supabase for {course_name} using {identifier_key}: {identifier_value}") + print(f"Nomic delete. 
Course: {course_name} using {identifier_key}: {identifier_value}") response = self.sqlDb.getMaterialsForCourseAndKeyAndValue(course_name, identifier_key, identifier_value) if not response.data: raise Exception(f"No materials found for {course_name} using {identifier_key}: {identifier_value}") @@ -323,18 +323,20 @@ def delete_from_nomic_and_supabase(self, course_name: str, identifier_key: str, nomic_ids_to_delete = [str(data['id']) + "_" + str(i) for i in range(1, len(data['contexts']) + 1)] # delete from Nomic - # check if project exists response = self.sqlDb.getProjectsMapForCourse(course_name) if not response.data: raise Exception(f"No document map found for this course: {course_name}") project_id = response.data[0]['doc_map_id'] self.nomicService.delete_from_document_map(project_id, nomic_ids_to_delete) + except Exception as e: + print(f"Nomic error while deleting. {identifier_key}: {identifier_value}", e) + self.sentry.capture_exception(e) - # delete from Supabase - print(f"Deleting from Supabase for {course_name} using {identifier_key}: {identifier_value}") + try: + print(f"Supabase delete. Course: {course_name} using {identifier_key}: {identifier_value}") response = self.sqlDb.deleteMaterialsForCourseAndKeyAndValue(course_name, identifier_key, identifier_value) except Exception as e: - print(f"Error in deleting file from Nomic or Supabase using {identifier_key}: {identifier_value}", e) + print(f"Supabase error while deleting. {identifier_key}: {identifier_value}", e) self.sentry.capture_exception(e) def vector_search(self, search_query, course_name):
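On the nomic_logging side, the import reshuffle pulls in backoff together with the consolidated LOCK_EXCEPTIONS list that the module's giveup_hdlr consults before abandoning a retry. A sketch of how that retry-while-locked pattern fits together; LOCK_EXCEPTIONS is taken verbatim from the module, while the decorator parameters, the handler body, and upload_with_retry (including the add_embeddings call) are illustrative assumptions:

import backoff

LOCK_EXCEPTIONS = [
    'Project is locked for state access! Please wait until the project is unlocked to access embeddings.',
    'Project is locked for state access! Please wait until the project is unlocked to access data.',
    'Project is currently indexing and cannot ingest new datums. Try again later.',
]


def giveup_hdlr(e: Exception) -> bool:
    # returning True tells backoff to stop retrying: only known lock/indexing errors are retried
    return str(e) not in LOCK_EXCEPTIONS


@backoff.on_exception(backoff.expo, Exception, max_tries=5, giveup=giveup_hdlr)
def upload_with_retry(project, embeddings, metadata):
    # hypothetical wrapper: Nomic raises while the project is locked or indexing,
    # and backoff waits with exponential delay until the lock clears or the tries run out
    with project.wait_for_project_lock():
        project.add_embeddings(embeddings=embeddings, data=metadata)

Centralizing the lock messages in one list lets every Nomic call site share the same giveup policy instead of string-matching inline.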
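create_document_map builds the map incrementally: it pages through the documents table 25 rows at a time (keyset pagination on id), accumulates the pages as DataFrames, and flushes to Nomic roughly every 1,000 documents before continuing. A condensed sketch of that batch-and-flush loop with the cursor handling simplified; data_prep_for_doc_map and append_to_map stand for the module's own helpers, the other names are illustrative:

import pandas as pd


def build_document_map_in_batches(supabase_client, course_name, total_doc_count, first_id,
                                  data_prep_for_doc_map, append_to_map, project_name):
    """Page through the documents table and push embeddings to Nomic in ~1000-doc chunks."""
    combined_dfs, doc_count, fetched = [], 0, 0
    while fetched < total_doc_count:
        response = supabase_client.table("documents").select(
            "id, created_at, s3_path, url, readable_filename, contexts").eq(
                "course_name", course_name).gte("id", first_id).order("id", desc=False).limit(25).execute()
        if not response.data:
            break  # defensive: stop if the table shrank since the count was taken
        combined_dfs.append(pd.DataFrame(response.data))  # keep pages until the next flush
        fetched += len(response.data)
        doc_count += len(response.data)
        first_id = response.data[-1]["id"] + 1  # keyset cursor: next page starts after the last row seen
        if doc_count >= 1000:
            final_df = pd.concat(combined_dfs, ignore_index=True)
            embeddings, metadata = data_prep_for_doc_map(final_df)
            append_to_map(embeddings, metadata, project_name)
            combined_dfs, doc_count = [], 0  # reset the accumulator after each flush
    if combined_dfs:
        # upload whatever is left over after the loop
        final_df = pd.concat(combined_dfs, ignore_index=True)
        embeddings, metadata = data_prep_for_doc_map(final_df)
        append_to_map(embeddings, metadata, project_name)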
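Finally, delete_from_nomic_and_supabase now wraps the Nomic and Supabase deletions in separate try/except blocks, so a missing document map or a Nomic outage no longer prevents the Supabase rows from being removed. The same flow as a standalone sketch, with the service objects passed in explicitly rather than read off self; the "_<n>" suffix mirrors the per-context ids that nomic_logging writes when it flattens a document's contexts:

def delete_document_everywhere(sql_db, nomic_service, sentry, course_name, identifier_key, identifier_value):
    """Best-effort delete from the Nomic document map, then always attempt the Supabase delete."""
    try:
        data = sql_db.getMaterialsForCourseAndKeyAndValue(course_name, identifier_key, identifier_value).data[0]
        # Nomic ids are "<document id>_<context number>", matching the upload-side id scheme
        nomic_ids = [f"{data['id']}_{i}" for i in range(1, len(data['contexts']) + 1)]
        project_id = sql_db.getProjectsMapForCourse(course_name).data[0]['doc_map_id']
        nomic_service.delete_from_document_map(project_id, nomic_ids)
    except Exception as e:
        print(f"Nomic error while deleting. {identifier_key}: {identifier_value}", e)
        sentry.capture_exception(e)  # report, but fall through to the Supabase delete

    try:
        print(f"Supabase delete. Course: {course_name} using {identifier_key}: {identifier_value}")
        sql_db.deleteMaterialsForCourseAndKeyAndValue(course_name, identifier_key, identifier_value)
    except Exception as e:
        print(f"Supabase error while deleting. {identifier_key}: {identifier_value}", e)
        sentry.capture_exception(e)

The substantive change is the split error handling: previously both deletes shared one try block, so any Nomic failure skipped the Supabase cleanup entirely.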