Skip to content

Commit

Permalink
moved async download outside the class
Browse files Browse the repository at this point in the history
  • Loading branch information
star-nox committed Mar 18, 2024
1 parent e32ee20 commit 691958f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 105 deletions.
2 changes: 1 addition & 1 deletion ai_ta_backend/database/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
class SQLDatabase:

@inject
def __init__(self, db_url: str):
def __init__(self):
# Create a Supabase client
self.supabase_client = supabase.create_client( # type: ignore
supabase_url=os.environ['SUPABASE_URL'], supabase_key=os.environ['SUPABASE_API_KEY'])
Expand Down
216 changes: 112 additions & 104 deletions ai_ta_backend/service/export_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
s3_filepath = f"courses/{course_name}/{filename}"
# background task of downloading data - map it with above ID
executor = ProcessPoolExecutor()
executor.submit(self.export_data_in_bg, response, "documents", course_name, s3_filepath)
executor.submit(export_data_in_bg, response, "documents", course_name, s3_filepath)
return {"response": 'Download from S3',
"s3_path": s3_filepath}

Expand Down Expand Up @@ -96,108 +96,7 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
else:
return {"response": "No data found between the given dates."}

def export_data_in_bg(self, response, download_type, course_name, s3_path):
"""
This function is called in export_documents_csv() to upload the documents to S3.
1. download the documents in batches of 100 and upload them to S3.
2. generate a pre-signed URL for the S3 file.
3. send an email to the course admins with the pre-signed URL.
Args:
response (dict): The response from the Supabase query.
download_type (str): The type of download - 'documents' or 'conversations'.
course_name (str): The name of the course.
s3_path (str): The S3 path where the file will be uploaded.
"""
total_doc_count = response.count
first_id = response.data[0]['id']
print("total_doc_count: ", total_doc_count)
print("pre-defined s3_path: ", s3_path)

curr_doc_count = 0
filename = s3_path.split('/')[-1].split('.')[0] + '.json'
file_path = os.path.join(os.getcwd(), filename)

# download data in batches of 100
while curr_doc_count < total_doc_count:
print("Fetching data from id: ", first_id)
response = self.sql.getAllFromTableForDownloadType(course_name, download_type, first_id)
df = pd.DataFrame(response.data)
curr_doc_count += len(response.data)

# writing to file
if not os.path.isfile(file_path):
df.to_json(file_path, orient='records')
else:
df.to_json(file_path, orient='records', lines=True, mode='a')

if len(response.data) > 0:
first_id = response.data[-1]['id'] + 1

# zip file
zip_filename = filename.split('.')[0] + '.zip'
zip_file_path = os.path.join(os.getcwd(), zip_filename)

with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
zipf.write(file_path, filename)

print("zip file created: ", zip_file_path)

try:
# upload to S3

#s3_file = f"courses/{course_name}/exports/{os.path.basename(zip_file_path)}"
s3_file = f"courses/{course_name}/{os.path.basename(zip_file_path)}"
self.s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_file)

# remove local files
os.remove(file_path)
os.remove(zip_file_path)

print("file uploaded to s3: ", s3_file)

# generate presigned URL
s3_url = self.s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_path, 3600)

# get admin email IDs
headers = {
"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}",
"Content-Type": "application/json"
}

hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
response = requests.get(hget_url, headers=headers)
course_metadata = response.json()
course_metadata = json.loads(course_metadata['result'])
admin_emails = course_metadata['course_admins']
bcc_emails = []

# check for Kastan's email and move to bcc
if '[email protected]' in admin_emails:
admin_emails.remove('[email protected]')
bcc_emails.append('[email protected]')

# add course owner email to admin_emails
admin_emails.append(course_metadata['course_owner'])
admin_emails = list(set(admin_emails))
print("admin_emails: ", admin_emails)
print("bcc_emails: ", bcc_emails)

# add a check for emails, don't send email if no admin emails
if len(admin_emails) == 0:
return "No admin emails found. Email not sent."

