Refactor ingest file handling for maintainability #73

Merged
merged 7 commits on Sep 15, 2023
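
At its core, as the diff below shows, this refactor replaces a long if/elif chain on file extensions with two dispatch dictionaries (one keyed by file extension, one by MIME category) plus a small helper that runs whichever handler matches and records success or failure. A minimal standalone sketch of that pattern follows; the handler names and paths here are placeholders, not the project's actual methods:

from pathlib import Path
import mimetypes

def ingest_pdf(path: str) -> str:    # placeholder handler
    return "Success"

def ingest_text(path: str) -> str:   # placeholder handler
    return "Success"

# Handlers keyed by file extension (most specific match wins).
EXTENSION_HANDLERS = {".pdf": ingest_pdf, ".txt": ingest_text, ".md": ingest_text}
# Fallback handlers keyed by MIME category, e.g. "text" from "text/plain".
MIME_CATEGORY_HANDLERS = {"text": ingest_text}

def dispatch(path: str) -> str:
    ext = Path(path).suffix
    if ext in EXTENSION_HANDLERS:
        return EXTENSION_HANDLERS[ext](path)
    mime_type, _ = mimetypes.guess_type(path, strict=False)
    if mime_type and mime_type.split("/")[0] in MIME_CATEGORY_HANDLERS:
        return MIME_CATEGORY_HANDLERS[mime_type.split("/")[0]](path)
    return f"No ingest handler for {path}"

With this shape, supporting a new file type means registering one dictionary entry rather than extending the branch chain.
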
160 changes: 53 additions & 107 deletions ai_ta_backend/vector_database.py
@@ -157,125 +157,71 @@ def get_context_stuffed_prompt(self, user_question: str, course_name: str, top_n
# "Please answer the following question. It's good to quote 'your documents' directly, something like 'from ABS source it says XYZ' Feel free to say you don't know. \nHere's a few passages of the high quality 'your documents':\n"

return stuffed_prompt

# def ai_summary(self, text: List[str], metadata: List[Dict[str, Any]]) -> List[str]:
# """
# Given a textual input, return a summary of the text.
# """
# #print("in AI SUMMARY")
# requests = []
# for i in range(len(text)):
# dictionary = {
# "model": "gpt-3.5-turbo",
# "messages": [{
# "role":
# "system",
# "content":
# "You are a factual summarizer of partial documents. Stick to the facts (including partial info when necessary to avoid making up potentially incorrect details), and say I don't know when necessary."
# }, {
# "role":
# "user",
# "content":
# f"Provide a descriptive summary of the given text:\n{text[i]}\nThe summary should cover all the key points, while also condensing the information into a concise format. The length of the summary should not exceed 3 sentences.",
# }],
# "n": 1,
# "max_tokens": 600,
# "metadata": metadata[i]
# }
# requests.append(dictionary)

# oai = OpenAIAPIProcessor(input_prompts_list=requests,
# request_url='https://api.openai.com/v1/chat/completions',
# api_key=os.getenv("OPENAI_API_KEY"),
# max_requests_per_minute=1500,
# max_tokens_per_minute=90000,
# token_encoding_name='cl100k_base',
# max_attempts=5,
# logging_level=20)

# asyncio.run(oai.process_api_requests_from_file())
# #results: list[str] = oai.results
# #print(f"Cleaned results: {oai.cleaned_results}")
# summary = oai.cleaned_results
# return summary


def bulk_ingest(self, s3_paths: Union[List[str], str], course_name: str, **kwargs) -> Dict[str, List[str]]:
# https://python.langchain.com/en/latest/modules/indexes/document_loaders/examples/microsoft_word.html
success_status = {"success_ingest": [], "failure_ingest": []}
def _ingest_single(file_ingest_methods, s3_path, *args, **kwargs):
"""Handle running an arbitrary ingest function for an individual file."""
handler = file_ingest_methods.get(Path(s3_path).suffix)
if handler:
# RUN INGEST METHOD
ret = handler(s3_path, *args, **kwargs)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)

# 👇👇👇👇 ADD NEW INGEST METHODS HERE 👇👇👇👇 🎉
file_ingest_methods = {
'.html': self._ingest_html,
'.py': self._ingest_single_py,
'.vtt': self._ingest_single_vtt,
'.pdf': self._ingest_single_pdf,
'.txt': self._ingest_single_txt,
'.md': self._ingest_single_txt,
'.srt': self._ingest_single_srt,
'.docx': self._ingest_single_docx,
'.ppt': self._ingest_single_ppt,
'.pptx': self._ingest_single_ppt,
}

# Ingest methods via MIME type (more general than filetype)
mimetype_ingest_methods = {
'video': self._ingest_single_video,
'audio': self._ingest_single_video,
'text': self._ingest_single_txt,
}
# 👆👆👆👆 ADD NEW INGEST METHODS HERE 👆👆👆👆 🎉

success_status = {"success_ingest": [], "failure_ingest": []}
try:
if isinstance(s3_paths, str):
s3_paths = [s3_paths]


for s3_path in s3_paths:
ext = Path(s3_path).suffix # check mimetype of file
# TODO: no need to download, just guess_type against the s3_path...
with NamedTemporaryFile(suffix=ext) as tmpfile:
with NamedTemporaryFile(suffix=Path(s3_path).suffix) as tmpfile:
self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile)
mime_type = str(mimetypes.guess_type(tmpfile.name)[0])
category, subcategory = mime_type.split('/')

# TODO: if mime-type is text, we should handle that via .txt ingest

if s3_path.endswith('.html'):
ret = self._ingest_html(s3_path, course_name, kwargs=kwargs)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif s3_path.endswith('.py'):
ret = self._ingest_single_py(s3_path, course_name)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif s3_path.endswith('.vtt'):
ret = self._ingest_single_vtt(s3_path, course_name)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif s3_path.endswith('.pdf'):
ret = self._ingest_single_pdf(s3_path, course_name, kwargs=kwargs)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif s3_path.endswith('.txt') or s3_path.endswith('.md'):
ret = self._ingest_single_txt(s3_path, course_name)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif s3_path.endswith('.srt'):
ret = self._ingest_single_srt(s3_path, course_name)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif s3_path.endswith('.docx'):
ret = self._ingest_single_docx(s3_path, course_name)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif s3_path.endswith('.ppt') or s3_path.endswith('.pptx'):
ret = self._ingest_single_ppt(s3_path, course_name)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
elif category == 'video' or category == 'audio':
ret = self._ingest_single_video(s3_path, course_name)
if ret != "Success":
success_status['failure_ingest'].append(s3_path)
else:
success_status['success_ingest'].append(s3_path)
mime_type = mimetypes.guess_type(tmpfile.name, strict=False)[0]
mime_category, extension = mime_type.split('/')
file_ext = "." + extension

if file_ext in file_ingest_methods:
# Prefer an extension-specific ingest method; otherwise fall back to the MIME-category handler; otherwise record a failure.
_ingest_single(file_ingest_methods, s3_path, course_name, kwargs=kwargs)
elif mime_category in mimetype_ingest_methods:
# mime type
_ingest_single(mimetype_ingest_methods, s3_path, course_name, kwargs=kwargs)
else:
# failure
success_status['failure_ingest'].append(f"File ingest not supported for Mimetype: {mime_type}, with MimeCategory: {mime_category}, with file extension: {file_ext} for s3_path: {s3_path}")
continue

return success_status
except Exception as e:
success_status['failure_ingest'].append("MAJOR ERROR IN /bulk_ingest: Error: " + str(e))
return success_status
success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {str(e)}")
return success_status


def _ingest_single_py(self, s3_path: str, course_name: str):
try:
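
For orientation, a call into the refactored entry point might look like the sketch below. The S3 keys and course name are placeholders, and the S3/OpenAI environment configuration is assumed to already be in place as elsewhere in the backend:

from ai_ta_backend.vector_database import Ingest

ingester = Ingest()
# s3_paths may be a single key or a list; the result maps
# "success_ingest" / "failure_ingest" to lists of S3 paths.
status = ingester.bulk_ingest(
    s3_paths=["courses/example/lecture01.pdf", "courses/example/notes.md"],  # placeholder keys
    course_name="example-course",  # placeholder
)
print(status["failure_ingest"])
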
40 changes: 26 additions & 14 deletions ai_ta_backend/web_scrape.py
@@ -1,3 +1,4 @@
import mimetypes
import os
import re
import shutil
@@ -7,13 +8,12 @@

import boto3 # type: ignore
import requests
from bs4 import BeautifulSoup

import supabase
from bs4 import BeautifulSoup

from ai_ta_backend.aws import upload_data_files_to_s3
from ai_ta_backend.vector_database import Ingest
import mimetypes


def get_file_extension(filename):
match = re.search(r'\.([a-zA-Z0-9]+)$', filename)
@@ -151,7 +151,7 @@ def remove_duplicates(urls:list, supabase_urls:list=None):
print("deleted", og_len-len(not_repeated_files), "duplicate files")
return urls

def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url_on:str=None, _depth:int=0, _soup:BeautifulSoup=None, _filetype:str=None, _invalid_urls:list=[], _existing_urls:list=None):
def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url_on:str=None, _depth:int=0, _soup:BeautifulSoup=None, _filetype:str=None, _invalid_urls:list=[], _existing_urls:list=[]):
'''Function gets titles of urls and the urls themselves'''
# Prints the depth of the current search
print("depth: ", _depth)
@@ -181,7 +181,7 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url
url, s, filetype = valid_url(url)
time.sleep(timeout)
url_contents.append((url,s, filetype))
print("Scraped:", url)
print("Scraped:", url, "✅")
if url:
if filetype == '.html':
try:
@@ -227,7 +227,7 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url
if url.startswith(site):
url, s, filetype = valid_url(url)
if url:
print("Scraped:", url)
print("Scraped:", url, "✅")
url_contents.append((url, s, filetype))
else:
_invalid_urls.append(url)
@@ -236,7 +236,7 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url
else:
url, s, filetype = valid_url(url)
if url:
print("Scraped:", url)
print("Scraped:", url, "✅")
url_contents.append((url, s, filetype))
else:
_invalid_urls.append(url)
@@ -285,6 +285,18 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url

return url_contents

def is_github_repo(url):
# Split the URL by '?' to ignore any parameters
base_url = url.split('?')[0]

# The regular expression now allows for optional 'http', 'https', and 'www' prefixes.
# It also accounts for optional trailing slashes.
# The pattern is also case-insensitive.
pattern = re.compile(r'^(https?://)?(www\.)?github\.com/[^/?]+/[^/?]+/?$', re.IGNORECASE)

# The function returns True or False based on whether the pattern matches the base_url
return bool(pattern.match(base_url))
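# Illustrative behavior of the pattern above (assumed examples, not tests included in this diff):
#   is_github_repo("https://github.com/owner/repo")         -> True
#   is_github_repo("github.com/Owner/Repo/?tab=readme")     -> True   (scheme optional, query string stripped)
#   is_github_repo("https://github.com/owner/repo/issues")  -> False  (extra path segment)
#   is_github_repo("https://gitlab.com/owner/repo")         -> False  (not github.com)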

def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, timeout:int=1, stay_on_baseurl:bool=False):
"""
Crawl a site and scrape its content and PDFs, then upload the data to S3 and ingest it.
@@ -305,8 +317,8 @@ def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, ti
timeout = int(timeout)
stay_on_baseurl = bool(stay_on_baseurl)
if stay_on_baseurl:
stay_on_baseurl = base_url(url)
print(stay_on_baseurl)
baseurl = base_url(url)
print("baseurl:", baseurl)

ingester = Ingest()
s3_client = boto3.client(
@@ -316,7 +328,7 @@
)

# Check for GitHub repository coming soon
if url.startswith("https://github.com/"):
if is_github_repo(url):
print("Begin Ingesting GitHub page")
results = ingester.ingest_github(url, course_name)
print("Finished ingesting GitHub page")
@@ -331,7 +343,7 @@ def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, ti
urls = supabase_client.table(os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE')).select('course_name, url, contexts').eq('course_name', course_name).execute()
del supabase_client
if urls.data == []:
existing_urls = None
existing_urls = []
else:
existing_urls = []
for thing in urls.data:
@@ -340,13 +352,13 @@ def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, ti
whole += t['text']
existing_urls.append((thing['url'], whole))
print("Finished gathering existing urls from Supabase")
print("Length of existing urls:", len(existing_urls))
except Exception as e:
print("Error:", e)
print("Could not gather existing urls from Supabase")
existing_urls = None

existing_urls = []
print("Begin Ingesting Web page")
data = crawler(url=url, max_urls=max_urls, max_depth=max_depth, timeout=timeout, base_url_on=stay_on_baseurl, _existing_urls=existing_urls)
data = crawler(url=url, max_urls=max_urls, max_depth=max_depth, timeout=timeout, base_url_on=baseurl, _existing_urls=existing_urls)

# Clean some keys for a proper file name
# todo: have a default title
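
As a rough usage sketch, the crawler entry point above would be driven along these lines; the URL and course name are placeholders, and the remaining arguments mirror the defaults in the signature shown in the diff:

from ai_ta_backend.web_scrape import main_crawler

# Crawl a course site, scrape its pages and PDFs, upload them to S3, and ingest them.
main_crawler(
    url="https://example.edu/cs101",  # placeholder URL
    course_name="cs101",              # placeholder course name
    max_urls=100,
    max_depth=3,
    timeout=1,
    stay_on_baseurl=True,
)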