Enhancement to Ingest: ignore utf-8 errors, properly throw errors in split_and_upload()
KastanDay committed Jul 18, 2024
1 parent 602dce7 commit 4f2c032
Showing 1 changed file with 56 additions and 31 deletions.
ai_ta_backend/beam/ingest.py: 87 changes (56 additions & 31 deletions)
@@ -41,13 +41,15 @@
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant

#from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map
from OpenaiEmbeddings import OpenAIAPIProcessor
from PIL import Image
from posthog import Posthog
from pydub import AudioSegment
from qdrant_client import QdrantClient, models
from qdrant_client.models import PointStruct
+from supabase.client import ClientOptions

# from langchain.schema.output_parser import StrOutputParser
# from langchain.chat_models import AzureChatOpenAI
@@ -80,16 +82,17 @@
]

# TODO: consider adding workers. They share CPU and memory https://docs.beam.cloud/deployment/autoscaling#worker-use-cases
-app = App("ingest",
-          runtime=Runtime(
-              cpu=1,
-              memory="3Gi",
-              image=beam.Image(
-                  python_version="python3.10",
-                  python_packages=requirements,
-                  commands=["apt-get update && apt-get install -y ffmpeg tesseract-ocr"],
-              ),
-          ))
+app = App(
+    "ingest",
+    runtime=Runtime(
+        cpu=1,
+        memory="3Gi",  # 3
+        image=beam.Image(
+            python_version="python3.10",
+            python_packages=requirements,
+            commands=["apt-get update && apt-get install -y ffmpeg tesseract-ocr"],
+        ),
+    ))


def loader():
@@ -120,7 +123,9 @@ def loader():

  # Create a Supabase client
  supabase_client = supabase.create_client(  # type: ignore
-      supabase_url=os.environ['SUPABASE_URL'], supabase_key=os.environ['SUPABASE_API_KEY'])
+      supabase_url=os.environ['SUPABASE_URL'],
+      supabase_key=os.environ['SUPABASE_API_KEY'],
+      options=ClientOptions(postgrest_client_timeout=60,))

# llm = AzureChatOpenAI(
# temperature=0,
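
Pulled out of the diff for reference, a minimal standalone sketch of the new client construction (assuming SUPABASE_URL and SUPABASE_API_KEY are set in the environment, and that supabase-py's postgrest_client_timeout is in seconds, per its ClientOptions defaults):

  import os
  import supabase
  from supabase.client import ClientOptions

  # Raise the PostgREST HTTP timeout to 60s so large inserts (documents
  # with many contexts) are not cut off at the default timeout.
  client = supabase.create_client(
      supabase_url=os.environ['SUPABASE_URL'],
      supabase_key=os.environ['SUPABASE_API_KEY'],
      options=ClientOptions(postgrest_client_timeout=60))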
@@ -149,14 +154,13 @@ def loader():

# Triggers determine how your app is deployed
# @app.rest_api(
-@app.task_queue(
-    workers=4,
-    callback_url='https://uiuc-chat-git-ingestprogresstracking-kastanday.vercel.app/api/UIUC-api/ingestTaskCallback',
-    max_pending_tasks=15_000,
-    max_retries=3,
-    timeout=60 * 15,
-    loader=loader,
-    autoscaler=autoscaler)
+@app.task_queue(workers=4,
+                max_pending_tasks=15_000,
+                callback_url='https://uiuc.chat/api/UIUC-api/ingestTaskCallback',
+                timeout=60 * 15,
+                max_retries=3,
+                loader=loader,
+                autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):

qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]
@@ -273,6 +277,16 @@ def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
'.pptx': self._ingest_single_ppt,
'.xlsx': self._ingest_single_excel,
'.xls': self._ingest_single_excel,
+'.xlsm': self._ingest_single_excel,
+'.xlsb': self._ingest_single_excel,
+'.xltx': self._ingest_single_excel,
+'.xltm': self._ingest_single_excel,
+'.xlt': self._ingest_single_excel,
+'.xml': self._ingest_single_excel,
+'.xlam': self._ingest_single_excel,
+'.xla': self._ingest_single_excel,
+'.xlw': self._ingest_single_excel,
+'.xlr': self._ingest_single_excel,
'.csv': self._ingest_single_csv,
'.png': self._ingest_single_image,
'.jpg': self._ingest_single_image,
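
The extension table above is a plain dispatch dict; a hedged sketch of the lookup pattern it implies (route_ingest and the default fallback are illustrative names, not the repo's actual helpers):

  from pathlib import Path
  from typing import Callable, Dict

  def route_ingest(s3_path: str, handlers: Dict[str, Callable], default: Callable) -> Callable:
    # Dispatch on the lowercased file extension; unknown types fall back
    # to a default handler (e.g. ingest as plain text).
    return handlers.get(Path(s3_path).suffix.lower(), default)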
@@ -475,7 +489,7 @@ def _ingest_html(self, s3_path: str, course_name: str, **kwargs) -> str:
print(f"IN _ingest_html s3_path `{s3_path}` kwargs: {kwargs}")
try:
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
-raw_html = response['Body'].read().decode('utf-8')
+raw_html = response['Body'].read().decode('utf-8', errors='ignore')

soup = BeautifulSoup(raw_html, 'html.parser')
title = s3_path.replace("courses/" + course_name, "")
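
This is the first half of the commit: every S3 read that previously assumed clean UTF-8 now drops undecodable bytes instead of raising UnicodeDecodeError and killing the ingest. A plain-Python illustration of the difference:

  data = b'lecture notes \xff\xfe with stray bytes'
  # data.decode('utf-8') raises UnicodeDecodeError on the invalid bytes;
  # errors='ignore' silently drops them instead:
  print(data.decode('utf-8', errors='ignore'))  # -> 'lecture notes  with stray bytes'

errors='replace' would keep a visible U+FFFD marker per bad byte; ignore appears to have been chosen so the stored text stays clean.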
@@ -623,7 +637,7 @@ def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str:

# NOTE: slightly different method for .txt files, no need for download. It's part of the 'body'
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
-raw_text = response['Body'].read().decode('utf-8')
+raw_text = response['Body'].read().decode('utf-8', errors='ignore')

print("UTF-8 text to ingest as SRT:", raw_text)
parsed_info = pysrt.from_string(raw_text)
@@ -891,7 +905,7 @@ def _ingest_single_txt(self, s3_path: str, course_name: str, **kwargs) -> str:
try:
# NOTE: slightly different method for .txt files, no need for download. It's part of the 'body'
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
-text = response['Body'].read().decode('utf-8')
+text = response['Body'].read().decode('utf-8', errors='ignore')
print("UTF-8 text to ignest (from s3)", text)
text = [text]

@@ -1056,11 +1070,11 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
api_key=os.getenv('VLADS_OPENAI_KEY'),
# request_url='https://uiuc-chat-canada-east.openai.azure.com/openai/deployments/text-embedding-ada-002/embeddings?api-version=2023-05-15',
# api_key=os.getenv('AZURE_OPENAI_KEY'),
-max_requests_per_minute=5_000,
-max_tokens_per_minute=300_000,
-max_attempts=20,
+max_requests_per_minute=10_000,
+max_tokens_per_minute=10_000_000,
+max_attempts=500,
logging_level=logging.INFO,
-token_encoding_name='cl100k_base')  # nosec -- reasonable bandit error suppression
+token_encoding_name='cl100k_base')
asyncio.run(oai.process_api_requests_from_file())
print(f"⏰ embeddings tuntime: {(time.monotonic() - embeddings_start_time):.2f} seconds")
# parse results into dict of shape page_content -> embedding
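
A quick sanity check on the new limits: at 10,000 requests/min, the 10,000,000 token/min budget only binds once requests average more than 1,000 tokens (the 500-token figure below is an assumed average, for illustration only):

  max_rpm, max_tpm = 10_000, 10_000_000
  avg_tokens_per_request = 500  # assumed average chunk size, not from the repo
  effective_rpm = min(max_rpm, max_tpm // avg_tokens_per_request)
  print(effective_rpm)  # 10_000: the request limit binds, not the token limit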
@@ -1076,10 +1090,17 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
vectors.append(
PointStruct(id=str(uuid.uuid4()), vector=embeddings_dict[context.page_content], payload=upload_metadata))

-self.qdrant_client.upsert(
-    collection_name=os.environ['QDRANT_COLLECTION_NAME'],  # type: ignore
-    points=vectors,  # type: ignore
-)
+try:
+  self.qdrant_client.upsert(
+      collection_name=os.environ['QDRANT_COLLECTION_NAME'],  # type: ignore
+      points=vectors,  # type: ignore
+  )
+except Exception as e:
+  # It's fine if this hits a timeout: the upsert still completes in the
+  # background, according to the devs: https://github.com/qdrant/qdrant/issues/3654
+  print("Warning: update/upsert timeouts are fine (they complete in the background), "
+        "but other errors may not be:", e)
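
Swallowing the exception trades a delivery guarantee for throughput; per the linked qdrant/qdrant#3654, a client-side timeout does not mean the write was lost. An alternative sketch, assuming the installed qdrant-client supports the wait flag on upsert (it defaults to True, i.e. block until the write is applied, which is exactly what can time out here):

  self.qdrant_client.upsert(
      collection_name=os.environ['QDRANT_COLLECTION_NAME'],
      points=vectors,
      wait=False,  # fire-and-forget: return once the write is accepted
  )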

### Supabase SQL ###
contexts_for_supa = [{
"text": context.page_content,
@@ -1098,6 +1119,10 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
"contexts": contexts_for_supa,
}

+# Calculate the size of the document object in MB
+document_size_mb = len(json.dumps(document).encode('utf-8')) / (1024 * 1024)
+print(f"Document size: {document_size_mb:.2f} MB")

response = self.supabase_client.table(
os.getenv('SUPABASE_DOCUMENTS_TABLE')).insert(document).execute() # type: ignore
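
The size log above pairs with the 60s PostgREST timeout: oversized JSON payloads are a likely cause of slow or failing inserts. A hypothetical guard built on the same measurement (the 50 MB threshold is illustrative, not a documented Supabase limit):

  MAX_DOCUMENT_MB = 50  # hypothetical cutoff, tune to your deployment
  if document_size_mb > MAX_DOCUMENT_MB:
    print(f"Skipping oversized document: {document_size_mb:.2f} MB > {MAX_DOCUMENT_MB} MB")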

@@ -1123,7 +1148,7 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
print(err)
sentry_sdk.capture_exception(e)
sentry_sdk.flush(timeout=20)
-return err
+raise Exception(err)

def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any]]) -> bool:
"""
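
This is the second half of the commit: split_and_upload() used to return the error string, which the task queue (max_retries=3 above) would treat as a normal completion, so failed ingests were silently dropped. Raising lets the queue mark the task failed and retry it. A toy runner showing why only an exception registers as failure (hypothetical, not Beam's actual internals):

  def run_with_retries(task, max_retries=3):
    # Only an exception counts as failure; a returned error string
    # would sail through the `return` as an apparent success.
    for attempt in range(1, max_retries + 1):
      try:
        return task()
      except Exception as e:
        print(f"attempt {attempt} failed: {e}")
    raise RuntimeError("task exhausted retries")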
