Merge branch 'main' into pub-api
star-nox committed Mar 20, 2024
2 parents 15b5822 + a450b12 commit 166b164
Showing 8 changed files with 285 additions and 192 deletions.
7 changes: 7 additions & 0 deletions .beamignore
@@ -0,0 +1,7 @@
.venv
venv
.idea
.vscode
.git
*.pyc
__pycache__
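
These patterns keep local virtualenvs, editor settings, git metadata, and Python bytecode out of the bundle Beam uploads at deploy time. They are presumably matched like .gitignore entries; a rough sketch of that style of matching in Python (illustrative only, not Beam's actual matcher):

import fnmatch
from pathlib import Path

IGNORE_PATTERNS = [".venv", "venv", ".idea", ".vscode", ".git", "*.pyc", "__pycache__"]

def is_ignored(path: str) -> bool:
  # Skip a path if any of its components matches an ignore pattern.
  return any(
      fnmatch.fnmatch(part, pattern) for part in Path(path).parts for pattern in IGNORE_PATTERNS)

assert is_ignored("ai_ta_backend/__pycache__/ingest.cpython-310.pyc")
assert not is_ignored("ai_ta_backend/beam/ingest.py")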
168 changes: 116 additions & 52 deletions ai_ta_backend/beam/ingest.py
@@ -10,6 +10,7 @@
import os
import re
import shutil
import time
import traceback
import uuid
from pathlib import Path
@@ -30,7 +31,6 @@
Docx2txtLoader,
GitLoader,
PythonLoader,
SRTLoader,
TextLoader,
UnstructuredExcelLoader,
UnstructuredPowerPointLoader,
@@ -40,7 +40,7 @@
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant
from nomic_logging import delete_from_document_map, log_to_document_map
from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map
from OpenaiEmbeddings import OpenAIAPIProcessor
from PIL import Image
from posthog import Posthog
@@ -145,15 +145,17 @@ def loader():


# Triggers determine how your app is deployed
@app.rest_api(
# @app.rest_api(
@app.task_queue(
workers=4,
# callback_url='https://uiuc-chat-git-refactoringesttobeamserverless-kastanday.vercel.app/api/UIUC-api/ingestCallback',
callback_url='https://uiuc-chat-git-ingestprogresstracking-kastanday.vercel.app/api/UIUC-api/ingestTaskCallback',
max_pending_tasks=15_000,
max_retries=3,
timeout=-1,
loader=loader,
autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):

qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]

course_name: List[str] | str = inputs.get('course_name', '')
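
Swapping @app.rest_api for @app.task_queue makes ingest asynchronous: callers enqueue a task, Beam runs it on one of 4 workers with up to 3 retries and no timeout, and the outcome is POSTed to callback_url. A minimal sketch of enqueuing work over HTTP with requests; the deployment URL and token are placeholders, not values from this commit:

import requests

# Hypothetical values -- substitute your Beam deployment's URL and auth token.
TASK_QUEUE_URL = "https://apps.beam.cloud/your-ingest-app"
BEAM_TOKEN = "YOUR_BEAM_TOKEN"

payload = {
    "course_name": "ece408",
    "s3_paths": ["courses/ece408/lecture01.pdf"],
    "readable_filename": "lecture01.pdf",
    "base_url": "",
    "url": "",
    "content": "",
}
resp = requests.post(TASK_QUEUE_URL,
                     headers={"Authorization": f"Bearer {BEAM_TOKEN}"},
                     json=payload,
                     timeout=30)
# Beam acknowledges with a task id; the ingest result arrives later at callback_url.
print(resp.status_code, resp.text)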
@@ -169,17 +171,61 @@ def ingest(**inputs: Dict[str, Any]):

ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog)

if content:
success_fail_dict = ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
elif readable_filename == '':
success_fail_dict = ingester.bulk_ingest(course_name, s3_paths, base_url=base_url, url=url)
def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content):
if content:
return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
elif readable_filename == '':
return ingester.bulk_ingest(course_name, s3_paths, base_url=base_url, url=url)
else:
return ingester.bulk_ingest(course_name,
s3_paths,
readable_filename=readable_filename,
base_url=base_url,
url=url)

# First try
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)

# retries
num_retries = 5
for retry_num in range(1, num_retries):
  if isinstance(success_fail_dict, str):
    print(f"STRING ERROR: {success_fail_dict = }")
    success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
    time.sleep(13 * retry_num)  # backoff: 13s, 26s, 39s, 52s
  elif success_fail_dict['failure_ingest']:
    print(f"Ingest failure -- Retry attempt {retry_num}. File: {success_fail_dict}")
    # s3_paths = success_fail_dict['failure_ingest']  # retry only failed paths... what if this is a URL instead?
    success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
    time.sleep(13 * retry_num)  # backoff: 13s, 26s, 39s, 52s
  else:
    break

# Final failure / success check
if success_fail_dict['failure_ingest']:
print(f"INGEST FAILURE -- About to send to supabase. success_fail_dict: {success_fail_dict}")
document = {
"course_name":
course_name,
"s3_path":
s3_paths,
"readable_filename":
readable_filename,
"url":
url,
"base_url":
base_url,
"error":
success_fail_dict['failure_ingest']['error']
if isinstance(success_fail_dict['failure_ingest'], dict) else success_fail_dict['failure_ingest']
}
response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore
print(f"Supabase ingest failure response: {response}")
else:
success_fail_dict = ingester.bulk_ingest(course_name,
s3_paths,
readable_filename=readable_filename,
base_url=base_url,
url=url)
print("Final success_fail_dict: ", success_fail_dict)
# Success case: rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(str(course_name), map_type='document')

print(f"Final success_fail_dict: {success_fail_dict}")
return json.dumps(success_fail_dict)
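
The block above is retry-with-linear-backoff around run_ingest, where a bare string result or a truthy 'failure_ingest' key counts as a failure. The same pattern in isolation, as a sketch:

import time

def retry_with_backoff(fn, max_retries: int = 5, base_delay: int = 13):
  """Re-run fn until it stops reporting failure or retries are exhausted."""
  result = fn()
  for attempt in range(1, max_retries):
    failed = isinstance(result, str) or result.get('failure_ingest')
    if not failed:
      break
    time.sleep(base_delay * attempt)  # 13s, 26s, 39s, 52s
    result = fn()
  return result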


@@ -192,19 +238,21 @@ def __init__(self, qdrant_client, vectorstore, s3_client, supabase_client, posth
self.supabase_client = supabase_client
self.posthog = posthog

def bulk_ingest(self, course_name: str, s3_paths: Union[str, List[str]], **kwargs) -> Dict:
def bulk_ingest(self, course_name: str, s3_paths: Union[str, List[str]],
**kwargs) -> Dict[str, None | str | Dict[str, str]]:
"""
Bulk ingest a list of s3 paths into the vectorstore, and also into the supabase database.
-> Dict[str, str | Dict[str, str]]
"""

def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
"""Handle running an arbitrary ingest function for an individual file."""
# RUN INGEST METHOD
ret = ingest_method(s3_path, *args, **kwargs)
if ret == "Success":
success_status['success_ingest'].append(s3_path)
success_status['success_ingest'] = str(s3_path)
else:
success_status['failure_ingest'].append(s3_path)
success_status['failure_ingest'] = {'s3_path': str(s3_path), 'error': str(ret)}

# 👇👇👇👇 ADD NEW INGEST METHODS HERE 👇👇👇👇🎉
file_ingest_methods = {
@@ -235,7 +283,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
# 👆👆👆👆 ADD NEW INGEST METHODS HERE 👆👆👆👆🎉

print(f"Top of ingest, Course_name {course_name}. S3 paths {s3_paths}")
success_status = {"success_ingest": [], "failure_ingest": []}
success_status: Dict[str, None | str | Dict[str, str]] = {"success_ingest": None, "failure_ingest": None}
try:
if isinstance(s3_paths, str):
s3_paths = [s3_paths]
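
With success_status initialized to None values, bulk_ingest now reports a single success or failure per call instead of accumulating lists. The intended result contract, shown with illustrative values:

# Shape of the dict returned by bulk_ingest (values are examples, not real paths):
success_case = {"success_ingest": "courses/ece408/lecture01.pdf", "failure_ingest": None}
failure_case = {
    "success_ingest": None,
    "failure_ingest": {"s3_path": "courses/ece408/lecture01.pdf", "error": "Timeout while parsing PDF"},
}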
@@ -260,15 +308,18 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
# No supported ingest... Fallback to attempting utf-8 decoding, otherwise fail.
try:
self._ingest_single_txt(s3_path, course_name)
success_status['success_ingest'].append(s3_path)
success_status['success_ingest'] = s3_path
print(f"No ingest methods -- Falling back to UTF-8 INGEST... s3_path = {s3_path}")
except Exception as e:
print(
f"We don't have a ingest method for this filetype: {file_extension}. As a last-ditch effort, we tried to ingest the file as utf-8 text, but that failed too. File is unsupported: {s3_path}. UTF-8 ingest error: {e}"
)
success_status['failure_ingest'].append(
  f"We don't have an ingest method for this filetype: {file_extension} (with generic type {mime_type}), for file: {s3_path}"
)
success_status['failure_ingest'] = {
's3_path':
s3_path,
'error':
f"We don't have a ingest method for this filetype: {file_extension} (with generic type {mime_type}), for file: {s3_path}"
}
self.posthog.capture(
'distinct_id_of_the_user',
event='ingest_failure',
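
The fallback chain in bulk_ingest is: dispatch on the file extension, then on the coarse MIME type from mimetypes.guess_type, and finally attempt a plain UTF-8 text ingest before recording a failure. A sketch of that dispatch order (the mimetype_methods name is approximate):

import mimetypes
from pathlib import Path

def pick_ingest_method(s3_path: str, file_ingest_methods: dict, mimetype_methods: dict):
  """Return a handler for the file, or None to signal the UTF-8 fallback."""
  ext = Path(s3_path).suffix.lower()
  if ext in file_ingest_methods:
    return file_ingest_methods[ext]
  mime_type, _ = mimetypes.guess_type(s3_path)
  generic_type = mime_type.split('/')[0] if mime_type else None
  return mimetype_methods.get(generic_type)  # e.g. 'video' -> _ingest_single_video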
Expand All @@ -285,9 +336,10 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):

return success_status
except Exception as e:
err = f"❌❌ Error in /ingest: `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc()
err = f"❌❌ Error in /ingest: `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
) # type: ignore

success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {err}")
success_status['failure_ingest'] = {'s3_path': s3_path, 'error': f"MAJOR ERROR DURING INGEST: {err}"}
self.posthog.capture('distinct_id_of_the_user',
event='ingest_failure',
properties={
@@ -298,7 +350,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
})

sentry_sdk.capture_exception(e)
print(f"MAJOR ERROR IN /bulk_ingest: Error: {str(e)}")
print(f"MAJOR ERROR IN /bulk_ingest: {str(e)}")
return success_status

def ingest_single_web_text(self, course_name: str, base_url: str, url: str, content: str, readable_filename: str):
@@ -313,6 +365,7 @@ def ingest_single_web_text(self, course_name: str, base_url: str, url: str, cont
'content': content,
'title': readable_filename
})
success_or_failure: Dict[str, None | str | Dict[str, str]] = {"success_ingest": None, "failure_ingest": None}
try:
# if not, ingest the text
text = [content]
@@ -335,14 +388,15 @@ def ingest_single_web_text(self, course_name: str, base_url: str, url: str, cont
'title': readable_filename
})

return f"✅ Success for web text. title: {readable_filename}, url: {url}, "
success_or_failure['success_ingest'] = url
return success_or_failure
except Exception as e:

err = f"❌❌ Error in (web text ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
) # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return str(err)
success_or_failure['failure_ingest'] = {'url': url, 'error': str(err)}
return success_or_failure

def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs):
try:
@@ -452,9 +506,14 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
"""
print("Starting ingest video or audio")
try:
# Ensure the media directory exists
media_dir = "media"
if not os.path.exists(media_dir):
os.makedirs(media_dir)

# check for file extension
file_ext = Path(s3_path).suffix
openai.api_key = os.getenv('OPENAI_API_KEY')
openai.api_key = os.getenv('VLADS_OPENAI_KEY')
transcript_list = []
with NamedTemporaryFile(suffix=file_ext) as video_tmpfile:
# download from S3 into an video tmpfile
@@ -463,7 +522,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
mp4_version = AudioSegment.from_file(video_tmpfile.name, file_ext[1:])

# save the extracted audio as a temporary webm file
with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as webm_tmpfile:
with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as webm_tmpfile:
mp4_version.export(webm_tmpfile, format="webm")

# check file size
@@ -478,7 +537,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
count = 0

while count < file_count:
with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as split_tmp:
with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as split_tmp:
if count == file_count - 1:
# last segment
audio_chunk = full_audio[start:]
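
The while loop splits oversized audio because Whisper's transcription API rejects uploads over roughly 25 MB, so long recordings are exported as smaller webm segments first. The core of such a split with pydub, in isolation (the fixed segment length is an illustrative assumption; the real code derives segment sizes from the file count):

from pydub import AudioSegment

def split_audio(path: str, segment_ms: int = 10 * 60 * 1000):
  """Slice an audio file into ~10-minute webm chunks; pydub indexes by milliseconds."""
  audio = AudioSegment.from_file(path)
  chunk_paths = []
  for start in range(0, len(audio), segment_ms):
    out_path = f"{path}.{start // segment_ms}.webm"
    audio[start:start + segment_ms].export(out_path, format="webm")
    chunk_paths.append(out_path)
  return chunk_paths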
@@ -555,27 +614,33 @@ def _ingest_single_docx(self, s3_path: str, course_name: str, **kwargs) -> str:

def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str:
try:
with NamedTemporaryFile() as tmpfile:
# download from S3 into pdf_tmpfile
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile)
import pysrt

loader = SRTLoader(tmpfile.name)
documents = loader.load()
# NOTE: slightly different method for .txt files, no need for download. It's part of the 'body'
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
raw_text = response['Body'].read().decode('utf-8')

texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
print("UTF-8 text to ingest as SRT:", raw_text)
parsed_info = pysrt.from_string(raw_text)
text = " ".join([t.text for t in parsed_info]) # type: ignore
print(f"Final SRT ingest: {text}")

self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
texts = [text]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
}]
if len(text) == 0:
return "Error: SRT file appears empty. Skipping."

self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (SRT ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
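
The SRT path no longer downloads to a temp file for langchain's SRTLoader; it reads the object body straight from S3 and parses it with pysrt, joining the subtitle lines into one text. The parsing step on its own:

import pysrt

raw_text = """1
00:00:01,000 --> 00:00:03,000
Welcome to the course.

2
00:00:03,500 --> 00:00:06,000
Today we cover ingestion.
"""

subs = pysrt.from_string(raw_text)
text = " ".join(item.text for item in subs)
print(text)  # -> Welcome to the course. Today we cover ingestion.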
@@ -876,8 +941,7 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
'base_url': metadatas[0].get('base_url', None),
})

print("In split and upload")
print(f"metadatas: {metadatas}")
print(f"In split and upload. Metadatas: {metadatas}")
print(f"Texts: {texts}")
assert len(texts) == len(metadatas)
