Skip to content

Commit

Permalink
Merge branch 'add_beam_serverless_ingest' of github.com:UIUC-Chatbot/ai-ta-backend into dependency_injection
Browse files (browse the repository at this point in the history)
  • Loading branch information
rohan-uiuc committed Mar 12, 2024
2 parents d6976fe + 107eef5 commit cf5e4df
Show file tree
Hide file tree
Showing 2 changed files with 759 additions and 13 deletions.
29 changes: 16 additions & 13 deletions ai_ta_backend/beam/ingest.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import asyncio
import inspect
import json
import logging
import mimetypes
import os
Expand All @@ -13,7 +14,7 @@
import uuid
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, List, Union
from typing import Any, Callable, Dict, List, Optional, Union

import beam
import boto3
Expand All @@ -39,18 +40,14 @@
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant
from nomic_logging import delete_from_document_map, log_to_document_map
from OpenaiEmbeddings import OpenAIAPIProcessor
from PIL import Image
from posthog import Posthog
from pydub import AudioSegment
from qdrant_client import QdrantClient, models
from qdrant_client.models import PointStruct

from ai_ta_backend.beam.nomic_logging import (
delete_from_document_map,
log_to_document_map,
)

# from langchain.schema.output_parser import StrOutputParser
# from langchain.chat_models import AzureChatOpenAI

Expand Down Expand Up @@ -148,7 +145,14 @@ def loader():


# Triggers determine how your app is deployed
@app.rest_api(workers=2, max_pending_tasks=15_000, max_retries=3, timeout=-1, loader=loader, autoscaler=autoscaler)
@app.rest_api(
workers=4,
# callback_url='https://uiuc-chat-git-refactoringesttobeamserverless-kastanday.vercel.app/api/UIUC-api/ingestCallback',
max_pending_tasks=15_000,
max_retries=3,
timeout=-1,
loader=loader,
autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):
qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]

Expand All @@ -157,8 +161,7 @@ def ingest(**inputs: Dict[str, Any]):
url: List[str] | str | None = inputs.get('url', None)
base_url: List[str] | str | None = inputs.get('base_url', None)
readable_filename: List[str] | str = inputs.get('readable_filename', '')
content: str | None = inputs.get('content', None) # is webtext
# is_webtext: bool | None = inputs.get('url', False)
content: str | None = inputs.get('content', None) # is webtext if content exists

print(
f"In top of /ingest route. course: {course_name}, s3paths: {s3_paths}, readable_filename: {readable_filename}, base_url: {base_url}, url: {url}, content: {content}"
Expand All @@ -177,7 +180,7 @@ def ingest(**inputs: Dict[str, Any]):
base_url=base_url,
url=url)
print("Final success_fail_dict: ", success_fail_dict)
return success_fail_dict
return json.dumps(success_fail_dict)


class Ingest():
Expand Down Expand Up @@ -242,7 +245,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
with NamedTemporaryFile(suffix=file_extension) as tmpfile:
self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile)
mime_type = str(mimetypes.guess_type(tmpfile.name, strict=False)[0])
mime_category, mime_subcategory = mime_type.split('/')
mime_category = mime_type.split('/')[0] if '/' in mime_type else mime_type

if file_extension in file_ingest_methods:
# Use specialized functions when possible, fallback to mimetype. Else raise error.
Expand All @@ -268,7 +271,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
)
self.posthog.capture(
'distinct_id_of_the_user',
event='Ingest Failure',
event='ingest_failure',
properties={
'course_name':
course_name,
Expand All @@ -286,7 +289,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):

success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {err}")
self.posthog.capture('distinct_id_of_the_user',
event='Ingest Failure',
event='ingest_failure',
properties={
'course_name': course_name,
's3_path': s3_paths,
Expand Down
Loading

0 comments on commit cf5e4df

Please sign in to comment.