# send email to admins
subject = "UIUC.chat Data Export Complete for " + course_name
body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours."
email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails)
print("email_status: ", email_status)

return "File uploaded to S3. Email sent to admins."

except Exception as e:
print(e)
return "Error: " + str(e)


def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
"""
Expand All @@ -217,7 +116,7 @@ def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
s3_filepath = f"courses/{course_name}/{filename}"
# background task of downloading data - map it with above ID
executor = ProcessPoolExecutor()
executor.submit(self.export_data_in_bg, response, "conversations", course_name, s3_filepath)
executor.submit(export_data_in_bg, response, "conversations", course_name, s3_filepath)
return {"response": 'Download from S3', "s3_path": s3_filepath}

# Fetch data
Expand Down Expand Up @@ -266,3 +165,112 @@ def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
return {"response": "Error downloading file!"}
else:
return {"response": "No data found between the given dates."}


# Encountered pickling error while running the background task. So, moved the function outside the class.

def export_data_in_bg(response, download_type, course_name, s3_path):
"""
This function is called in export_documents_csv() to upload the documents to S3.
1. download the documents in batches of 100 and upload them to S3.
2. generate a pre-signed URL for the S3 file.
3. send an email to the course admins with the pre-signed URL.
Args:
response (dict): The response from the Supabase query.
download_type (str): The type of download - 'documents' or 'conversations'.
course_name (str): The name of the course.
s3_path (str): The S3 path where the file will be uploaded.
"""
s3 = AWSStorage()
sql = SQLDatabase()

total_doc_count = response.count
first_id = response.data[0]['id']
print("total_doc_count: ", total_doc_count)
print("pre-defined s3_path: ", s3_path)

curr_doc_count = 0
filename = s3_path.split('/')[-1].split('.')[0] + '.json'
file_path = os.path.join(os.getcwd(), filename)

# download data in batches of 100
while curr_doc_count < total_doc_count:
print("Fetching data from id: ", first_id)
response = sql.getAllFromTableForDownloadType(course_name, download_type, first_id)
df = pd.DataFrame(response.data)
curr_doc_count += len(response.data)

# writing to file
if not os.path.isfile(file_path):
df.to_json(file_path, orient='records')
else:
df.to_json(file_path, orient='records', lines=True, mode='a')

if len(response.data) > 0:
first_id = response.data[-1]['id'] + 1

# zip file
zip_filename = filename.split('.')[0] + '.zip'
zip_file_path = os.path.join(os.getcwd(), zip_filename)

with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
zipf.write(file_path, filename)

print("zip file created: ", zip_file_path)

try:
# upload to S3

#s3_file = f"courses/{course_name}/exports/{os.path.basename(zip_file_path)}"
s3_file = f"courses/{course_name}/{os.path.basename(zip_file_path)}"
s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_file)

# remove local files
os.remove(file_path)
os.remove(zip_file_path)

print("file uploaded to s3: ", s3_file)

# generate presigned URL
s3_url = s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_path, 3600)

# get admin email IDs
headers = {
"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}",
"Content-Type": "application/json"
}

hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
response = requests.get(hget_url, headers=headers)
course_metadata = response.json()
course_metadata = json.loads(course_metadata['result'])
admin_emails = course_metadata['course_admins']
bcc_emails = []

# check for Kastan's email and move to bcc
if '[email protected]' in admin_emails:
admin_emails.remove('[email protected]')
bcc_emails.append('[email protected]')

# add course owner email to admin_emails
admin_emails.append(course_metadata['course_owner'])
admin_emails = list(set(admin_emails))
print("admin_emails: ", admin_emails)
print("bcc_emails: ", bcc_emails)

# add a check for emails, don't send email if no admin emails
if len(admin_emails) == 0:
return "No admin emails found. Email not sent."

# send email to admins
subject = "UIUC.chat Data Export Complete for " + course_name
body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours."
email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails)
print("email_status: ", email_status)

return "File uploaded to S3. Email sent to admins."

except Exception as e:
print(e)
return "Error: " + str(e)

0 comments on commit 691958f

Please sign in to comment.