diff --git a/ai_ta_backend/aws.py b/ai_ta_backend/aws.py index 53e4c8c2..b14bb779 100644 --- a/ai_ta_backend/aws.py +++ b/ai_ta_backend/aws.py @@ -2,7 +2,7 @@ from multiprocessing import Lock, cpu_count from multiprocessing.pool import ThreadPool from typing import List, Optional - +import uuid import boto3 @@ -38,7 +38,12 @@ def upload_data_files_to_s3(course_name: str, localdir: str) -> Optional[List[st s3_paths_lock = Lock() def upload(myfile): - s3_file = f"courses/{course_name}/{os.path.basename(myfile)}" + # get the last part of the path and append unique ID before it + directory, old_filename = os.path.split(myfile) + new_filename = str(uuid.uuid4()) + '-' + old_filename + new_filepath = os.path.join(directory, new_filename) + + s3_file = f"courses/{course_name}/{os.path.basename(new_filepath)}" s3.upload_file(myfile, os.getenv('S3_BUCKET_NAME'), s3_file) with s3_paths_lock: s3_paths.append(s3_file) diff --git a/ai_ta_backend/vector_database.py b/ai_ta_backend/vector_database.py index d94e06a2..6ed65da9 100644 --- a/ai_ta_backend/vector_database.py +++ b/ai_ta_backend/vector_database.py @@ -8,6 +8,7 @@ import time import traceback import uuid +import re from importlib import metadata from pathlib import Path from tempfile import NamedTemporaryFile @@ -167,7 +168,7 @@ def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs): metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -177,6 +178,7 @@ def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs): os.remove(file_path) success_or_failure = self.split_and_upload(texts=texts, metadatas=metadatas) + print("Python ingest: ", success_or_failure) return success_or_failure except Exception as e: @@ -199,7 +201,7 @@ def _ingest_single_vtt(self, s3_path: str, course_name: str, **kwargs): metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -225,6 +227,7 @@ def _ingest_html(self, s3_path: str, course_name: str, **kwargs) -> str: title = title.replace("_", " ") title = title.replace("/", " ") title = title.strip() + title = title[37:] # removing the uuid prefix text = [soup.get_text()] metadata: List[Dict[str, Any]] = [{ @@ -306,7 +309,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': text.index(txt), 'url': '', @@ -332,7 +335,7 @@ def _ingest_single_docx(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -359,7 +362,7 @@ def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -387,7 +390,7 @@ def _ingest_single_excel(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -422,7 +425,7 @@ def _ingest_single_image(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -449,7 +452,7 @@ def _ingest_single_csv(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -500,7 +503,7 @@ def _ingest_single_pdf(self, s3_path: str, course_name: str, **kwargs): # Extract text text = page.get_text().encode("utf8").decode("utf8", errors='ignore') # get plain text (is in UTF-8) - pdf_pages_OCRed.append(dict(text=text, page_number=i, readable_filename=Path(s3_path).name)) + pdf_pages_OCRed.append(dict(text=text, page_number=i, readable_filename=Path(s3_path).name[37:])) metadatas: List[Dict[str, Any]] = [ { @@ -515,10 +518,10 @@ def _ingest_single_pdf(self, s3_path: str, course_name: str, **kwargs): ] pdf_texts = [page['text'] for page in pdf_pages_OCRed] - self.split_and_upload(texts=pdf_texts, metadatas=metadatas) - print("Success pdf ingest") + success_or_failure = self.split_and_upload(texts=pdf_texts, metadatas=metadatas) + return success_or_failure except Exception as e: - err = f"❌❌ Error in (PDF ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc() + err = f"❌❌ Error in (PDF ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc() # type: ignore print(err) return err return "Success" @@ -543,7 +546,7 @@ def _ingest_single_txt(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -575,7 +578,7 @@ def _ingest_single_ppt(self, s3_path: str, course_name: str, **kwargs) -> str: metadatas: List[Dict[str, Any]] = [{ 'course_name': course_name, 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name), + 'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]), 'pagenumber': '', 'timestamp': '', 'url': '', @@ -722,6 +725,11 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]): contexts: List[Document] = text_splitter.create_documents(texts=texts, metadatas=metadatas) input_texts = [{'input': context.page_content, 'model': 'text-embedding-ada-002'} for context in contexts] + # check for duplicates + is_duplicate = self.check_for_duplicates(input_texts, metadatas) + if is_duplicate: + return "Success" + # adding chunk index to metadata for parent doc retrieval for i, context in enumerate(contexts): context.metadata['chunk_index'] = i @@ -1087,7 +1095,7 @@ def get_context_stuffed_prompt(self, user_question: str, course_name: str, top_n summary = f"\nSummary: {text}" all_texts += doc + summary + '\n' + separator + '\n' - stuffed_prompt = f"""Please answer the following question. + stuffed_prompt = """Please answer the following question. Use the context below, called 'your documents', only if it's helpful and don't use parts that are very irrelevant. It's good to quote 'your documents' directly using informal citations, like "in document X it says Y". Try to avoid giving false or misleading information. Feel free to say you don't know. Try to be helpful, polite, honest, sophisticated, emotionally aware, and humble-but-knowledgeable. @@ -1201,6 +1209,68 @@ def format_for_json(self, found_docs: List[Document]) -> List[Dict]: return contexts - + def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any]]) -> bool: + """ + For given metadata, fetch docs from Supabase based on S3 path or URL. + If docs exists, concatenate the texts and compare with current texts, if same, return True. + """ + doc_table = os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE', '') + course_name = metadatas[0]['course_name'] + incoming_s3_path = metadatas[0]['s3_path'] + url = metadatas[0]['url'] + original_filename = incoming_s3_path.split('/')[-1][37:] # remove the 37-char uuid prefix + + # check if uuid exists in s3_path -- not all s3_paths have uuids! + incoming_filename = incoming_s3_path.split('/')[-1] + pattern = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', re.I) # uuid V4 pattern, and v4 only. + if bool(pattern.search(incoming_filename)): + # uuid pattern exists -- remove the uuid and proceed with duplicate checking + original_filename = incoming_filename[37:] + else: + # do not remove anything and proceed with duplicate checking + original_filename = incoming_filename + + if incoming_s3_path: + filename = incoming_s3_path + supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).like('s3_path', '%' + original_filename + '%').order('id', desc=True).execute() + supabase_contents = supabase_contents.data + elif url: + filename = url + supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).eq('url', url).order('id', desc=True).execute() + supabase_contents = supabase_contents.data + else: + filename = None + supabase_contents = [] + + supabase_whole_text = "" + if len(supabase_contents) > 0: # if a doc with same filename exists in Supabase + # concatenate texts + supabase_contexts = supabase_contents[0] + for text in supabase_contexts['contexts']: + supabase_whole_text += text['text'] + + current_whole_text = "" + for text in texts: + current_whole_text += text['input'] + + if supabase_whole_text == current_whole_text: # matches the previous file + print(f"Duplicate ingested! 📄 s3_path: {filename}.") + return True + + else: # the file is updated + print(f"Updated file detected! Same filename, new contents. 📄 s3_path: {filename}") + + # call the delete function on older docs + for content in supabase_contents: + print("older s3_path to be deleted: ", content['s3_path']) + delete_status = self.delete_data(course_name, content['s3_path'], '') + print("delete_status: ", delete_status) + return False + + else: # filename does not already exist in Supabase, so its a brand new file + print(f"NOT a duplicate! 📄s3_path: {filename}") + return False + + if __name__ == '__main__': pass diff --git a/ai_ta_backend/web_scrape.py b/ai_ta_backend/web_scrape.py index 34d57453..e02361f8 100644 --- a/ai_ta_backend/web_scrape.py +++ b/ai_ta_backend/web_scrape.py @@ -3,6 +3,7 @@ import re import shutil import time +import uuid from collections import Counter from tempfile import NamedTemporaryFile from zipfile import ZipFile @@ -199,6 +200,8 @@ def ingest_file(self, key, course_name, path_name, base_url): print("Writing", key[2] ,"to temp file") temp_file.write(key[1]) temp_file.seek(0) + path_name = str(uuid.uuid4()) + '-' + path_name + print("path name in webscrape: ", path_name) s3_upload_path = "courses/"+ course_name + "/" + path_name + key[2] with open(temp_file.name, 'rb') as f: print("Uploading", key[2] ,"to S3")