Skip to content

Commit

Permalink
Merge branch 'add_beam_serverless_ingest' of github.com:UIUC-Chatbot/…
Browse files Browse the repository at this point in the history
…ai-ta-backend into dependency_injection
  • Loading branch information
rohan-uiuc committed Mar 7, 2024
2 parents f0542a8 + 3281c70 commit f40d533
Showing 1 changed file with 35 additions and 8 deletions.
43 changes: 35 additions & 8 deletions ai_ta_backend/beam/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
from qdrant_client import QdrantClient, models
from qdrant_client.models import PointStruct

from ai_ta_backend.beam.nomic_logging import (
delete_from_document_map,
log_to_document_map,
)

# from langchain.schema.output_parser import StrOutputParser
# from langchain.chat_models import AzureChatOpenAI

Expand All @@ -72,6 +77,7 @@
"GitPython==3.1.40",
"beautifulsoup4==4.12.2",
"sentry-sdk==1.39.1",
"nomic==2.0.14",
]

# TODO: consider adding workers. They share CPU and memory https://docs.beam.cloud/deployment/autoscaling#worker-use-cases
Expand Down Expand Up @@ -260,10 +266,34 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
success_status['failure_ingest'].append(
f"We don't have a ingest method for this filetype: {file_extension} (with generic type {mime_type}), for file: {s3_path}"
)
self.posthog.capture(
'distinct_id_of_the_user',
event='Ingest Failure',
properties={
'course_name':
course_name,
's3_path':
s3_paths,
'kwargs':
kwargs,
'error':
f"We don't have a ingest method for this filetype: {file_extension} (with generic type {mime_type}), for file: {s3_path}"
})

return success_status
except Exception as e:
success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {str(e)}")
err = f"❌❌ Error in /ingest: `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc()

success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {err}")
self.posthog.capture('distinct_id_of_the_user',
event='Ingest Failure',
properties={
'course_name': course_name,
's3_path': s3_paths,
'kwargs': kwargs,
'error': err
})

sentry_sdk.capture_exception(e)
print(f"MAJOR ERROR IN /bulk_ingest: Error: {str(e)}")
return success_status
Expand Down Expand Up @@ -933,10 +963,8 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):

# add to Nomic document map
if len(response.data) > 0:
pass
# TODO: reimplement nomic
# inserted_data = response.data[0]
# res = log_to_document_map(inserted_data)
inserted_data = response.data[0]
log_to_document_map(inserted_data)

self.posthog.capture('distinct_id_of_the_user',
event='split_and_upload_succeeded',
Expand Down Expand Up @@ -1068,7 +1096,7 @@ def delete_data(self, course_name: str, s3_path: str, source_url: str):
nomic_ids_to_delete.append(str(data['id']) + "_" + str(i))

# delete from Nomic
# res = delete_from_document_map(course_name, nomic_ids_to_delete)
delete_from_document_map(course_name, nomic_ids_to_delete)
except Exception as e:
print("Error in deleting file from Nomic:", e)
sentry_sdk.capture_exception(e)
Expand Down Expand Up @@ -1113,8 +1141,7 @@ def delete_data(self, course_name: str, s3_path: str, source_url: str):
nomic_ids_to_delete.append(str(data['id']) + "_" + str(i))

# delete from Nomic
# TODO: reimplement...
# res = delete_from_document_map(course_name, nomic_ids_to_delete)
delete_from_document_map(course_name, nomic_ids_to_delete)
except Exception as e:
print("Error in deleting file from Nomic:", e)
sentry_sdk.capture_exception(e)
Expand Down

0 comments on commit f40d533

Please sign in to comment.