diff --git a/ai_ta_backend/beam/canvas_ingest.py b/ai_ta_backend/beam/canvas_ingest.py
index 1fed6acd..9476b80c 100644
--- a/ai_ta_backend/beam/canvas_ingest.py
+++ b/ai_ta_backend/beam/canvas_ingest.py
@@ -17,12 +17,16 @@ from beam import App, QueueDepthAutoscaler, Runtime  # RequestLatencyAutoscaler,
 from canvasapi import Canvas
 from posthog import Posthog
+from bs4 import BeautifulSoup
+import yt_dlp
 
 requirements = [
     "boto3==1.28.79",
     "posthog==3.1.0",
     "canvasapi==3.2.0",
     "sentry-sdk==1.39.1",
+    "bs4==0.0.2",
+    "yt-dlp==2024.7.16",
 ]
 
 app = App(
@@ -186,7 +190,22 @@ def download_course_content(self, canvas_course_id: int, dest_folder: str, conte
       elif key == 'discussions':
         self.download_discussions(dest_folder, api_path)
 
-      # at this point, we have all extracted files in the dest_folder.
+      # at this point, we have all canvas files in the dest_folder.
+      # parse all HTML files in dest_folder and extract URLs
+      extracted_urls_from_html = self.extract_urls_from_html(dest_folder)
+      #print("extracted_urls_from_html=", extracted_urls_from_html)
+
+      # links - canvas files, external urls, embedded videos
+      file_links = extracted_urls_from_html.get('file_links', [])
+      video_links = extracted_urls_from_html.get('video_links', [])
+      #external_links = extracted_urls_from_html.get('external_links', [])
+
+      # download files and videos from the extracted URLs
+      file_download_status = self.download_files_from_urls(file_links, canvas_course_id, dest_folder)
+      video_download_status = self.download_videos_from_urls(video_links, canvas_course_id, dest_folder)
+      print("file_download_status=", file_download_status)
+      print("video_download_status=", video_download_status)
 
       return "Success"
     except Exception as e:
@@ -254,11 +273,13 @@ def ingest_course_content(self,
         uid = str(uuid.uuid4()) + '-'
         unique_filename = uid + name_without_extension + extension
+        s3_path = "courses/" + course_name + "/" + unique_filename
         readable_filename = name_without_extension + extension
-        all_s3_paths.append(unique_filename)
+        all_s3_paths.append(s3_path)
         all_readable_filenames.append(readable_filename)
         print("Uploading file: ", readable_filename)
-        self.upload_file(file_path, os.getenv('S3_BUCKET_NAME'), unique_filename)
+        print("Filepath: ", file_path)
+        self.upload_file(file_path, os.getenv('S3_BUCKET_NAME'), s3_path)
 
       # Delete files from local directory
       shutil.rmtree(folder_path)
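The `ingest_course_content` hunk above changes the uploaded S3 key from a bare unique filename to one under a `courses/<course_name>/` prefix, and records that full key in `all_s3_paths`. A minimal sketch of the key layout, assuming the same UUID-prefix convention; the helper name is illustrative and not part of the PR:

```python
import os
import uuid

def build_s3_key(course_name: str, local_filename: str) -> str:
    # Same layout as the hunk above: a per-course prefix plus a UUID
    # so repeated uploads of identically named files never collide.
    name_without_extension, extension = os.path.splitext(local_filename)
    unique_filename = str(uuid.uuid4()) + '-' + name_without_extension + extension
    return "courses/" + course_name + "/" + unique_filename
```

Recording the full key rather than just the filename is what lets downstream ingest locate the object under the course prefix.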
""" print("In download_modules") + # need to parse pages through modules try: module_request = requests.get(api_path + "/modules?include=items", headers=self.headers) + modules = module_request.json() for module in modules: module_items = module['items'] for item in module_items: - if item['type'] == 'ExternalUrl': + if item['type'] == 'ExternalUrl': # EXTERNAL LINK external_url = item['external_url'] url_title = item['title'] # Download external url as HTML response = requests.get(external_url) if response.status_code == 200: - html_file_name = url_title + ".html" + html_file_name = "external_link_" + url_title.replace(" ", "_") + ".html" with open(dest_folder + "/" + html_file_name, 'w') as html_file: html_file.write(response.text) + + elif item['type'] == 'Discussion': # DISCUSSION + discussion_url = item['url'] + discussion_req = requests.get(discussion_url, headers=self.headers) + + if discussion_req.status_code == 200: + discussion_data = discussion_req.json() + discussion_message = discussion_data['message'] + discussion_filename = "Discussion_" + discussion_data['title'].replace(" ", "_") + ".html" + + # write the message to a file + with open(dest_folder + "/" + discussion_filename, 'w') as html_file: + html_file.write(discussion_message) + + elif item['type'] == 'Assignment': # ASSIGNMENT + print("Assigments are handled via download_assignments()") + continue + + elif item['type'] == 'Quiz': + #print("Quizzes are not handled at the moment.") + continue + + else: # OTHER ITEMS - PAGES + if 'url' not in item: + #print("No URL in item: ", item['type']) + continue + + item_url = item['url'] + item_request = requests.get(item_url, headers=self.headers) + + if item_request.status_code == 200: + item_data = item_request.json() + if 'body' not in item_data: + #print("No body in item: ", item_data) + continue + + item_body = item_data['body'] + html_file_name = item['type'] + "_" + item_data['url'] + ".html" + + # write page body to a file + with open(dest_folder + "/" + html_file_name, 'w') as html_file: + html_file.write(item_body) + else: + print("Item request failed with status code: ", item_request.status_code) + return "Success" except Exception as e: sentry_sdk.capture_exception(e) @@ -415,7 +483,7 @@ def download_discussions(self, dest_folder: str, api_path: str) -> str: try: discussion_request = requests.get(api_path + "/discussion_topics", headers=self.headers) discussions = discussion_request.json() - + #print("Discussions: ", discussions) for discussion in discussions: discussion_content = discussion['message'] discussion_name = discussion['title'] + ".html" @@ -426,3 +494,120 @@ def download_discussions(self, dest_folder: str, api_path: str) -> str: except Exception as e: sentry_sdk.capture_exception(e) return "Failed! Error: " + str(e) + + def extract_urls_from_html(self, dir_path: str) -> Dict[str, List[str]]: + """ + Extracts URLs from all HTML files in a directory. 
+ """ + print("In extract_urls_from_html") + try: + file_links = [] + video_links = [] + external_links = [] + for file_name in os.listdir(dir_path): + if file_name.endswith(".html"): + file_path = os.path.join(dir_path, file_name) + try: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: + content = file.read() + except UnicodeDecodeError: + with open(file_path, 'r', encoding='latin-1') as file: + content = file.read() + + soup = BeautifulSoup(content, 'html.parser') + + # Extracting links from href attributes + href_links = soup.find_all('a', href=True) + for link in href_links: + href = link['href'] + if re.match(r'https://canvas\.illinois\.edu/courses/\d+/files/.*', href): + file_links.append(href) + else: + external_links.append(href) + + # Extracting video links from src attributes + src_links = soup.find_all('iframe', src=True) + for link in src_links: + src = link['src'] + if re.match(r'https://ensemble\.illinois\.edu/hapi/v1/contents/.*', src): + video_links.append(src) + + return { + 'file_links': file_links, + 'video_links': video_links,} + + except Exception as e: + sentry_sdk.capture_exception(e) + return {} + + def download_files_from_urls(self, urls: List[str], course_id: int, dir_path: str): + """ + This function downloads files from a given Canvas course using the URLs provided. + input: urls - list of URLs scraped from Canvas HTML pages. + """ + print("In download_files_from_urls") + #print("Number of URLs: ", len(urls)) + try: + for url in urls: + #print("Downloading file from URL: ", url) + with requests.get(url, stream=True) as r: + content_type = r.headers.get('Content-Type') + #print("Content type: ", content_type) + content_disposition = r.headers.get('Content-Disposition') + #print("Content disposition: ", content_disposition) + if content_disposition is None: + #print("No content disposition") + continue + + if 'filename=' in content_disposition: + filename = content_disposition.split('filename=')[1].strip('"') + #print("local filename: ", filename) + else: + #print("No filename in content disposition") + continue + + # write to PDF + file_path = os.path.join(dir_path, filename) + with open(file_path, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + print("Downloaded file: ", filename) + + return "Success" + except Exception as e: + sentry_sdk.capture_exception(e) + print("Error downloading files from URLs: ", e) + return "Failed! Error: " + str(e) + + def download_videos_from_urls(self, urls: List[str], course_id: int, dir_path: str): + """ + This function downloads videos from a given Canvas course using the URLs provided. + """ + print("In download_videos_from_urls") + #print("Video URLs: ", len(urls)) + try: + count = 0 + for url in urls: + count += 1 + with requests.get(url, stream=True) as r: + filename = f"{course_id}_video_{count}.mp4" + + # download video + file_path = os.path.join(dir_path, filename) + ydl_opts = { + 'outtmpl': f'{dir_path}/{course_id}_video_{count}.%(ext)s', # Dynamic extension + 'format': 'best', # Best quality format + } + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info_dict = ydl.extract_info(url, download=True) + ext = info_dict.get('ext', 'mp4') # Get extension from info, default to mp4 + filename = f"{course_id}_video_{count}.{ext}" + + + print(f"Video downloaded successfully: {filename}") + + return "Success" + except Exception as e: + sentry_sdk.capture_exception(e) + print("Error downloading videos from URLs: ", e) + return "Failed! 
Error: " + str(e) diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index 830bcc8a..6337beb4 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -51,6 +51,9 @@ from qdrant_client.models import PointStruct from supabase.client import ClientOptions +import subprocess + + # from langchain.schema.output_parser import StrOutputParser # from langchain.chat_models import AzureChatOpenAI @@ -79,6 +82,7 @@ "sentry-sdk==1.39.1", "nomic==2.0.14", "pdfplumber==0.11.0", # PDF OCR, better performance than Fitz/PyMuPDF in my Gies PDF testing. + ] # TODO: consider adding workers. They share CPU and memory https://docs.beam.cloud/deployment/autoscaling#worker-use-cases @@ -539,8 +543,28 @@ def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str: with NamedTemporaryFile(suffix=file_ext) as video_tmpfile: # download from S3 into an video tmpfile self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=video_tmpfile) - # extract audio from video tmpfile - mp4_version = AudioSegment.from_file(video_tmpfile.name, file_ext[1:]) + + # try with original file first + try: + mp4_version = AudioSegment.from_file(video_tmpfile.name, file_ext[1:]) + except Exception as e: + print("Applying moov atom fix and retrying...") + # Fix the moov atom issue using FFmpeg + fixed_video_tmpfile = NamedTemporaryFile(suffix=file_ext, delete=False) + try: + result = subprocess.run([ + 'ffmpeg', '-y', '-i', video_tmpfile.name, '-c', 'copy', '-movflags', 'faststart', fixed_video_tmpfile.name + ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + #print(result.stdout.decode()) + #print(result.stderr.decode()) + except subprocess.CalledProcessError as e: + #print(e.stdout.decode()) + #print(e.stderr.decode()) + print("Error in FFmpeg command: ", e) + raise e + + # extract audio from video tmpfile + mp4_version = AudioSegment.from_file(fixed_video_tmpfile.name, file_ext[1:]) # save the extracted audio as a temporary webm file with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as webm_tmpfile: