Skip to content

Commit

Permalink
Fix: On Ingest, move all failure logging to Task Callback (on frontend). All success logging happens in Beam.
Browse files Browse the repository at this point in the history
  • Loading branch information
KastanDay committed Jul 22, 2024
1 parent 4f2c032 commit 3ba86a6
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions ai_ta_backend/beam/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,14 @@ def loader():

# Triggers determine how your app is deployed
# @app.rest_api(
@app.task_queue(workers=4,
max_pending_tasks=15_000,
callback_url='https://uiuc.chat/api/UIUC-api/ingestTaskCallback',
timeout=60 * 15,
max_retries=3,
loader=loader,
autoscaler=autoscaler)
@app.task_queue(
workers=4,
max_pending_tasks=15_000,
callback_url='https://uiuc.chat/api/UIUC-api/ingestTaskCallback',
timeout=60 * 15,
max_retries=0, # change to 3
loader=loader,
autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):

qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]
Expand Down Expand Up @@ -194,7 +195,7 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)

# retries
num_retires = 5
num_retires = 3
for retry_num in range(1, num_retires):
if isinstance(success_fail_dict, str):
print(f"STRING ERROR: {success_fail_dict = }")
Expand All @@ -211,23 +212,24 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
# Final failure / success check
if success_fail_dict['failure_ingest']:
print(f"INGEST FAILURE -- About to send to supabase. success_fail_dict: {success_fail_dict}")
document = {
"course_name":
course_name,
"s3_path":
s3_paths,
"readable_filename":
readable_filename,
"url":
url,
"base_url":
base_url,
"error":
success_fail_dict['failure_ingest']['error']
if isinstance(success_fail_dict['failure_ingest'], dict) else success_fail_dict['failure_ingest']
}
response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore
print(f"Supabase ingest failure response: {response}")
# Failure logging done in TaskCallback now, from frontend.
# document = {
# "course_name":
# course_name,
# "s3_path":
# s3_paths,
# "readable_filename":
# readable_filename,
# "url":
# url,
# "base_url":
# base_url,
# "error":
# success_fail_dict['failure_ingest']['error']
# if isinstance(success_fail_dict['failure_ingest'], dict) else success_fail_dict['failure_ingest']
# }
# response = supabase_client.table('documents_failed').insert(document).execute() # type: ignore
# print(f"Supabase ingest failure response: {response}")
else:
# Success case: rebuild nomic document map after all ingests are done
# rebuild_status = rebuild_map(str(course_name), map_type='document')
Expand Down Expand Up @@ -1163,7 +1165,7 @@ def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any]
if incoming_s3_path:
# check if uuid exists in s3_path -- not all s3_paths have uuids!
incoming_filename = incoming_s3_path.split('/')[-1]
print("Full filename: ", incoming_filename)
# print("Full filename: ", incoming_filename)
pattern = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}',
re.I) # uuid V4 pattern, and v4 only.
if bool(pattern.search(incoming_filename)):
Expand All @@ -1172,20 +1174,20 @@ def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any]
else:
# do not remove anything and proceed with duplicate checking
original_filename = incoming_filename
print("Filename after removing uuid: ", original_filename)
print(f"Filename after removing uuid: {original_filename}")

supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq(
'course_name', course_name).like('s3_path', '%' + original_filename + '%').order('id', desc=True).execute()
supabase_contents = supabase_contents.data
print("No. of S3 path based records retrieved: ",
len(supabase_contents)) # multiple records can be retrieved: 3.pdf and 453.pdf
print(f"No. of S3 path based records retrieved: {len(supabase_contents)}"
) # multiple records can be retrieved: 3.pdf and 453.pdf

elif url:
original_filename = url
supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 'url').eq(
'course_name', course_name).eq('url', url).order('id', desc=True).execute()
supabase_contents = supabase_contents.data
print("No. of URL-based records retrieved: ", len(supabase_contents))
print(f"No. of URL-based records retrieved: {len(supabase_contents)}")
else:
original_filename = None
supabase_contents = []
Expand Down

0 comments on commit 3ba86a6

Please sign in to comment.