Skip to content

Commit

Permalink
Fix /ingest for Video and SRT files
Browse files Browse the repository at this point in the history
  • Loading branch information
KastanDay committed Mar 16, 2024
1 parent 6e6fd0d commit e32ee20
Showing 1 changed file with 35 additions and 28 deletions.
63 changes: 35 additions & 28 deletions ai_ta_backend/beam/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
Docx2txtLoader,
GitLoader,
PythonLoader,
SRTLoader,
TextLoader,
UnstructuredExcelLoader,
UnstructuredPowerPointLoader,
Expand All @@ -41,7 +40,6 @@
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant

from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map
from OpenaiEmbeddings import OpenAIAPIProcessor
from PIL import Image
Expand Down Expand Up @@ -157,7 +155,7 @@ def loader():
loader=loader,
autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):

qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]

course_name: List[str] | str = inputs.get('course_name', '')
Expand All @@ -173,8 +171,6 @@ def ingest(**inputs: Dict[str, Any]):

ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog)



def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content):
if content:
return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
Expand Down Expand Up @@ -204,7 +200,7 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
time.sleep(13 * retry_num) # max is 65
else:
break

# Final failure / success check
if success_fail_dict['failure_ingest']:
print(f"INGEST FAILURE -- About to send to supabase. success_fail_dict: {success_fail_dict}")
Expand All @@ -225,7 +221,7 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
}
response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore
print(f"Supabase ingest failure response: {response}")
else:
else:
# Success case: rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(str(course_name), map_type='document')

Expand Down Expand Up @@ -510,9 +506,14 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
"""
print("Starting ingest video or audio")
try:
# Ensure the media directory exists
media_dir = "media"
if not os.path.exists(media_dir):
os.makedirs(media_dir)

# check for file extension
file_ext = Path(s3_path).suffix
openai.api_key = os.getenv('OPENAI_API_KEY')
openai.api_key = os.getenv('VLADS_OPENAI_KEY')
transcript_list = []
with NamedTemporaryFile(suffix=file_ext) as video_tmpfile:
# download from S3 into a video tmpfile
Expand All @@ -521,7 +522,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
mp4_version = AudioSegment.from_file(video_tmpfile.name, file_ext[1:])

# save the extracted audio as a temporary webm file
with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as webm_tmpfile:
with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as webm_tmpfile:
mp4_version.export(webm_tmpfile, format="webm")

# check file size
Expand All @@ -536,7 +537,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
count = 0

while count < file_count:
with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as split_tmp:
with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as split_tmp:
if count == file_count - 1:
# last segment
audio_chunk = full_audio[start:]
Expand Down Expand Up @@ -613,27 +614,33 @@ def _ingest_single_docx(self, s3_path: str, course_name: str, **kwargs) -> str:

def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str:
try:
with NamedTemporaryFile() as tmpfile:
# download from S3 into pdf_tmpfile
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile)
import pysrt

loader = SRTLoader(tmpfile.name)
documents = loader.load()
# NOTE: slightly different method for .txt files, no need for download. It's part of the 'body'
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
raw_text = response['Body'].read().decode('utf-8')

texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
print("UTF-8 text to ingest as SRT:", raw_text)
parsed_info = pysrt.from_string(raw_text)
text = " ".join([t.text for t in parsed_info]) # type: ignore
print(f"Final SRT ingest: {text}")

self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
texts = [text]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
}]
if len(text) == 0:
return "Error: SRT file appears empty. Skipping."

self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (SRT ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
Expand Down

0 comments on commit e32ee20

Please sign in to comment.