Improve subject line for data exports, clean up logs #236

Closed
wants to merge 1 commit into from
4 changes: 0 additions & 4 deletions ai_ta_backend/main.py
@@ -1,5 +1,4 @@
import os
import threading
import time
from typing import List

@@ -112,10 +111,7 @@ def getTopContexts(service: RetrievalService) -> Response:
f"Missing one or more required parameters: 'search_query' and 'course_name' must be provided. Search query: `{search_query}`, Course name: `{course_name}`"
)

print("NUM ACTIVE THREADS (top of getTopContexts):", threading.active_count())

found_documents = service.getTopContexts(search_query, course_name, token_limit)
print("NUM ACTIVE THREADS (after instantiating Ingest() class in getTopContexts):", threading.active_count())

response = jsonify(found_documents)
response.headers.add('Access-Control-Allow-Origin', '*')
125 changes: 114 additions & 11 deletions ai_ta_backend/service/export_service.py
@@ -42,9 +42,8 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
s3_filepath = f"courses/{course_name}/{filename}"
# background task of downloading data - map it with above ID
executor = ProcessPoolExecutor()
executor.submit(export_data_in_bg, response, "documents", course_name, s3_filepath)
return {"response": 'Download from S3',
"s3_path": s3_filepath}
executor.submit(self.export_data_in_bg, response, "documents", course_name, s3_filepath)
return {"response": 'Download from S3', "s3_path": s3_filepath}

else:
# Fetch data
@@ -96,7 +95,113 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
else:
return {"response": "No data found between the given dates."}


def export_data_in_bg(self, response, download_type, course_name, s3_path):
"""
This function is called from export_documents_json() to export data in the background.
1. Download the data in batches of 100 rows and write them to a local JSON file.
2. Zip the file, upload it to S3, and generate a pre-signed URL for the zip.
3. Send an email with the pre-signed URL to the course admins.

Args:
response (dict): The response from the Supabase query.
download_type (str): The type of download - 'documents' or 'conversations'.
course_name (str): The name of the course.
s3_path (str): The S3 path where the file will be uploaded.
"""
total_doc_count = response.count
first_id = response.data[0]['id']
print("total_doc_count: ", total_doc_count)
print("pre-defined s3_path: ", s3_path)

curr_doc_count = 0
filename = s3_path.split('/')[-1].split('.')[0] + '.json'
file_path = os.path.join(os.getcwd(), filename)

# download data in batches of 100
while curr_doc_count < total_doc_count:
print("Fetching data from id: ", first_id)
response = self.sql.getAllFromTableForDownloadType(course_name, download_type, first_id)
df = pd.DataFrame(response.data)
curr_doc_count += len(response.data)

# writing to file
if not os.path.isfile(file_path):
df.to_json(file_path, orient='records', lines=True)
else:
df.to_json(file_path, orient='records', lines=True, mode='a')

if len(response.data) > 0:
first_id = response.data[-1]['id'] + 1

# zip file
zip_filename = filename.split('.')[0] + '.zip'
zip_file_path = os.path.join(os.getcwd(), zip_filename)

with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
zipf.write(file_path, filename)

print("zip file created: ", zip_file_path)

try:
# upload to S3

#s3_file = f"courses/{course_name}/exports/{os.path.basename(zip_file_path)}"
s3_file = f"courses/{course_name}/{os.path.basename(zip_file_path)}"
self.s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_file)

# remove local files
os.remove(file_path)
os.remove(zip_file_path)

print("file uploaded to s3: ", s3_file)

# generate presigned URL
s3_url = self.s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_file, 3600)

# get admin email IDs
headers = {
"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}",
"Content-Type": "application/json"
}

hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
response = requests.get(hget_url, headers=headers)
course_metadata = response.json()
course_metadata = json.loads(course_metadata['result'])
admin_emails = course_metadata['course_admins']
bcc_emails = []

# check for Kastan's email and move to bcc
if '[email protected]' in admin_emails:
admin_emails.remove('[email protected]')
bcc_emails.append('[email protected]')

# add course owner email to admin_emails
admin_emails.append(course_metadata['course_owner'])
admin_emails = list(set(admin_emails))
print("admin_emails: ", admin_emails)
print("bcc_emails: ", bcc_emails)

# add a check for emails, don't send email if no admin emails
if len(admin_emails) == 0:
return "No admin emails found. Email not sent."

# send email to admins
if download_type == "documents":
subject = "UIUC.chat Documents Export Complete for " + course_name
elif download_type == "conversations":
subject = "UIUC.chat Conversation History Export Complete for " + course_name
else:
subject = "UIUC.chat Export Complete for " + course_name
body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours."
email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails)
print("email_status: ", email_status)

return "File uploaded to S3. Email sent to admins."

except Exception as e:
print(e)
return "Error: " + str(e)

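# For reference, a minimal sketch of the two S3 operations the AWSStorage wrapper is assumed
# to perform above (upload_file and generatePresignedUrl). The boto3 usage and helper name
# below are illustrative only, not part of this change.
import boto3

def upload_and_presign_sketch(local_zip_path: str, bucket: str, key: str, expiry_seconds: int = 3600) -> str:
  """Upload a local file to S3 and return a time-limited download URL (illustrative sketch)."""
  s3_client = boto3.client('s3')
  # Upload the zipped export to the target bucket and key.
  s3_client.upload_file(local_zip_path, bucket, key)
  # Presign a GET for the same key so admins can download it without AWS credentials.
  return s3_client.generate_presigned_url('get_object',
                                          Params={'Bucket': bucket, 'Key': key},
                                          ExpiresIn=expiry_seconds)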
def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
"""
@@ -169,6 +274,7 @@ def export_convo_history_json(self, course_name: str, from_date='', to_date=''):

# Encountered pickling error while running the background task. So, moved the function outside the class.
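# Rough illustration of that constraint (a sketch, not part of this change): ProcessPoolExecutor
# pickles the callable it is given, and a bound method drags its instance along with it, which
# fails when the instance holds unpicklable state such as open S3 or database clients. Submitting
# a module-level function that builds its own clients sidesteps that, e.g.:
from concurrent.futures import ProcessPoolExecutor

def submit_export_sketch(response, course_name, s3_filepath):
  # Hypothetical helper for illustration; mirrors the executor.submit() call in export_documents_json().
  executor = ProcessPoolExecutor()
  executor.submit(export_data_in_bg, response, "documents", course_name, s3_filepath)
  return {"response": 'Download from S3', "s3_path": s3_filepath}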


def export_data_in_bg(response, download_type, course_name, s3_path):
"""
This function is called from export_documents_json() to upload the documents to S3.
@@ -184,7 +290,7 @@ def export_data_in_bg(response, download_type, course_name, s3_path):
"""
s3 = AWSStorage()
sql = SQLDatabase()

total_doc_count = response.count
first_id = response.data[0]['id']
print("total_doc_count: ", total_doc_count)
@@ -203,7 +309,7 @@ def export_data_in_bg(response, download_type, course_name, s3_path):

# writing to file
if not os.path.isfile(file_path):
df.to_json(file_path, orient='records', lines=True)
df.to_json(file_path, orient='records', lines=True)
else:
df.to_json(file_path, orient='records', lines=True, mode='a')

@@ -237,10 +343,7 @@ def export_data_in_bg(response, download_type, course_name, s3_path):
#print("s3_url: ", s3_url)

# get admin email IDs
headers = {
"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}",
"Content-Type": "application/json"
}
headers = {"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}", "Content-Type": "application/json"}

hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
response = requests.get(hget_url, headers=headers)
@@ -274,4 +377,4 @@ def export_data_in_bg(response, download_type, course_name, s3_path):

except Exception as e:
print(e)
return "Error: " + str(e)
return "Error: " + str(e)
6 changes: 1 addition & 5 deletions ai_ta_backend/utils/filtering_contexts.py
@@ -137,8 +137,6 @@
# langsmith_prompt_obj = filter_unrelated_contexts_zephyr
# posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com')

# print("NUM ACTIVE THREADS (top of filtering_contexts):", threading.active_count())

# max_concurrency = min(100, len(contexts))
# print("max_concurrency is max of 100, or len(contexts), whichever is less ---- Max concurrency:", max_concurrency)
# print("Num contexts to filter:", len(contexts))
@@ -153,14 +151,12 @@
# timeout=timeout,
# fetch_local=False)

# print("NUM ACTIVE THREADS (before cleanup filtering_contexts):", threading.active_count())
# # Cleanup
# for task in in_progress:
# ray.cancel(task)
# results = ray.get(done_tasks)
# print("NUM ACTIVE THREADS (before kill filtering_contexts):", threading.active_count())
# print(" THREADS (before kill filtering_contexts):", threading.active_count())
# ray.kill(actor)
# print("NUM ACTIVE THREADS (after kill filtering_contexts):", threading.active_count())

# best_contexts_to_keep = [
# r['context'] for r in results if r and 'context' in r and 'completion' in r and parse_result(r['completion'])