From 3ba86a6a92b2751853ccbc3506682c779f7ca09a Mon Sep 17 00:00:00 2001 From: Kastan Day Date: Mon, 22 Jul 2024 11:11:27 -0700 Subject: [PATCH] Fix: On Ingest, move all failure logging to Task Callback (on frontend). All success logging happens in Beam. --- ai_ta_backend/beam/ingest.py | 62 +++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index 37bc90c7..830bcc8a 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -154,13 +154,14 @@ def loader(): # Triggers determine how your app is deployed # @app.rest_api( -@app.task_queue(workers=4, - max_pending_tasks=15_000, - callback_url='https://uiuc.chat/api/UIUC-api/ingestTaskCallback', - timeout=60 * 15, - max_retries=3, - loader=loader, - autoscaler=autoscaler) +@app.task_queue( + workers=4, + max_pending_tasks=15_000, + callback_url='https://uiuc.chat/api/UIUC-api/ingestTaskCallback', + timeout=60 * 15, + max_retries=0, # change to 3 + loader=loader, + autoscaler=autoscaler) def ingest(**inputs: Dict[str, Any]): qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"] @@ -194,7 +195,7 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content) success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content) # retries - num_retires = 5 + num_retires = 3 for retry_num in range(1, num_retires): if isinstance(success_fail_dict, str): print(f"STRING ERROR: {success_fail_dict = }") @@ -211,23 +212,24 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content) # Final failure / success check if success_fail_dict['failure_ingest']: print(f"INGEST FAILURE -- About to send to supabase. success_fail_dict: {success_fail_dict}") - document = { - "course_name": - course_name, - "s3_path": - s3_paths, - "readable_filename": - readable_filename, - "url": - url, - "base_url": - base_url, - "error": - success_fail_dict['failure_ingest']['error'] - if isinstance(success_fail_dict['failure_ingest'], dict) else success_fail_dict['failure_ingest'] - } - response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore - print(f"Supabase ingest failure response: {response}") + # Failure logging done in TaskCallback now, from frontend. + # document = { + # "course_name": + # course_name, + # "s3_path": + # s3_paths, + # "readable_filename": + # readable_filename, + # "url": + # url, + # "base_url": + # base_url, + # "error": + # success_fail_dict['failure_ingest']['error'] + # if isinstance(success_fail_dict['failure_ingest'], dict) else success_fail_dict['failure_ingest'] + # } + # response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore + # print(f"Supabase ingest failure response: {response}") else: # Success case: rebuild nomic document map after all ingests are done # rebuild_status = rebuild_map(str(course_name), map_type='document') @@ -1163,7 +1165,7 @@ def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any] if incoming_s3_path: # check if uuid exists in s3_path -- not all s3_paths have uuids! incoming_filename = incoming_s3_path.split('/')[-1] - print("Full filename: ", incoming_filename) + # print("Full filename: ", incoming_filename) pattern = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}', re.I) # uuid V4 pattern, and v4 only. if bool(pattern.search(incoming_filename)): @@ -1172,20 +1174,20 @@ def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any] else: # do not remove anything and proceed with duplicate checking original_filename = incoming_filename - print("Filename after removing uuid: ", original_filename) + print(f"Filename after removing uuid: {original_filename}") supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq( 'course_name', course_name).like('s3_path', '%' + original_filename + '%').order('id', desc=True).execute() supabase_contents = supabase_contents.data - print("No. of S3 path based records retrieved: ", - len(supabase_contents)) # multiple records can be retrieved: 3.pdf and 453.pdf + print(f"No. of S3 path based records retrieved: {len(supabase_contents)}" + ) # multiple records can be retrieved: 3.pdf and 453.pdf elif url: original_filename = url supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 'url').eq( 'course_name', course_name).eq('url', url).order('id', desc=True).execute() supabase_contents = supabase_contents.data - print("No. of URL-based records retrieved: ", len(supabase_contents)) + print(f"No. of URL-based records retrieved: {len(supabase_contents)}") else: original_filename = None supabase_contents = []