Skip to content

Commit

Permalink
Merge branch 'main' into refactor-nomic-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
star-nox authored Mar 15, 2024
2 parents 689dc4a + 34ad712 commit 259e4ea
Showing 1 changed file with 31 additions and 15 deletions.
46 changes: 31 additions & 15 deletions ai_ta_backend/beam/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import re
import shutil
import time
import traceback
import uuid
from pathlib import Path
Expand Down Expand Up @@ -146,7 +147,8 @@ def loader():


# Triggers determine how your app is deployed
@app.rest_api(
# @app.rest_api(
@app.task_queue(
workers=4,
# callback_url='https://uiuc-chat-git-refactoringesttobeamserverless-kastanday.vercel.app/api/UIUC-api/ingestCallback',
max_pending_tasks=15_000,
Expand All @@ -171,21 +173,36 @@ def ingest(**inputs: Dict[str, Any]):

ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog)

if content:
success_fail_dict = ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
elif readable_filename == '':
success_fail_dict = ingester.bulk_ingest(course_name, s3_paths, base_url=base_url, url=url)
else:
success_fail_dict = ingester.bulk_ingest(course_name,
s3_paths,
readable_filename=readable_filename,
base_url=base_url,
url=url)
print("Final success_fail_dict: ", success_fail_dict)


def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content):
if content:
return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
elif readable_filename == '':
return ingester.bulk_ingest(course_name, s3_paths, base_url=base_url, url=url)
else:
return ingester.bulk_ingest(course_name,
s3_paths,
readable_filename=readable_filename,
base_url=base_url,
url=url)

# First try
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)

# retries
num_retires = 5
for retry_num in range(1, num_retires):
if success_fail_dict['failure_ingest']:
print(f"Ingest failure -- Retry attempt {retry_num}. File: {success_fail_dict}")
# s3_paths = success_fail_dict['failure_ingest'] # retry only failed paths.... what if this is a URL instead?
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
time.sleep(13 * retry_num) # max is 65

# rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(course_name, map_type='document')


print(f"Final success_fail_dict: {success_fail_dict}")
return json.dumps(success_fail_dict)


Expand Down Expand Up @@ -882,8 +899,7 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
'base_url': metadatas[0].get('base_url', None),
})

print("In split and upload")
print(f"metadatas: {metadatas}")
print(f"In split and upload. Metadatas: {metadatas}")
print(f"Texts: {texts}")
assert len(texts) == len(
metadatas
Expand Down

0 comments on commit 259e4ea

Please sign in to comment.