diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index a23043a2..a9cb0a4c 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -31,7 +31,6 @@ Docx2txtLoader, GitLoader, PythonLoader, - SRTLoader, TextLoader, UnstructuredExcelLoader, UnstructuredPowerPointLoader, @@ -41,7 +40,6 @@ from langchain.schema import Document from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores import Qdrant - from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map from OpenaiEmbeddings import OpenAIAPIProcessor from PIL import Image @@ -157,7 +155,7 @@ def loader(): loader=loader, autoscaler=autoscaler) def ingest(**inputs: Dict[str, Any]): - + qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"] course_name: List[str] | str = inputs.get('course_name', '') @@ -173,8 +171,6 @@ def ingest(**inputs: Dict[str, Any]): ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog) - - def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content): if content: return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename) @@ -204,7 +200,7 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content) time.sleep(13 * retry_num) # max is 65 else: break - + # Final failure / success check if success_fail_dict['failure_ingest']: print(f"INGEST FAILURE -- About to send to supabase. success_fail_dict: {success_fail_dict}") @@ -225,7 +221,7 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content) } response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore print(f"Supabase ingest failure response: {response}") - else: + else: # Success case: rebuild nomic document map after all ingests are done rebuild_status = rebuild_map(str(course_name), map_type='document') @@ -510,9 +506,14 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str: """ print("Starting ingest video or audio") try: + # Ensure the media directory exists + media_dir = "media" + if not os.path.exists(media_dir): + os.makedirs(media_dir) + # check for file extension file_ext = Path(s3_path).suffix - openai.api_key = os.getenv('OPENAI_API_KEY') + openai.api_key = os.getenv('VLADS_OPENAI_KEY') transcript_list = [] with NamedTemporaryFile(suffix=file_ext) as video_tmpfile: # download from S3 into an video tmpfile @@ -521,7 +522,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str: mp4_version = AudioSegment.from_file(video_tmpfile.name, file_ext[1:]) # save the extracted audio as a temporary webm file - with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as webm_tmpfile: + with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as webm_tmpfile: mp4_version.export(webm_tmpfile, format="webm") # check file size @@ -536,7 +537,7 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str: count = 0 while count < file_count: - with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as split_tmp: + with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as split_tmp: if count == file_count - 1: # last segment audio_chunk = full_audio[start:] @@ -613,27 +614,33 @@ def _ingest_single_docx(self, s3_path: str, course_name: str, **kwargs) -> str: def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str: try: - with NamedTemporaryFile() as tmpfile: - # download from S3 into pdf_tmpfile - self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile) + import pysrt - loader = SRTLoader(tmpfile.name) - documents = loader.load() + # NOTE: slightly different method for .txt files, no need for download. It's part of the 'body' + response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path) + raw_text = response['Body'].read().decode('utf-8') - texts = [doc.page_content for doc in documents] - metadatas: List[Dict[str, Any]] = [{ - 'course_name': course_name, - 's3_path': s3_path, - 'readable_filename': kwargs.get('readable_filename', - Path(s3_path).name[37:]), - 'pagenumber': '', - 'timestamp': '', - 'url': '', - 'base_url': '', - } for doc in documents] + print("UTF-8 text to ingest as SRT:", raw_text) + parsed_info = pysrt.from_string(raw_text) + text = " ".join([t.text for t in parsed_info]) # type: ignore + print(f"Final SRT ingest: {text}") - self.split_and_upload(texts=texts, metadatas=metadatas) - return "Success" + texts = [text] + metadatas: List[Dict[str, Any]] = [{ + 'course_name': course_name, + 's3_path': s3_path, + 'readable_filename': kwargs.get('readable_filename', + Path(s3_path).name[37:]), + 'pagenumber': '', + 'timestamp': '', + 'url': '', + 'base_url': '', + }] + if len(text) == 0: + return "Error: SRT file appears empty. Skipping." + + self.split_and_upload(texts=texts, metadatas=metadatas) + return "Success" except Exception as e: err = f"❌❌ Error in (SRT ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc( )