From f4cd916f4d378cb68f330a4c0143deb3134b456a Mon Sep 17 00:00:00 2001
From: Asmita Dabholkar
Date: Mon, 18 Mar 2024 19:18:03 -0500
Subject: [PATCH] Fix "Export all Documents" after refactor (#234)

---
 ai_ta_backend/database/sql.py           |   2 +-
 ai_ta_backend/service/export_service.py | 216 ++++++++++++------------
 2 files changed, 113 insertions(+), 105 deletions(-)

diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py
index a9819657..223bc386 100644
--- a/ai_ta_backend/database/sql.py
+++ b/ai_ta_backend/database/sql.py
@@ -7,7 +7,7 @@
 class SQLDatabase:

   @inject
-  def __init__(self, db_url: str):
+  def __init__(self):
     # Create a Supabase client
     self.supabase_client = supabase.create_client(  # type: ignore
         supabase_url=os.environ['SUPABASE_URL'], supabase_key=os.environ['SUPABASE_API_KEY'])
diff --git a/ai_ta_backend/service/export_service.py b/ai_ta_backend/service/export_service.py
index 1c300bc4..3ceaabb7 100644
--- a/ai_ta_backend/service/export_service.py
+++ b/ai_ta_backend/service/export_service.py
@@ -42,7 +42,7 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
       s3_filepath = f"courses/{course_name}/{filename}"
       # background task of downloading data - map it with above ID
       executor = ProcessPoolExecutor()
-      executor.submit(self.export_data_in_bg, response, "documents", course_name, s3_filepath)
+      executor.submit(export_data_in_bg, response, "documents", course_name, s3_filepath)
       return {"response": 'Download from S3', "s3_path": s3_filepath}


@@ -96,108 +96,7 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
     else:
       return {"response": "No data found between the given dates."}

-  def export_data_in_bg(self, response, download_type, course_name, s3_path):
-    """
-    This function is called in export_documents_csv() to upload the documents to S3.
-    1. download the documents in batches of 100 and upload them to S3.
-    2. generate a pre-signed URL for the S3 file.
-    3. send an email to the course admins with the pre-signed URL.
-
-    Args:
-        response (dict): The response from the Supabase query.
-        download_type (str): The type of download - 'documents' or 'conversations'.
-        course_name (str): The name of the course.
-        s3_path (str): The S3 path where the file will be uploaded.
-    """
-    total_doc_count = response.count
-    first_id = response.data[0]['id']
-    print("total_doc_count: ", total_doc_count)
-    print("pre-defined s3_path: ", s3_path)
-
-    curr_doc_count = 0
-    filename = s3_path.split('/')[-1].split('.')[0] + '.json'
-    file_path = os.path.join(os.getcwd(), filename)
-
-    # download data in batches of 100
-    while curr_doc_count < total_doc_count:
-      print("Fetching data from id: ", first_id)
-      response = self.sql.getAllFromTableForDownloadType(course_name, download_type, first_id)
-      df = pd.DataFrame(response.data)
-      curr_doc_count += len(response.data)
-
-      # writing to file
-      if not os.path.isfile(file_path):
-        df.to_json(file_path, orient='records')
-      else:
-        df.to_json(file_path, orient='records', lines=True, mode='a')
-
-      if len(response.data) > 0:
-        first_id = response.data[-1]['id'] + 1
-
-    # zip file
-    zip_filename = filename.split('.')[0] + '.zip'
-    zip_file_path = os.path.join(os.getcwd(), zip_filename)
-
-    with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
-      zipf.write(file_path, filename)
-
-    print("zip file created: ", zip_file_path)
-
-    try:
-      # upload to S3
-
-      #s3_file = f"courses/{course_name}/exports/{os.path.basename(zip_file_path)}"
-      s3_file = f"courses/{course_name}/{os.path.basename(zip_file_path)}"
-      self.s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_file)
-
-      # remove local files
-      os.remove(file_path)
-      os.remove(zip_file_path)
-
-      print("file uploaded to s3: ", s3_file)
-
-      # generate presigned URL
-      s3_url = self.s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_path, 3600)
-
-      # get admin email IDs
-      headers = {
-          "Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}",
-          "Content-Type": "application/json"
-      }
-
-      hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
-      response = requests.get(hget_url, headers=headers)
-      course_metadata = response.json()
-      course_metadata = json.loads(course_metadata['result'])
-      admin_emails = course_metadata['course_admins']
-      bcc_emails = []
-
-      # check for Kastan's email and move to bcc
-      if 'kvday2@illinois.edu' in admin_emails:
-        admin_emails.remove('kvday2@illinois.edu')
-        bcc_emails.append('kvday2@illinois.edu')
-
-      # add course owner email to admin_emails
-      admin_emails.append(course_metadata['course_owner'])
-      admin_emails = list(set(admin_emails))
-      print("admin_emails: ", admin_emails)
-      print("bcc_emails: ", bcc_emails)
-
-      # add a check for emails, don't send email if no admin emails
-      if len(admin_emails) == 0:
-        return "No admin emails found. Email not sent."
-
-      # send email to admins
-      subject = "UIUC.chat Data Export Complete for " + course_name
-      body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours."
-      email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails)
-      print("email_status: ", email_status)
-
-      return "File uploaded to S3. Email sent to admins."
-
-    except Exception as e:
-      print(e)
-      return "Error: " + str(e)
+

   def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
     """
@@ -217,7 +116,7 @@ def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
       s3_filepath = f"courses/{course_name}/{filename}"
       # background task of downloading data - map it with above ID
       executor = ProcessPoolExecutor()
-      executor.submit(self.export_data_in_bg, response, "conversations", course_name, s3_filepath)
+      executor.submit(export_data_in_bg, response, "conversations", course_name, s3_filepath)
       return {"response": 'Download from S3', "s3_path": s3_filepath}

     # Fetch data
@@ -266,3 +165,112 @@ def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
         return {"response": "Error downloading file!"}
     else:
       return {"response": "No data found between the given dates."}
+
+
+# Encountered a pickling error while running the background task as a bound method, so the function was moved outside the class.
+
+def export_data_in_bg(response, download_type, course_name, s3_path):
+  """
+  This function is called by export_documents_json() and export_convo_history_json() to upload exported data to S3.
+  1. Download the data in batches of 100 and write it to a JSON file.
+  2. Zip the file, upload it to S3, and generate a pre-signed URL for it.
+  3. Send an email to the course admins with the pre-signed URL.
+
+  Args:
+      response (dict): The response from the Supabase query.
+      download_type (str): The type of download - 'documents' or 'conversations'.
+      course_name (str): The name of the course.
+      s3_path (str): The S3 path where the file will be uploaded.
+  """
+  s3 = AWSStorage()
+  sql = SQLDatabase()
+
+  total_doc_count = response.count
+  first_id = response.data[0]['id']
+  print("total_doc_count: ", total_doc_count)
+  print("pre-defined s3_path: ", s3_path)
+
+  curr_doc_count = 0
+  filename = s3_path.split('/')[-1].split('.')[0] + '.json'
+  file_path = os.path.join(os.getcwd(), filename)
+
+  # download data in batches of 100
+  while curr_doc_count < total_doc_count:
+    print("Fetching data from id: ", first_id)
+    response = sql.getAllFromTableForDownloadType(course_name, download_type, first_id)
+    df = pd.DataFrame(response.data)
+    curr_doc_count += len(response.data)
+
+    # writing to file
+    if not os.path.isfile(file_path):
+      df.to_json(file_path, orient='records')
+    else:
+      df.to_json(file_path, orient='records', lines=True, mode='a')
+
+    if len(response.data) > 0:
+      first_id = response.data[-1]['id'] + 1
+
+  # zip file
+  zip_filename = filename.split('.')[0] + '.zip'
+  zip_file_path = os.path.join(os.getcwd(), zip_filename)
+
+  with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
+    zipf.write(file_path, filename)
+
+  print("zip file created: ", zip_file_path)
+
+  try:
+    # upload to S3
+
+    #s3_file = f"courses/{course_name}/exports/{os.path.basename(zip_file_path)}"
+    s3_file = f"courses/{course_name}/{os.path.basename(zip_file_path)}"
+    s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_file)
+
+    # remove local files
+    os.remove(file_path)
+    os.remove(zip_file_path)
+
+    print("file uploaded to s3: ", s3_file)
+
+    # generate presigned URL
+    s3_url = s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_path, 3600)
+
+    # get admin email IDs
+    headers = {
+        "Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}",
+        "Content-Type": "application/json"
+    }
+
+    hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
+    response = requests.get(hget_url, headers=headers)
+    course_metadata = response.json()
+    course_metadata = json.loads(course_metadata['result'])
+    admin_emails = course_metadata['course_admins']
+    bcc_emails = []
+
+    # check for Kastan's email and move to bcc
+    if 'kvday2@illinois.edu' in admin_emails:
+      admin_emails.remove('kvday2@illinois.edu')
+      bcc_emails.append('kvday2@illinois.edu')
+
+    # add course owner email to admin_emails
+    admin_emails.append(course_metadata['course_owner'])
+    admin_emails = list(set(admin_emails))
+    print("admin_emails: ", admin_emails)
+    print("bcc_emails: ", bcc_emails)
+
+    # add a check for emails, don't send email if no admin emails
+    if len(admin_emails) == 0:
+      return "No admin emails found. Email not sent."
+
+    # send email to admins
+    subject = "UIUC.chat Data Export Complete for " + course_name
+    body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours."
+    email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails)
+    print("email_status: ", email_status)
+
+    return "File uploaded to S3. Email sent to admins."
+
+  except Exception as e:
+    print(e)
+    return "Error: " + str(e)
\ No newline at end of file
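
Note on the ProcessPoolExecutor change above: the executor pickles the submitted callable and its arguments, so submitting the bound method self.export_data_in_bg drags the whole service instance (and the S3/DB clients it holds) across the process boundary, which is where the pickling error comes from. Below is a minimal, self-contained sketch of that constraint; Service and export_in_bg are simplified stand-ins rather than the project's code, and a threading.Lock stands in for an unpicklable client object.

import threading
from concurrent.futures import ProcessPoolExecutor


class Service:
  """Stand-in for a service object that holds an unpicklable resource."""

  def __init__(self):
    self._client = threading.Lock()  # stands in for a DB/S3 client

  def export_in_bg(self, item):
    return f"exported {item}"


def export_in_bg(item):
  """Module-level stand-in: picklable by reference, builds its own resources."""
  return f"exported {item}"


if __name__ == "__main__":
  svc = Service()
  with ProcessPoolExecutor() as pool:
    # Submitting the bound method forces pickling of `svc`, which fails
    # because of the lock it holds; the error surfaces on .result().
    try:
      print(pool.submit(svc.export_in_bg, "docs").result())
    except Exception as e:
      print("bound method failed:", type(e).__name__)

    # The module-level function crosses the process boundary cleanly.
    print(pool.submit(export_in_bg, "docs").result())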
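
The batched download inside export_data_in_bg is keyset pagination on the integer id column: fetch up to 100 rows with id >= first_id, write them out, then advance the cursor past the last id seen. A minimal sketch of that loop under the same assumption of monotonically increasing ids; fetch_batch here is a hypothetical stand-in for sql.getAllFromTableForDownloadType, backed by an in-memory fake table.

from typing import Dict, List

ROWS: List[Dict] = [{"id": i, "text": f"doc {i}"} for i in range(1, 251)]  # fake table


def fetch_batch(first_id: int, limit: int = 100) -> List[Dict]:
  """Return at most `limit` rows with id >= first_id, ordered by id."""
  return [r for r in ROWS if r["id"] >= first_id][:limit]


def export_all(total_count: int, first_id: int) -> int:
  exported = 0
  while exported < total_count:
    batch = fetch_batch(first_id)
    if not batch:
      break  # defensive: stop early if the table shrank mid-export
    exported += len(batch)
    first_id = batch[-1]["id"] + 1  # advance the cursor past the last row seen
  return exported


print(export_all(total_count=len(ROWS), first_id=ROWS[0]["id"]))  # prints 250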