diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index 72aae20c..11477b25 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -10,6 +10,7 @@ import os import re import shutil +import time import traceback import uuid from pathlib import Path @@ -145,7 +146,8 @@ def loader(): # Triggers determine how your app is deployed -@app.rest_api( +# @app.rest_api( +@app.task_queue( workers=4, # callback_url='https://uiuc-chat-git-refactoringesttobeamserverless-kastanday.vercel.app/api/UIUC-api/ingestCallback', max_pending_tasks=15_000, @@ -169,17 +171,31 @@ def ingest(**inputs: Dict[str, Any]): ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog) - if content: - success_fail_dict = ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename) - elif readable_filename == '': - success_fail_dict = ingester.bulk_ingest(course_name, s3_paths, base_url=base_url, url=url) - else: - success_fail_dict = ingester.bulk_ingest(course_name, - s3_paths, - readable_filename=readable_filename, - base_url=base_url, - url=url) - print("Final success_fail_dict: ", success_fail_dict) + def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content): + if content: + return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename) + elif readable_filename == '': + return ingester.bulk_ingest(course_name, s3_paths, base_url=base_url, url=url) + else: + return ingester.bulk_ingest(course_name, + s3_paths, + readable_filename=readable_filename, + base_url=base_url, + url=url) + + # First try + success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content) + + # retries + num_retires = 5 + for retry_num in range(1, num_retires): + if success_fail_dict['failure_ingest']: + print(f"Ingest failure -- Retry attempt {retry_num}. File: {success_fail_dict}") + # s3_paths = success_fail_dict['failure_ingest'] # retry only failed paths.... what if this is a URL instead? + success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content) + time.sleep(13 * retry_num) # max is 65 + + print(f"Final success_fail_dict: {success_fail_dict}") return json.dumps(success_fail_dict) @@ -876,8 +892,7 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]): 'base_url': metadatas[0].get('base_url', None), }) - print("In split and upload") - print(f"metadatas: {metadatas}") + print(f"In split and upload. Metadatas: {metadatas}") print(f"Texts: {texts}") assert len(texts) == len( metadatas