From 241f819fd4e002515c2c25a7b5bf87791c1657a8 Mon Sep 17 00:00:00 2001
From: Joshua K Min <76576647+jkmin3@users.noreply.github.com>
Date: Fri, 15 Sep 2023 18:09:49 -0500
Subject: [PATCH] Refactor ingest file handling for maintainability; it's beautiful! (#73)

* check if it is a github repo

* refactoring finished and added better print statements to web scraping

* added the text category

* enhancement to GitHub URL parsing, should match ALL valid URLs now.

* major redesign of MIME type handling, fully smooth and beautiful functions now

---------

Co-authored-by: Kastan Day
---
 ai_ta_backend/vector_database.py | 160 ++++++++++---------------------
 ai_ta_backend/web_scrape.py      |  40 +++++---
 2 files changed, 79 insertions(+), 121 deletions(-)

diff --git a/ai_ta_backend/vector_database.py b/ai_ta_backend/vector_database.py
index 39773834..08aa37ed 100644
--- a/ai_ta_backend/vector_database.py
+++ b/ai_ta_backend/vector_database.py
@@ -157,125 +157,71 @@ def get_context_stuffed_prompt(self, user_question: str, course_name: str, top_n
     # "Please answer the following question. It's good to quote 'your documents' directly, something like 'from ABS source it says XYZ' Feel free to say you don't know. \nHere's a few passages of the high quality 'your documents':\n"
     return stuffed_prompt
 
-
-  # def ai_summary(self, text: List[str], metadata: List[Dict[str, Any]]) -> List[str]:
-  #   """
-  #   Given a textual input, return a summary of the text.
-  #   """
-  #   #print("in AI SUMMARY")
-  #   requests = []
-  #   for i in range(len(text)):
-  #     dictionary = {
-  #         "model": "gpt-3.5-turbo",
-  #         "messages": [{
-  #             "role":
-  #                 "system",
-  #             "content":
-  #                 "You are a factual summarizer of partial documents. Stick to the facts (including partial info when necessary to avoid making up potentially incorrect details), and say I don't know when necessary."
-  #         }, {
-  #             "role":
-  #                 "user",
-  #             "content":
-  #                 f"Provide a descriptive summary of the given text:\n{text[i]}\nThe summary should cover all the key points, while also condensing the information into a concise format. The length of the summary should not exceed 3 sentences.",
-  #         }],
-  #         "n": 1,
-  #         "max_tokens": 600,
-  #         "metadata": metadata[i]
-  #     }
-  #     requests.append(dictionary)
-
-  #   oai = OpenAIAPIProcessor(input_prompts_list=requests,
-  #                            request_url='https://api.openai.com/v1/chat/completions',
-  #                            api_key=os.getenv("OPENAI_API_KEY"),
-  #                            max_requests_per_minute=1500,
-  #                            max_tokens_per_minute=90000,
-  #                            token_encoding_name='cl100k_base',
-  #                            max_attempts=5,
-  #                            logging_level=20)
-
-  #   asyncio.run(oai.process_api_requests_from_file())
-  #   #results: list[str] = oai.results
-  #   #print(f"Cleaned results: {oai.cleaned_results}")
-  #   summary = oai.cleaned_results
-  #   return summary
 
   def bulk_ingest(self, s3_paths: Union[List[str], str], course_name: str, **kwargs) -> Dict[str, List[str]]:
-    # https://python.langchain.com/en/latest/modules/indexes/document_loaders/examples/microsoft_word.html
-    success_status = {"success_ingest": [], "failure_ingest": []}
+    def _ingest_single(file_ingest_methods, s3_path, *args, **kwargs):
+      """Handle running an arbitrary ingest function for an individual file."""
+      handler = file_ingest_methods.get(Path(s3_path).suffix)
+      if handler:
+        # RUN INGEST METHOD
+        ret = handler(s3_path, *args, **kwargs)
+        if ret != "Success":
+          success_status['failure_ingest'].append(s3_path)
+        else:
+          success_status['success_ingest'].append(s3_path)
+
+    # 👇👇👇👇 ADD NEW INGEST METHODS HERE 👇👇👇👇🎉
+    file_ingest_methods = {
+        '.html': self._ingest_html,
+        '.py': self._ingest_single_py,
+        '.vtt': self._ingest_single_vtt,
+        '.pdf': self._ingest_single_pdf,
+        '.txt': self._ingest_single_txt,
+        '.md': self._ingest_single_txt,
+        '.srt': self._ingest_single_srt,
+        '.docx': self._ingest_single_docx,
+        '.ppt': self._ingest_single_ppt,
+        '.pptx': self._ingest_single_ppt,
+    }
+
+    # Ingest methods via MIME type (more general than filetype)
+    mimetype_ingest_methods = {
+        'video': self._ingest_single_video,
+        'audio': self._ingest_single_video,
+        'text': self._ingest_single_txt,
+    }
+    # 👆👆👆👆 ADD NEW INGEST METHODS HERE 👆👆👆👆🎉
+    success_status = {"success_ingest": [], "failure_ingest": []}
     try:
       if isinstance(s3_paths, str):
         s3_paths = [s3_paths]
+
       for s3_path in s3_paths:
-        ext = Path(s3_path).suffix # check mimetype of file
-        # TODO: no need to download, just guess_type against the s3_path...
-        with NamedTemporaryFile(suffix=ext) as tmpfile:
+        with NamedTemporaryFile(suffix=Path(s3_path).suffix) as tmpfile:
           self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile)
-          mime_type = str(mimetypes.guess_type(tmpfile.name)[0])
-          category, subcategory = mime_type.split('/')
-
-        # TODO: if mime-type is text, we should handle that via .txt ingest
-
-        if s3_path.endswith('.html'):
-          ret = self._ingest_html(s3_path, course_name, kwargs=kwargs)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif s3_path.endswith('.py'):
-          ret = self._ingest_single_py(s3_path, course_name)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif s3_path.endswith('.vtt'):
-          ret = self._ingest_single_vtt(s3_path, course_name)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif s3_path.endswith('.pdf'):
-          ret = self._ingest_single_pdf(s3_path, course_name, kwargs=kwargs)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif s3_path.endswith('.txt') or s3_path.endswith('.md'):
-          ret = self._ingest_single_txt(s3_path, course_name)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif s3_path.endswith('.srt'):
-          ret = self._ingest_single_srt(s3_path, course_name)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif s3_path.endswith('.docx'):
-          ret = self._ingest_single_docx(s3_path, course_name)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif s3_path.endswith('.ppt') or s3_path.endswith('.pptx'):
-          ret = self._ingest_single_ppt(s3_path, course_name)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
-        elif category == 'video' or category == 'audio':
-          ret = self._ingest_single_video(s3_path, course_name)
-          if ret != "Success":
-            success_status['failure_ingest'].append(s3_path)
-          else:
-            success_status['success_ingest'].append(s3_path)
+          mime_type = mimetypes.guess_type(tmpfile.name, strict=False)[0]
+          mime_category, extension = mime_type.split('/')
+          file_ext = "." + extension
+
+          if file_ext in file_ingest_methods:
+            # Use specialized functions when possible, fallback to mimetype. Else raise error.
+            _ingest_single(file_ingest_methods, s3_path, course_name, kwargs=kwargs)
+          elif mime_category in mimetype_ingest_methods:
+            # mime type
+            _ingest_single(mimetype_ingest_methods, s3_path, course_name, kwargs=kwargs)
+          else:
+            # failure
+            success_status['failure_ingest'].append(f"File ingest not supported for Mimetype: {mime_type}, with MimeCategory: {mime_category}, with file extension: {file_ext} for s3_path: {s3_path}")
+            continue
+      return success_status
     except Exception as e:
-      success_status['failure_ingest'].append("MAJOR ERROR IN /bulk_ingest: Error: " + str(e))
-      return success_status
+      success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {str(e)}")
+      return success_status
+
   def _ingest_single_py(self, s3_path: str, course_name: str):
     try:
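(Sketch, not part of the patch.) The bulk_ingest hunk above replaces the long if/elif chain with two lookup tables: exact file extensions map to specialized ingest methods, and the broad MIME category ('video', 'audio', 'text') serves as a fallback. Below is a minimal, self-contained illustration of that dispatch order; the handler names and sample paths are hypothetical stand-ins, not the real Ingest methods.

import mimetypes
from pathlib import Path

# Hypothetical stand-ins for the Ingest methods defined in vector_database.py.
def ingest_txt(path: str) -> str:
    print(f"text ingest: {path}")
    return "Success"

def ingest_video(path: str) -> str:
    print(f"video/audio ingest: {path}")
    return "Success"

# Extension-specific handlers are checked first...
file_ingest_methods = {'.txt': ingest_txt, '.md': ingest_txt}
# ...then the broad MIME category is used as a fallback.
mimetype_ingest_methods = {'video': ingest_video, 'audio': ingest_video, 'text': ingest_txt}

def dispatch(s3_path: str) -> str:
    file_ext = Path(s3_path).suffix
    if file_ext in file_ingest_methods:
        return file_ingest_methods[file_ext](s3_path)
    mime_type, _ = mimetypes.guess_type(s3_path, strict=False)
    if mime_type and mime_type.split('/')[0] in mimetype_ingest_methods:
        return mimetype_ingest_methods[mime_type.split('/')[0]](s3_path)
    return f"File ingest not supported for {s3_path}"

print(dispatch("notes/week1.md"))      # hits the '.md' extension handler
print(dispatch("media/lecture1.mp4"))  # falls back to the 'video' MIME category

The '.md' case resolves through the extension table even though its MIME type would also qualify for the text fallback, which is why the two tables can safely overlap.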
diff --git a/ai_ta_backend/web_scrape.py b/ai_ta_backend/web_scrape.py
index 36158db9..46e3154b 100644
--- a/ai_ta_backend/web_scrape.py
+++ b/ai_ta_backend/web_scrape.py
@@ -1,3 +1,4 @@
+import mimetypes
 import os
 import re
 import shutil
@@ -7,13 +8,12 @@
 import boto3 # type: ignore
 import requests
-from bs4 import BeautifulSoup
-
 import supabase
+from bs4 import BeautifulSoup
 
 from ai_ta_backend.aws import upload_data_files_to_s3
 from ai_ta_backend.vector_database import Ingest
-import mimetypes
+
 
 def get_file_extension(filename):
     match = re.search(r'\.([a-zA-Z0-9]+)$', filename)
@@ -151,7 +151,7 @@ def remove_duplicates(urls:list, supabase_urls:list=None):
     print("deleted", og_len-len(not_repeated_files), "duplicate files")
     return urls
 
-def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url_on:str=None, _depth:int=0, _soup:BeautifulSoup=None, _filetype:str=None, _invalid_urls:list=[], _existing_urls:list=None):
+def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url_on:str=None, _depth:int=0, _soup:BeautifulSoup=None, _filetype:str=None, _invalid_urls:list=[], _existing_urls:list=[]):
     '''Function gets titles of urls and the urls themselves'''
     # Prints the depth of the current search
     print("depth: ", _depth)
@@ -181,7 +181,7 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url
         url, s, filetype = valid_url(url)
         time.sleep(timeout)
         url_contents.append((url,s, filetype))
-        print("Scraped:", url)
+        print("✅Scraped:", url, "✅")
         if url:
             if filetype == '.html':
                 try:
@@ -227,7 +227,7 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url
                     if url.startswith(site):
                         url, s, filetype = valid_url(url)
                         if url:
-                            print("Scraped:", url)
+                            print("✅Scraped:", url, "✅")
                             url_contents.append((url, s, filetype))
                         else:
                             _invalid_urls.append(url)
@@ -236,7 +236,7 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url
                 else:
                     url, s, filetype = valid_url(url)
                     if url:
-                        print("Scraped:", url)
+                        print("✅Scraped:", url, "✅")
                        url_contents.append((url, s, filetype))
                     else:
                         _invalid_urls.append(url)
@@ -285,6 +285,18 @@ def crawler(url:str, max_urls:int=1000, max_depth:int=3, timeout:int=1, base_url
 
     return url_contents
 
+def is_github_repo(url):
+    # Split the URL by '?' to ignore any parameters
+    base_url = url.split('?')[0]
+
+    # The regular expression now allows for optional 'http', 'https', and 'www' prefixes.
+    # It also accounts for optional trailing slashes.
+    # The pattern is also case-insensitive.
+    pattern = re.compile(r'^(https?://)?(www\.)?github\.com/[^/?]+/[^/?]+/?$', re.IGNORECASE)
+
+    # The function returns True or False based on whether the pattern matches the base_url
+    return bool(pattern.match(base_url))
+
 def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, timeout:int=1, stay_on_baseurl:bool=False):
     """
     Crawl a site and scrape its content and PDFs, then upload the data to S3 and ingest it.
@@ -305,8 +317,8 @@ def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, ti
     timeout = int(timeout)
     stay_on_baseurl = bool(stay_on_baseurl)
     if stay_on_baseurl:
-        stay_on_baseurl = base_url(url)
-        print(stay_on_baseurl)
+        baseurl = base_url(url)
+        print("baseurl:", baseurl)
 
     ingester = Ingest()
     s3_client = boto3.client(
@@ -316,7 +328,7 @@ def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, ti
     )
 
     # Check for GitHub repository coming soon
-    if url.startswith("https://github.com/"):
+    if is_github_repo(url):
        print("Begin Ingesting GitHub page")
        results = ingester.ingest_github(url, course_name)
        print("Finished ingesting GitHub page")
@@ -331,7 +343,7 @@ def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, ti
            urls = supabase_client.table(os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE')).select('course_name, url, contexts').eq('course_name', course_name).execute()
            del supabase_client
            if urls.data == []:
-               existing_urls = None
+               existing_urls = []
            else:
                existing_urls = []
                for thing in urls.data:
                    whole += t['text']
                    existing_urls.append((thing['url'], whole))
            print("Finished gathering existing urls from Supabase")
+           print("Length of existing urls:", len(existing_urls))
        except Exception as e:
            print("Error:", e)
            print("Could not gather existing urls from Supabase")
-           existing_urls = None
-
+           existing_urls = []
    print("Begin Ingesting Web page")
-   data = crawler(url=url, max_urls=max_urls, max_depth=max_depth, timeout=timeout, base_url_on=stay_on_baseurl, _existing_urls=existing_urls)
+   data = crawler(url=url, max_urls=max_urls, max_depth=max_depth, timeout=timeout, base_url_on=baseurl, _existing_urls=existing_urls)
 
    # Clean some keys for a proper file name
    # todo: have a default title
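(Sketch, not part of the patch.) A quick look at how the is_github_repo() helper added above behaves; the sample URLs are illustrative, not taken from the PR.

import re

# Same pattern and query-stripping as the is_github_repo() helper in web_scrape.py.
pattern = re.compile(r'^(https?://)?(www\.)?github\.com/[^/?]+/[^/?]+/?$', re.IGNORECASE)

samples = [
    "https://github.com/example-org/example-repo",  # plain repo URL -> True
    "http://www.GitHub.com/owner/repo/",            # http, www, trailing slash, mixed case -> True
    "github.com/owner/repo?tab=readme",             # no scheme; query string is stripped -> True
    "https://github.com/owner",                     # missing repo segment -> False
    "https://github.com/owner/repo/issues/1",       # deeper path than owner/repo -> False
]

for url in samples:
    base = url.split('?')[0]  # mirror the helper: drop query parameters before matching
    print(url, "->", bool(pattern.match(base)))

Accepting bare "github.com/owner/repo" links and mixed-case hosts is the practical difference from the old url.startswith("https://github.com/") check in main_crawler.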