Skip to content

Commit

Permalink
Merge branch 'main' into refactor-nomic-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
star-nox authored Mar 13, 2024
2 parents c7233a1 + c249036 commit 689dc4a
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 92 deletions.
41 changes: 22 additions & 19 deletions ai_ta_backend/beam/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import asyncio
import inspect
import json
import logging
import mimetypes
import os
Expand All @@ -13,7 +14,7 @@
import uuid
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, List, Union
from typing import Any, Callable, Dict, List, Optional, Union

import beam
import boto3
Expand All @@ -39,19 +40,15 @@
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant
from ai_ta_backend.beam.OpenaiEmbeddings import OpenAIAPIProcessor

from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map
from OpenaiEmbeddings import OpenAIAPIProcessor
from PIL import Image
from posthog import Posthog
from pydub import AudioSegment
from qdrant_client import QdrantClient, models
from qdrant_client.models import PointStruct

from ai_ta_backend.beam.nomic_logging import (
delete_from_document_map,
log_to_document_map,
rebuild_map
)

# from langchain.schema.output_parser import StrOutputParser
# from langchain.chat_models import AzureChatOpenAI

Expand Down Expand Up @@ -149,7 +146,14 @@ def loader():


# Triggers determine how your app is deployed
@app.rest_api(workers=2, max_pending_tasks=15_000, max_retries=3, timeout=-1, loader=loader, autoscaler=autoscaler)
@app.rest_api(
workers=4,
# callback_url='https://uiuc-chat-git-refactoringesttobeamserverless-kastanday.vercel.app/api/UIUC-api/ingestCallback',
max_pending_tasks=15_000,
max_retries=3,
timeout=-1,
loader=loader,
autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):

qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]
Expand All @@ -159,8 +163,7 @@ def ingest(**inputs: Dict[str, Any]):
url: List[str] | str | None = inputs.get('url', None)
base_url: List[str] | str | None = inputs.get('base_url', None)
readable_filename: List[str] | str = inputs.get('readable_filename', '')
content: str | None = inputs.get('content', None) # is webtext
# is_webtext: bool | None = inputs.get('url', False)
content: str | None = inputs.get('content', None) # is webtext if content exists

print(
f"In top of /ingest route. course: {course_name}, s3paths: {s3_paths}, readable_filename: {readable_filename}, base_url: {base_url}, url: {url}, content: {content}"
Expand All @@ -182,7 +185,8 @@ def ingest(**inputs: Dict[str, Any]):

# rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(course_name, map_type='document')
return success_fail_dict

return json.dumps(success_fail_dict)


class Ingest():
Expand Down Expand Up @@ -247,7 +251,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
with NamedTemporaryFile(suffix=file_extension) as tmpfile:
self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile)
mime_type = str(mimetypes.guess_type(tmpfile.name, strict=False)[0])
mime_category, mime_subcategory = mime_type.split('/')
mime_category = mime_type.split('/')[0] if '/' in mime_type else mime_type

if file_extension in file_ingest_methods:
# Use specialized functions when possible, fallback to mimetype. Else raise error.
Expand All @@ -263,7 +267,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
try:
self._ingest_single_txt(s3_path, course_name)
success_status['success_ingest'].append(s3_path)
print("✅ FALLBACK TO UTF-8 INGEST WAS SUCCESSFUL :) ")
print(f"No ingest methods -- Falling back to UTF-8 INGEST... s3_path = {s3_path}")
except Exception as e:
print(
f"We don't have a ingest method for this filetype: {file_extension}. As a last-ditch effort, we tried to ingest the file as utf-8 text, but that failed too. File is unsupported: {s3_path}. UTF-8 ingest error: {e}"
Expand All @@ -273,7 +277,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
)
self.posthog.capture(
'distinct_id_of_the_user',
event='Ingest Failure',
event='ingest_failure',
properties={
'course_name':
course_name,
Expand All @@ -291,7 +295,7 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):

success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {err}")
self.posthog.capture('distinct_id_of_the_user',
event='Ingest Failure',
event='ingest_failure',
properties={
'course_name': course_name,
's3_path': s3_paths,
Expand Down Expand Up @@ -753,13 +757,12 @@ def _ingest_single_txt(self, s3_path: str, course_name: str, **kwargs) -> str:
Returns:
str: "Success" or an error message
"""
print("In text ingest")
print("In text ingest, UTF-8")
try:
# NOTE: slightly different method for .txt files, no need for download. It's part of the 'body'
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
print("s3 Resonse:", response)
text = response['Body'].read().decode('utf-8')
print("Text from s3:", text)
print("UTF-8 text to ignest (from s3)", text)
text = [text]

metadatas: List[Dict[str, Any]] = [{
Expand Down
Loading

0 comments on commit 689dc4a

Please sign in to comment.