From 5c93fe58f2cd18975c3d9af8d790e94ca9a8f171 Mon Sep 17 00:00:00 2001
From: Asmita Dabholkar
Date: Mon, 11 Dec 2023 19:18:00 -0600
Subject: [PATCH] Guarantee unique s3 upload paths, support file updates (e.g.
 duplicate file guard for Cron jobs) (#99)

* added the add_users() for Canvas
* added canvas course ingest
* updated requirements
* added .md ingest and fixed .py ingest
* deleted test ipynb file
* added nomic viz
* added canvas file update function
* completed update function
* updated course export to include all contents
* modified to handle diff file structures of downloaded content
* modified canvas update
* modified ingest function
* modified update_files() for file replacement
* removed the extra os.remove()
* fix underscore to dash in for pip
* removed json import and added abort to canvas functions
* created separate PR for file update
* added file-update logic in ingest, WIP
* removed irrelevant text files
* modified pdf ingest function
* fixed PDF duplicate issue
* removed unwanted files
* updated nomic version in requirements.txt
* modified s3_paths
* testing unique filenames in aws upload
* added missing library to requirements.txt
* finished check_for_duplicates()
* fixed filename errors
* minor corrections
* added a uuid check in check_for_duplicates()
* regex depends on this being a dash
* regex depends on this being a dash
* Fix bug when no duplicate exists.
* cleaning up prints, testing looks good. ready to merge
* Further print and logging refinement
* Remove s3 based method for de-duplication, use Supabase only
* remove duplicate imports
* remove new requirement
* Final print cleanups
* remove pypdf import

---------

Co-authored-by: root
Co-authored-by: Kastan Day
---
 ai_ta_backend/aws.py             |   9 ++-
 ai_ta_backend/vector_database.py | 102 ++++++++++++++++++++++++++-----
 ai_ta_backend/web_scrape.py      |   3 +
 3 files changed, 96 insertions(+), 18 deletions(-)

diff --git a/ai_ta_backend/aws.py b/ai_ta_backend/aws.py
index 53e4c8c2..b14bb779 100644
--- a/ai_ta_backend/aws.py
+++ b/ai_ta_backend/aws.py
@@ -2,7 +2,7 @@
 from multiprocessing import Lock, cpu_count
 from multiprocessing.pool import ThreadPool
 from typing import List, Optional
-
+import uuid
 
 import boto3
 
@@ -38,7 +38,12 @@ def upload_data_files_to_s3(course_name: str, localdir: str) -> Optional[List[st
   s3_paths_lock = Lock()
 
   def upload(myfile):
-    s3_file = f"courses/{course_name}/{os.path.basename(myfile)}"
+    # get the last part of the path and append unique ID before it
+    directory, old_filename = os.path.split(myfile)
+    new_filename = str(uuid.uuid4()) + '-' + old_filename
+    new_filepath = os.path.join(directory, new_filename)
+
+    s3_file = f"courses/{course_name}/{os.path.basename(new_filepath)}"
     s3.upload_file(myfile, os.getenv('S3_BUCKET_NAME'), s3_file)
     with s3_paths_lock:
       s3_paths.append(s3_file)
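
A minimal standalone sketch (not part of the patch) of the naming scheme the hunk above introduces: str(uuid.uuid4()) is 36 characters and the joining dash makes 37, which is why later hunks strip the prefix with [37:] and why the commit history notes that the "regex depends on this being a dash". The helper names below are illustrative only; the regex is the same v4-only pattern used in check_for_duplicates() further down.

    import re
    import uuid

    # v4-only uuid pattern, as in check_for_duplicates()
    UUID_V4_RE = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', re.I)

    def add_uuid_prefix(filename: str) -> str:
      """Prepend 'uuid4-' to a filename, as upload() and ingest_file() now do."""
      return str(uuid.uuid4()) + '-' + filename

    def strip_uuid_prefix(filename: str) -> str:
      """Drop the 37-char prefix only when a v4 uuid is actually present."""
      if UUID_V4_RE.search(filename):
        return filename[37:]
      return filename

    prefixed = add_uuid_prefix('lecture01.pdf')
    assert len(prefixed) == 37 + len('lecture01.pdf')
    assert strip_uuid_prefix(prefixed) == 'lecture01.pdf'
    assert strip_uuid_prefix('lecture01.pdf') == 'lecture01.pdf'  # no prefix, unchanged
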
diff --git a/ai_ta_backend/vector_database.py b/ai_ta_backend/vector_database.py
index d94e06a2..6ed65da9 100644
--- a/ai_ta_backend/vector_database.py
+++ b/ai_ta_backend/vector_database.py
@@ -8,6 +8,7 @@
 import time
 import traceback
 import uuid
+import re
 from importlib import metadata
 from pathlib import Path
 from tempfile import NamedTemporaryFile
@@ -167,7 +168,7 @@ def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs):
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -177,6 +178,7 @@ def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs):
       os.remove(file_path)
 
       success_or_failure = self.split_and_upload(texts=texts, metadatas=metadatas)
+      print("Python ingest: ", success_or_failure)
       return success_or_failure
     except Exception as e:
@@ -199,7 +201,7 @@ def _ingest_single_vtt(self, s3_path: str, course_name: str, **kwargs):
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -225,6 +227,7 @@ def _ingest_html(self, s3_path: str, course_name: str, **kwargs) -> str:
       title = title.replace("_", " ")
       title = title.replace("/", " ")
       title = title.strip()
+      title = title[37:] # removing the uuid prefix
 
       text = [soup.get_text()]
       metadata: List[Dict[str, Any]] = [{
@@ -306,7 +309,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': text.index(txt),
           'url': '',
@@ -332,7 +335,7 @@ def _ingest_single_docx(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -359,7 +362,7 @@ def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -387,7 +390,7 @@ def _ingest_single_excel(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -422,7 +425,7 @@ def _ingest_single_image(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -449,7 +452,7 @@ def _ingest_single_csv(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -500,7 +503,7 @@ def _ingest_single_pdf(self, s3_path: str, course_name: str, **kwargs):
           # Extract text
           text = page.get_text().encode("utf8").decode("utf8", errors='ignore')  # get plain text (is in UTF-8)
-          pdf_pages_OCRed.append(dict(text=text, page_number=i, readable_filename=Path(s3_path).name))
+          pdf_pages_OCRed.append(dict(text=text, page_number=i, readable_filename=Path(s3_path).name[37:]))
 
       metadatas: List[Dict[str, Any]] = [
           {
@@ -515,10 +518,10 @@ def _ingest_single_pdf(self, s3_path: str, course_name: str, **kwargs):
       ]
       pdf_texts = [page['text'] for page in pdf_pages_OCRed]
 
-      self.split_and_upload(texts=pdf_texts, metadatas=metadatas)
-      print("Success pdf ingest")
+      success_or_failure = self.split_and_upload(texts=pdf_texts, metadatas=metadatas)
+      return success_or_failure
     except Exception as e:
-      err = f"❌❌ Error in (PDF ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc()
+      err = f"❌❌ Error in (PDF ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc()  # type: ignore
       print(err)
       return err
     return "Success"
@@ -543,7 +546,7 @@ def _ingest_single_txt(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
@@ -575,7 +578,7 @@ def _ingest_single_ppt(self, s3_path: str, course_name: str, **kwargs) -> str:
       metadatas: List[Dict[str, Any]] = [{
           'course_name': course_name,
           's3_path': s3_path,
-          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
+          'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
           'pagenumber': '',
           'timestamp': '',
           'url': '',
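
Each of the metadatas hunks above swaps in the same fallback expression. A standalone sketch (not from the repository) of what it evaluates to, using a hypothetical S3 key; an explicit readable_filename kwarg still takes precedence over the sliced basename.

    from pathlib import Path

    # hypothetical S3 key: 36-char uuid4 + '-' + original filename
    s3_path = 'courses/ece120/d4c4e5a2-6b3f-4f44-9a55-0123456789ab-notes_week1.pdf'

    kwargs = {}
    print(kwargs.get('readable_filename', Path(s3_path).name[37:]))  # -> notes_week1.pdf

    kwargs = {'readable_filename': 'Week 1 Notes.pdf'}
    print(kwargs.get('readable_filename', Path(s3_path).name[37:]))  # -> Week 1 Notes.pdf
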
@@ -722,6 +725,11 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
       contexts: List[Document] = text_splitter.create_documents(texts=texts, metadatas=metadatas)
       input_texts = [{'input': context.page_content, 'model': 'text-embedding-ada-002'} for context in contexts]
 
+      # check for duplicates
+      is_duplicate = self.check_for_duplicates(input_texts, metadatas)
+      if is_duplicate:
+        return "Success"
+
       # adding chunk index to metadata for parent doc retrieval
       for i, context in enumerate(contexts):
         context.metadata['chunk_index'] = i
@@ -1087,7 +1095,7 @@ def get_context_stuffed_prompt(self, user_question: str, course_name: str, top_n
         summary = f"\nSummary: {text}"
         all_texts += doc + summary + '\n' + separator + '\n'
 
-    stuffed_prompt = f"""Please answer the following question.
+    stuffed_prompt = """Please answer the following question.
 Use the context below, called 'your documents', only if it's helpful and don't use parts that are very irrelevant.
 It's good to quote 'your documents' directly using informal citations, like "in document X it says Y". Try to avoid giving false or misleading information. Feel free to say you don't know.
 Try to be helpful, polite, honest, sophisticated, emotionally aware, and humble-but-knowledgeable.
@@ -1201,6 +1209,68 @@ def format_for_json(self, found_docs: List[Document]) -> List[Dict]:
 
     return contexts
 
-
+  def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any]]) -> bool:
+    """
+    For given metadata, fetch docs from Supabase based on S3 path or URL.
+    If docs exists, concatenate the texts and compare with current texts, if same, return True.
+    """
+    doc_table = os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE', '')
+    course_name = metadatas[0]['course_name']
+    incoming_s3_path = metadatas[0]['s3_path']
+    url = metadatas[0]['url']
+    original_filename = incoming_s3_path.split('/')[-1][37:] # remove the 37-char uuid prefix
+
+    # check if uuid exists in s3_path -- not all s3_paths have uuids!
+    incoming_filename = incoming_s3_path.split('/')[-1]
+    pattern = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', re.I) # uuid V4 pattern, and v4 only.
+    if bool(pattern.search(incoming_filename)):
+      # uuid pattern exists -- remove the uuid and proceed with duplicate checking
+      original_filename = incoming_filename[37:]
+    else:
+      # do not remove anything and proceed with duplicate checking
+      original_filename = incoming_filename
+
+    if incoming_s3_path:
+      filename = incoming_s3_path
+      supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).like('s3_path', '%' + original_filename + '%').order('id', desc=True).execute()
+      supabase_contents = supabase_contents.data
+    elif url:
+      filename = url
+      supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).eq('url', url).order('id', desc=True).execute()
+      supabase_contents = supabase_contents.data
+    else:
+      filename = None
+      supabase_contents = []
+
+    supabase_whole_text = ""
+    if len(supabase_contents) > 0: # if a doc with same filename exists in Supabase
+      # concatenate texts
+      supabase_contexts = supabase_contents[0]
+      for text in supabase_contexts['contexts']:
+        supabase_whole_text += text['text']
+
+      current_whole_text = ""
+      for text in texts:
+        current_whole_text += text['input']
+
+      if supabase_whole_text == current_whole_text: # matches the previous file
+        print(f"Duplicate ingested! 📄 s3_path: {filename}.")
+        return True
+
+      else: # the file is updated
+        print(f"Updated file detected! Same filename, new contents. 📄 s3_path: {filename}")
+
+        # call the delete function on older docs
+        for content in supabase_contents:
+          print("older s3_path to be deleted: ", content['s3_path'])
+          delete_status = self.delete_data(course_name, content['s3_path'], '')
+          print("delete_status: ", delete_status)
+        return False
+
+    else: # filename does not already exist in Supabase, so its a brand new file
+      print(f"NOT a duplicate! 📄s3_path: {filename}")
+      return False
+
+
 if __name__ == '__main__':
   pass
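
A standalone sketch (not from the repository) of the decision logic check_for_duplicates() implements once the matching Supabase row has been fetched: identical concatenated text means skip the re-ingest, different text means delete the older rows and ingest the new version, and no matching row means a brand-new file. The function name and data shapes below are illustrative; 'stored_contexts' stands in for the row's 'contexts' column and 'incoming_texts' for the [{'input': ...}] payload built in split_and_upload().

    from typing import Any, Dict, List, Optional

    def classify_ingest(incoming_texts: List[Dict[str, Any]],
                        stored_contexts: Optional[List[Dict[str, Any]]]) -> str:
      """Return 'new', 'duplicate', or 'update' for an incoming document."""
      if not stored_contexts:
        return 'new'                      # no row with this filename/url yet
      stored_whole = ''.join(chunk['text'] for chunk in stored_contexts)
      incoming_whole = ''.join(chunk['input'] for chunk in incoming_texts)
      if stored_whole == incoming_whole:
        return 'duplicate'                # skip re-ingest
      return 'update'                     # delete older rows, then ingest

    old = [{'text': 'hello '}, {'text': 'world'}]
    same = [{'input': 'hello '}, {'input': 'world'}]
    changed = [{'input': 'hello '}, {'input': 'there'}]
    assert classify_ingest(same, old) == 'duplicate'
    assert classify_ingest(changed, old) == 'update'
    assert classify_ingest(same, None) == 'new'
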
+ """ + doc_table = os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE', '') + course_name = metadatas[0]['course_name'] + incoming_s3_path = metadatas[0]['s3_path'] + url = metadatas[0]['url'] + original_filename = incoming_s3_path.split('/')[-1][37:] # remove the 37-char uuid prefix + + # check if uuid exists in s3_path -- not all s3_paths have uuids! + incoming_filename = incoming_s3_path.split('/')[-1] + pattern = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', re.I) # uuid V4 pattern, and v4 only. + if bool(pattern.search(incoming_filename)): + # uuid pattern exists -- remove the uuid and proceed with duplicate checking + original_filename = incoming_filename[37:] + else: + # do not remove anything and proceed with duplicate checking + original_filename = incoming_filename + + if incoming_s3_path: + filename = incoming_s3_path + supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).like('s3_path', '%' + original_filename + '%').order('id', desc=True).execute() + supabase_contents = supabase_contents.data + elif url: + filename = url + supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).eq('url', url).order('id', desc=True).execute() + supabase_contents = supabase_contents.data + else: + filename = None + supabase_contents = [] + + supabase_whole_text = "" + if len(supabase_contents) > 0: # if a doc with same filename exists in Supabase + # concatenate texts + supabase_contexts = supabase_contents[0] + for text in supabase_contexts['contexts']: + supabase_whole_text += text['text'] + + current_whole_text = "" + for text in texts: + current_whole_text += text['input'] + + if supabase_whole_text == current_whole_text: # matches the previous file + print(f"Duplicate ingested! 📄 s3_path: {filename}.") + return True + + else: # the file is updated + print(f"Updated file detected! Same filename, new contents. 📄 s3_path: {filename}") + + # call the delete function on older docs + for content in supabase_contents: + print("older s3_path to be deleted: ", content['s3_path']) + delete_status = self.delete_data(course_name, content['s3_path'], '') + print("delete_status: ", delete_status) + return False + + else: # filename does not already exist in Supabase, so its a brand new file + print(f"NOT a duplicate! 📄s3_path: {filename}") + return False + + if __name__ == '__main__': pass diff --git a/ai_ta_backend/web_scrape.py b/ai_ta_backend/web_scrape.py index 48707a6b..0c6a07d8 100644 --- a/ai_ta_backend/web_scrape.py +++ b/ai_ta_backend/web_scrape.py @@ -3,6 +3,7 @@ import re import shutil import time +import uuid from collections import Counter from tempfile import NamedTemporaryFile from zipfile import ZipFile @@ -199,6 +200,8 @@ def ingest_file(self, key, course_name, path_name, base_url): print("Writing", key[2] ,"to temp file") temp_file.write(key[1]) temp_file.seek(0) + path_name = str(uuid.uuid4()) + '-' + path_name + print("path name in webscrape: ", path_name) s3_upload_path = "courses/"+ course_name + "/" + path_name + key[2] with open(temp_file.name, 'rb') as f: print("Uploading", key[2] ,"to S3")