Improve Beam ingest retries and error handling, and log failures to Supabase
KastanDay committed Mar 16, 2024
1 parent 93078ea commit 6e6fd0d
Showing 1 changed file with 55 additions and 20 deletions.
75 changes: 55 additions & 20 deletions ai_ta_backend/beam/ingest.py
@@ -150,7 +150,7 @@ def loader():
# @app.rest_api(
@app.task_queue(
workers=4,
# callback_url='https://uiuc-chat-git-refactoringesttobeamserverless-kastanday.vercel.app/api/UIUC-api/ingestCallback',
callback_url='https://uiuc-chat-git-ingestprogresstracking-kastanday.vercel.app/api/UIUC-api/ingestTaskCallback',
max_pending_tasks=15_000,
max_retries=3,
timeout=-1,
@@ -193,15 +193,42 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
# retries
num_retires = 5
for retry_num in range(1, num_retires):
if success_fail_dict['failure_ingest']:
if isinstance(success_fail_dict, str):
print(f"STRING ERROR: {success_fail_dict = }")
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
time.sleep(13 * retry_num) # max is 65
elif success_fail_dict['failure_ingest']:
print(f"Ingest failure -- Retry attempt {retry_num}. File: {success_fail_dict}")
# s3_paths = success_fail_dict['failure_ingest'] # retry only failed paths.... what if this is a URL instead?
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
time.sleep(13 * retry_num) # max is 65
else:
break

# rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(course_name, map_type='document')

# Final failure / success check
if success_fail_dict['failure_ingest']:
print(f"INGEST FAILURE -- About to send to supabase. success_fail_dict: {success_fail_dict}")
document = {
    "course_name": course_name,
    "s3_path": s3_paths,
    "readable_filename": readable_filename,
    "url": url,
    "base_url": base_url,
    "error": success_fail_dict['failure_ingest']['error']
             if isinstance(success_fail_dict['failure_ingest'], dict) else success_fail_dict['failure_ingest']
}
response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore
print(f"Supabase ingest failure response: {response}")
else:
# Success case: rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(str(course_name), map_type='document')

print(f"Final success_fail_dict: {success_fail_dict}")
return json.dumps(success_fail_dict)
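
For readers skimming the diff, here is a minimal, self-contained sketch of the retry-and-report flow added above. The linear backoff (13 s × attempt), the handling of bare-string results, and the final failure/success split mirror the diff; `run_ingest`, `log_failure`, and `rebuild_map` are illustrative stand-ins for the real calls (the Beam ingest, the Supabase `documents_failed` insert, and the Nomic map rebuild).

import json
import time
from typing import Any, Callable


def ingest_with_retries(run_ingest: Callable[[], Any],
                        log_failure: Callable[[Any], None],
                        rebuild_map: Callable[[], None],
                        num_retries: int = 5) -> str:
  """Retry one ingest with linear backoff, then report the final outcome."""
  result = run_ingest()
  for retry_num in range(1, num_retries):
    if isinstance(result, str):
      # Some error paths return a bare string instead of a status dict; retry those too.
      print(f"STRING ERROR: {result = }")
      result = run_ingest()
      time.sleep(13 * retry_num)  # 13, 26, 39, 52 seconds
    elif result['failure_ingest']:
      print(f"Ingest failure -- retry attempt {retry_num}: {result}")
      result = run_ingest()
      time.sleep(13 * retry_num)
    else:
      break

  if isinstance(result, str) or result['failure_ingest']:
    log_failure(result)  # e.g. insert a row into a 'documents_failed' table
  else:
    rebuild_map()        # success: rebuild the document map

  return json.dumps(result)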

@@ -215,19 +242,21 @@ def __init__(self, qdrant_client, vectorstore, s3_client, supabase_client, posth
self.supabase_client = supabase_client
self.posthog = posthog

def bulk_ingest(self, course_name: str, s3_paths: Union[str, List[str]], **kwargs) -> Dict:
def bulk_ingest(self, course_name: str, s3_paths: Union[str, List[str]],
**kwargs) -> Dict[str, None | str | Dict[str, str]]:
"""
Bulk ingest a list of s3 paths into the vectorstore, and also into the supabase database.
-> Dict[str, str | Dict[str, str]]
"""

def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
"""Handle running an arbitrary ingest function for an individual file."""
# RUN INGEST METHOD
ret = ingest_method(s3_path, *args, **kwargs)
if ret == "Success":
success_status['success_ingest'].append(s3_path)
success_status['success_ingest'] = str(s3_path)
else:
success_status['failure_ingest'].append(s3_path)
success_status['failure_ingest'] = {'s3_path': str(s3_path), 'error': str(ret)}

# 👇👇👇👇 ADD NEW INGEST METHODS HERE 👇👇👇👇🎉
file_ingest_methods = {
@@ -258,7 +287,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
# 👆👆👆👆 ADD NEW INGEST METHODS HERE 👆👆👆👆🎉

print(f"Top of ingest, Course_name {course_name}. S3 paths {s3_paths}")
success_status = {"success_ingest": [], "failure_ingest": []}
success_status: Dict[str, None | str | Dict[str, str]] = {"success_ingest": None, "failure_ingest": None}
try:
if isinstance(s3_paths, str):
s3_paths = [s3_paths]
@@ -283,15 +312,18 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
# No supported ingest... Fallback to attempting utf-8 decoding, otherwise fail.
try:
self._ingest_single_txt(s3_path, course_name)
success_status['success_ingest'].append(s3_path)
success_status['success_ingest'] = s3_path
print(f"No ingest methods -- Falling back to UTF-8 INGEST... s3_path = {s3_path}")
except Exception as e:
print(
f"We don't have a ingest method for this filetype: {file_extension}. As a last-ditch effort, we tried to ingest the file as utf-8 text, but that failed too. File is unsupported: {s3_path}. UTF-8 ingest error: {e}"
)
success_status['failure_ingest'].append(
f"We don't have a ingest method for this filetype: {file_extension} (with generic type {mime_type}), for file: {s3_path}"
)
success_status['failure_ingest'] = {
    's3_path': s3_path,
    'error': f"We don't have a ingest method for this filetype: {file_extension} (with generic type {mime_type}), for file: {s3_path}"
}
self.posthog.capture(
'distinct_id_of_the_user',
event='ingest_failure',
@@ -308,9 +340,10 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):

return success_status
except Exception as e:
err = f"❌❌ Error in /ingest: `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc()
err = f"❌❌ Error in /ingest: `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
) # type: ignore

success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {err}")
success_status['failure_ingest'] = {'s3_path': s3_path, 'error': f"MAJOR ERROR DURING INGEST: {err}"}
self.posthog.capture('distinct_id_of_the_user',
event='ingest_failure',
properties={
@@ -321,7 +354,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
})

sentry_sdk.capture_exception(e)
print(f"MAJOR ERROR IN /bulk_ingest: Error: {str(e)}")
print(f"MAJOR ERROR IN /bulk_ingest: {str(e)}")
return success_status
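
Note the contract change: `success_ingest` / `failure_ingest` go from lists of paths to a single value, with failures carried as a small dict of `s3_path` plus `error`. A hedged sketch of how a caller might branch on the new shape (the field names follow the diff; the caller and the sample values are illustrative):

from typing import Dict, Union

# Equivalent to the diff's Dict[str, None | str | Dict[str, str]] (PEP 604 syntax needs Python 3.10+).
IngestStatus = Dict[str, Union[None, str, Dict[str, str]]]


def report_ingest(status: IngestStatus) -> None:
  """Illustrative consumer of bulk_ingest's new return shape."""
  failure = status.get('failure_ingest')
  if failure is None:
    print(f"Ingest succeeded: {status.get('success_ingest')}")
  elif isinstance(failure, dict):
    # Structured failure: the offending path plus the captured error message.
    print(f"Ingest failed for {failure.get('s3_path')}: {failure.get('error')}")
  else:
    # Some paths may still surface a bare string.
    print(f"Ingest failed: {failure}")


report_ingest({"success_ingest": "courses/demo/lecture01.pdf", "failure_ingest": None})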

def ingest_single_web_text(self, course_name: str, base_url: str, url: str, content: str, readable_filename: str):
@@ -336,6 +369,7 @@ def ingest_single_web_text(self, course_name: str, base_url: str, url: str, cont
'content': content,
'title': readable_filename
})
success_or_failure: Dict[str, None | str | Dict[str, str]] = {"success_ingest": None, "failure_ingest": None}
try:
# if not, ingest the text
text = [content]
@@ -358,14 +392,15 @@ def ingest_single_web_text(self, course_name: str, base_url: str, url: str, cont
'title': readable_filename
})

return f"✅ Success for web text. title: {readable_filename}, url: {url}, "
success_or_failure['success_ingest'] = url
return success_or_failure
except Exception as e:

err = f"❌❌ Error in (web text ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
) # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return str(err)
success_or_failure['failure_ingest'] = {'url': url, 'error': str(err)}
return success_or_failure
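
Both `bulk_ingest` and `ingest_single_web_text` now share the same capture-and-report shape: format the exception with its traceback, send it to Sentry, and return a structured failure entry instead of a bare string. A condensed sketch of that pattern (assumes `sentry_sdk` is initialised; `safe_ingest` and its arguments are illustrative):

import inspect
import traceback
from typing import Callable, Dict, Union

import sentry_sdk


def safe_ingest(do_ingest: Callable[[], None], url: str) -> Dict[str, Union[None, str, Dict[str, str]]]:
  """Run one ingest step and convert any exception into a structured failure entry."""
  status: Dict[str, Union[None, str, Dict[str, str]]] = {"success_ingest": None, "failure_ingest": None}
  try:
    do_ingest()
    status['success_ingest'] = url
  except Exception as e:
    err = (f"❌❌ Error in `{inspect.currentframe().f_code.co_name}`: {e}\n"
           f"Traceback:\n{traceback.format_exc()}")
    print(err)
    sentry_sdk.capture_exception(e)  # report to Sentry, as in the diff
    status['failure_ingest'] = {'url': url, 'error': err}
  return status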

def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs):
try: