
Commit

Guarantee unique s3 upload paths, support file updates (e.g. duplicate file guard for Cron jobs) (#99)

* added the add_users() for Canvas

* added canvas course ingest

* updated requirements

* added .md ingest and fixed .py ingest

* deleted test ipynb file

* added nomic viz

* added canvas file update function

* completed update function

* updated course export to include all contents

* modified to handle diff file structures of downloaded content

* modified canvas update

* modified ingest function

* modified update_files() for file replacement

* removed the extra os.remove()

* fix underscore to dash for pip

* removed json import and added abort to canvas functions

* created separate PR for file update

* added file-update logic in ingest, WIP

* removed irrelevant text files

* modified pdf ingest function

* fixed PDF duplicate issue

* removed unwanted files

* updated nomic version in requirements.txt

* modified s3_paths

* testing unique filenames in aws upload

* added missing library to requirements.txt

* finished check_for_duplicates()

* fixed filename errors

* minor corrections

* added a uuid check in check_for_duplicates()

* regex depends on this being a dash

* regex depends on this being a dash

* Fix bug when no duplicate exists.

* cleaning up prints, testing looks good. ready to merge

* Further print and logging refinement

* Remove S3-based method for de-duplication, use Supabase only

* remove duplicate imports

* remove new requirement

* Final print cleanups

* remove pypdf import

---------

Co-authored-by: root <root@ASMITA>
Co-authored-by: Kastan Day <[email protected]>
3 people authored Dec 12, 2023
1 parent cfca31c commit 5c93fe5
Showing 3 changed files with 96 additions and 18 deletions.
9 changes: 7 additions & 2 deletions ai_ta_backend/aws.py
@@ -2,7 +2,7 @@
from multiprocessing import Lock, cpu_count
from multiprocessing.pool import ThreadPool
from typing import List, Optional

import uuid
import boto3


@@ -38,7 +38,12 @@ def upload_data_files_to_s3(course_name: str, localdir: str) -> Optional[List[str]]:
s3_paths_lock = Lock()

def upload(myfile):
s3_file = f"courses/{course_name}/{os.path.basename(myfile)}"
# get the last part of the path and append unique ID before it
directory, old_filename = os.path.split(myfile)
new_filename = str(uuid.uuid4()) + '-' + old_filename
new_filepath = os.path.join(directory, new_filename)

s3_file = f"courses/{course_name}/{os.path.basename(new_filepath)}"
s3.upload_file(myfile, os.getenv('S3_BUCKET_NAME'), s3_file)
with s3_paths_lock:
s3_paths.append(s3_file)
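Note on the naming scheme above: str(uuid.uuid4()) is always 36 characters, so the uuid-plus-dash prefix adds exactly 37 characters to every filename, which is why the ingest code below recovers the readable name with a [37:] slice. A minimal sketch of the idea, using a hypothetical filename not taken from the repository:

import os
import uuid

def make_unique_s3_key(course_name: str, local_path: str) -> str:
    # Prefix a uuid4 so repeated uploads of the same filename never collide in S3.
    original = os.path.basename(local_path)        # e.g. "syllabus.pdf"
    unique_name = f"{uuid.uuid4()}-{original}"
    return f"courses/{course_name}/{unique_name}"

key = make_unique_s3_key("example-course", "/tmp/syllabus.pdf")
basename = key.split('/')[-1]
assert len(str(uuid.uuid4())) == 36     # uuid4 strings have a fixed length
assert basename[37:] == "syllabus.pdf"  # 36-char uuid + '-' = 37 chars stripped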
102 changes: 86 additions & 16 deletions ai_ta_backend/vector_database.py
@@ -8,6 +8,7 @@
import time
import traceback
import uuid
import re
from importlib import metadata
from pathlib import Path
from tempfile import NamedTemporaryFile
@@ -167,7 +168,7 @@ def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs):
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -177,6 +178,7 @@ def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs):
os.remove(file_path)

success_or_failure = self.split_and_upload(texts=texts, metadatas=metadatas)
print("Python ingest: ", success_or_failure)
return success_or_failure

except Exception as e:
@@ -199,7 +201,7 @@ def _ingest_single_vtt(self, s3_path: str, course_name: str, **kwargs):
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -225,6 +227,7 @@ def _ingest_html(self, s3_path: str, course_name: str, **kwargs) -> str:
title = title.replace("_", " ")
title = title.replace("/", " ")
title = title.strip()
title = title[37:] # removing the uuid prefix
text = [soup.get_text()]

metadata: List[Dict[str, Any]] = [{
@@ -306,7 +309,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': text.index(txt),
'url': '',
@@ -332,7 +335,7 @@ def _ingest_single_docx(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -359,7 +362,7 @@ def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -387,7 +390,7 @@ def _ingest_single_excel(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -422,7 +425,7 @@ def _ingest_single_image(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -449,7 +452,7 @@ def _ingest_single_csv(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -500,7 +503,7 @@ def _ingest_single_pdf(self, s3_path: str, course_name: str, **kwargs):

# Extract text
text = page.get_text().encode("utf8").decode("utf8", errors='ignore') # get plain text (is in UTF-8)
pdf_pages_OCRed.append(dict(text=text, page_number=i, readable_filename=Path(s3_path).name))
pdf_pages_OCRed.append(dict(text=text, page_number=i, readable_filename=Path(s3_path).name[37:]))

metadatas: List[Dict[str, Any]] = [
{
@@ -515,10 +518,10 @@ def _ingest_single_pdf(self, s3_path: str, course_name: str, **kwargs):
]
pdf_texts = [page['text'] for page in pdf_pages_OCRed]

self.split_and_upload(texts=pdf_texts, metadatas=metadatas)
print("Success pdf ingest")
success_or_failure = self.split_and_upload(texts=pdf_texts, metadatas=metadatas)
return success_or_failure
except Exception as e:
err = f"❌❌ Error in (PDF ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc()
err = f"❌❌ Error in (PDF ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc() # type: ignore
print(err)
return err
return "Success"
@@ -543,7 +546,7 @@ def _ingest_single_txt(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -575,7 +578,7 @@ def _ingest_single_ppt(self, s3_path: str, course_name: str, **kwargs) -> str:
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name),
'readable_filename': kwargs.get('readable_filename', Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
@@ -722,6 +725,11 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
contexts: List[Document] = text_splitter.create_documents(texts=texts, metadatas=metadatas)
input_texts = [{'input': context.page_content, 'model': 'text-embedding-ada-002'} for context in contexts]

# check for duplicates
is_duplicate = self.check_for_duplicates(input_texts, metadatas)
if is_duplicate:
return "Success"

# adding chunk index to metadata for parent doc retrieval
for i, context in enumerate(contexts):
context.metadata['chunk_index'] = i
@@ -1087,7 +1095,7 @@ def get_context_stuffed_prompt(self, user_question: str, course_name: str, top_n
summary = f"\nSummary: {text}"
all_texts += doc + summary + '\n' + separator + '\n'

stuffed_prompt = f"""Please answer the following question.
stuffed_prompt = """Please answer the following question.
Use the context below, called 'your documents', only if it's helpful and don't use parts that are very irrelevant.
It's good to quote 'your documents' directly using informal citations, like "in document X it says Y". Try to avoid giving false or misleading information. Feel free to say you don't know.
Try to be helpful, polite, honest, sophisticated, emotionally aware, and humble-but-knowledgeable.
@@ -1201,6 +1209,68 @@ def format_for_json(self, found_docs: List[Document]) -> List[Dict]:

return contexts


def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any]]) -> bool:
"""
For given metadata, fetch docs from Supabase based on S3 path or URL.
If matching docs exist, concatenate their texts and compare with the current texts; if they match, return True.
"""
doc_table = os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE', '')
course_name = metadatas[0]['course_name']
incoming_s3_path = metadatas[0]['s3_path']
url = metadatas[0]['url']
original_filename = incoming_s3_path.split('/')[-1][37:] # remove the 37-char uuid prefix

# check if uuid exists in s3_path -- not all s3_paths have uuids!
incoming_filename = incoming_s3_path.split('/')[-1]
pattern = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', re.I) # uuid V4 pattern, and v4 only.
if bool(pattern.search(incoming_filename)):
# uuid pattern exists -- remove the uuid and proceed with duplicate checking
original_filename = incoming_filename[37:]
else:
# do not remove anything and proceed with duplicate checking
original_filename = incoming_filename

if incoming_s3_path:
filename = incoming_s3_path
supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).like('s3_path', '%' + original_filename + '%').order('id', desc=True).execute()
supabase_contents = supabase_contents.data
elif url:
filename = url
supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq('course_name', course_name).eq('url', url).order('id', desc=True).execute()
supabase_contents = supabase_contents.data
else:
filename = None
supabase_contents = []

supabase_whole_text = ""
if len(supabase_contents) > 0: # if a doc with same filename exists in Supabase
# concatenate texts
supabase_contexts = supabase_contents[0]
for text in supabase_contexts['contexts']:
supabase_whole_text += text['text']

current_whole_text = ""
for text in texts:
current_whole_text += text['input']

if supabase_whole_text == current_whole_text: # matches the previous file
print(f"Duplicate ingested! 📄 s3_path: {filename}.")
return True

else: # the file is updated
print(f"Updated file detected! Same filename, new contents. 📄 s3_path: {filename}")

# call the delete function on older docs
for content in supabase_contents:
print("older s3_path to be deleted: ", content['s3_path'])
delete_status = self.delete_data(course_name, content['s3_path'], '')
print("delete_status: ", delete_status)
return False

else: # filename does not already exist in Supabase, so its a brand new file
print(f"NOT a duplicate! 📄s3_path: {filename}")
return False


if __name__ == '__main__':
pass
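On the new check_for_duplicates() above: it normalises the incoming filename (stripping the uuid prefix only when a uuid4 pattern is actually present), fetches prior rows for the same course by filename or URL from Supabase, and compares the concatenated stored contexts against the concatenated incoming chunks; an exact match is skipped as a duplicate, while a mismatch deletes the older rows so the updated file is re-ingested. A minimal sketch of the filename-normalisation step, reusing the regex from the diff and assuming the uuid prefix sits at the start of the name (example filenames are hypothetical):

import re
import uuid

# uuid4-only pattern, as used in check_for_duplicates()
UUID_V4 = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', re.I)

def original_filename(incoming_filename: str) -> str:
    # Not every s3_path carries a uuid prefix, so only strip when the pattern is present.
    if UUID_V4.search(incoming_filename):
        return incoming_filename[37:]  # 36-char uuid4 + '-' prefix, assumed to be leading
    return incoming_filename

assert original_filename(f"{uuid.uuid4()}-syllabus.pdf") == "syllabus.pdf"
assert original_filename("syllabus.pdf") == "syllabus.pdf"

The content comparison itself is plain string equality between the concatenated 'text' fields stored in Supabase and the concatenated 'input' fields of the new chunks, so any change in the extracted text marks the file as updated rather than duplicated.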
3 changes: 3 additions & 0 deletions ai_ta_backend/web_scrape.py
@@ -3,6 +3,7 @@
import re
import shutil
import time
import uuid
from collections import Counter
from tempfile import NamedTemporaryFile
from zipfile import ZipFile
@@ -199,6 +200,8 @@ def ingest_file(self, key, course_name, path_name, base_url):
print("Writing", key[2] ,"to temp file")
temp_file.write(key[1])
temp_file.seek(0)
path_name = str(uuid.uuid4()) + '-' + path_name
print("path name in webscrape: ", path_name)
s3_upload_path = "courses/"+ course_name + "/" + path_name + key[2]
with open(temp_file.name, 'rb') as f:
print("Uploading", key[2] ,"to S3")
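The web-scrape path gets the same treatment: path_name is prefixed with a uuid4 before the S3 key is assembled, so repeated crawls of the same page cannot overwrite one another. Roughly, with hypothetical values (key[2] holds the file extension in the surrounding code):

import uuid

course_name = "example-course"  # hypothetical
path_name = "lecture-notes"     # hypothetical, derived from the scraped page
extension = ".html"             # stands in for key[2]

path_name = str(uuid.uuid4()) + '-' + path_name
s3_upload_path = "courses/" + course_name + "/" + path_name + extension
# e.g. courses/example-course/1b9d6bcd-bbfd-4b2d-9b5d-ab8dfbbd4bed-lecture-notes.html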
