
Merge branch 'main' into export-link-update
star-nox authored Mar 20, 2024
2 parents 3d64116 + 4e89610 commit 5376201
Showing 3 changed files with 18 additions and 26 deletions.
4 changes: 0 additions & 4 deletions ai_ta_backend/main.py
@@ -1,5 +1,4 @@
import os
- import threading
import time
from typing import List

@@ -112,10 +111,7 @@ def getTopContexts(service: RetrievalService) -> Response:
f"Missing one or more required parameters: 'search_query' and 'course_name' must be provided. Search query: `{search_query}`, Course name: `{course_name}`"
)

print("NUM ACTIVE THREADS (top of getTopContexts):", threading.active_count())

found_documents = service.getTopContexts(search_query, course_name, token_limit)
print("NUM ACTIVE THREADS (after instantiating Ingest() class in getTopContexts):", threading.active_count())

response = jsonify(found_documents)
response.headers.add('Access-Control-Allow-Origin', '*')
35 changes: 18 additions & 17 deletions ai_ta_backend/service/export_service.py
@@ -43,8 +43,7 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
# background task of downloading data - map it with above ID
executor = ProcessPoolExecutor()
executor.submit(export_data_in_bg, response, "documents", course_name, s3_filepath)
return {"response": 'Download from S3',
"s3_path": s3_filepath}
return {"response": 'Download from S3', "s3_path": s3_filepath}

else:
# Fetch data
@@ -59,7 +58,7 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
print("last_id: ", last_id)

curr_doc_count = 0
- filename = course_name + '_' + str(uuid.uuid4()) + '_documents.json'
+ filename = course_name + '_' + str(uuid.uuid4()) + '_documents.jsonl'
file_path = os.path.join(os.getcwd(), filename)

while curr_doc_count < total_doc_count:
@@ -71,7 +70,7 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):

# writing to file
if not os.path.isfile(file_path):
- df.to_json(file_path, orient='records')
+ df.to_json(file_path, orient='records', lines=True)
else:
df.to_json(file_path, orient='records', lines=True, mode='a')
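The change above makes the first write use lines=True as well, so every batch lands in one consistent JSON Lines file; pandas only accepts mode='a' when orient='records' and lines=True (pandas 2.0+). A minimal sketch of the batching loop, outside the diff — fetch_documents_batch and its columns are hypothetical stand-ins for the Supabase query the service actually runs:

import os
import pandas as pd

def fetch_documents_batch(last_id: int, batch_size: int) -> pd.DataFrame:
    # Hypothetical stand-in for the service's Supabase fetch.
    rows = [{"id": last_id + i + 1, "text": f"doc {last_id + i + 1}"} for i in range(batch_size)]
    return pd.DataFrame(rows)

file_path = os.path.join(os.getcwd(), "demo_documents.jsonl")
total_doc_count, curr_doc_count, last_id = 250, 0, 0

while curr_doc_count < total_doc_count:
    df = fetch_documents_batch(last_id, min(100, total_doc_count - curr_doc_count))
    # Both branches write JSON Lines; a plain JSON array on the first pass
    # followed by appended lines would not parse as a single file.
    if not os.path.isfile(file_path):
        df.to_json(file_path, orient='records', lines=True)
    else:
        df.to_json(file_path, orient='records', lines=True, mode='a')
    last_id = int(df['id'].iloc[-1])
    curr_doc_count += len(df)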

@@ -96,8 +95,6 @@ def export_documents_json(self, course_name: str, from_date='', to_date=''):
else:
return {"response": "No data found between the given dates."}



def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
"""
This function exports the conversation history to a csv file.
@@ -126,7 +123,7 @@ def export_convo_history_json(self, course_name: str, from_date='', to_date=''):
last_id = response.data[-1]['id']
total_count = response.count

- filename = course_name + '_' + str(uuid.uuid4()) + '_convo_history.json'
+ filename = course_name + '_' + str(uuid.uuid4()) + '_convo_history.jsonl'
file_path = os.path.join(os.getcwd(), filename)
curr_count = 0
# Fetch data in batches of 25 from first_id to last_id
@@ -169,6 +166,7 @@ def export_convo_history_json(self, course_name: str, from_date='', to_date=''):

# Encountered pickling error while running the background task. So, moved the function outside the class.


def export_data_in_bg(response, download_type, course_name, s3_path):
"""
This function is called in export_documents_csv() to upload the documents to S3.
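The pickling note above is why export_data_in_bg sits at module level: ProcessPoolExecutor pickles whatever it is handed, and a bound method would require pickling the whole service instance (database and S3 clients), which is the usual source of that error. A rough sketch of the working shape, with simplified names — only the call pattern mirrors the service code:

from concurrent.futures import ProcessPoolExecutor

def export_in_background(download_type: str, course_name: str, s3_path: str) -> str:
    # Module-level functions pickle by reference (module + name), so a worker
    # process can import and run them without serialising any instance state.
    return f"{download_type} export for {course_name} -> {s3_path}"

class ExportServiceSketch:
    def export_documents_json(self, course_name: str) -> dict:
        s3_filepath = f"courses/{course_name}/{course_name}_documents.zip"
        executor = ProcessPoolExecutor()
        # Submitting a bound method such as self._export would mean pickling
        # `self`, which fails as soon as the instance holds unpicklable
        # resources like open database connections.
        executor.submit(export_in_background, "documents", course_name, s3_filepath)
        return {"response": "Download from S3", "s3_path": s3_filepath}

if __name__ == "__main__":
    print(ExportServiceSketch().export_documents_json("demo-course"))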
@@ -184,14 +182,14 @@ def export_data_in_bg(response, download_type, course_name, s3_path):
"""
s3 = AWSStorage()
sql = SQLDatabase()

total_doc_count = response.count
first_id = response.data[0]['id']
print("total_doc_count: ", total_doc_count)
print("pre-defined s3_path: ", s3_path)

curr_doc_count = 0
- filename = s3_path.split('/')[-1].split('.')[0] + '.json'
+ filename = s3_path.split('/')[-1].split('.')[0] + '.jsonl'
file_path = os.path.join(os.getcwd(), filename)

# download data in batches of 100
@@ -203,7 +201,7 @@ def export_data_in_bg(response, download_type, course_name, s3_path):

# writing to file
if not os.path.isfile(file_path):
- df.to_json(file_path, orient='records')
+ df.to_json(file_path, orient='records', lines=True)
else:
df.to_json(file_path, orient='records', lines=True, mode='a')

@@ -223,7 +221,7 @@ def export_data_in_bg(response, download_type, course_name, s3_path):
# upload to S3

#s3_file = f"courses/{course_name}/exports/{os.path.basename(zip_file_path)}"
s3_file = f"courses/{course_name}/{os.path.basename(zip_file_path)}"
s3_file = f"courses/{course_name}/{os.path.basename(s3_path)}"
s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_file)

# remove local files
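For context on the zip_file_path used above, a small sketch of the zip step, assuming the standard-library zipfile module and an archive named next to the exported file; the project's actual helper may differ:

import os
import zipfile

def zip_export_file(file_path: str) -> str:
    # Wrap the exported .jsonl in a .zip alongside it; the caller uploads the
    # archive to S3 and then removes both local files.
    zip_file_path = os.path.splitext(file_path)[0] + '.zip'
    with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(file_path, arcname=os.path.basename(file_path))
    return zip_file_path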
@@ -235,11 +233,9 @@ def export_data_in_bg(response, download_type, course_name, s3_path):
# generate presigned URL
s3_url = s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_path, 172800)


# get admin email IDs
- headers = {
-     "Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}",
-     "Content-Type": "application/json"
- }
+ headers = {"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}", "Content-Type": "application/json"}

hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
response = requests.get(hget_url, headers=headers)
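The basename(s3_path) fix in the upload hunk above and the presigned URL generated at the top of this hunk only work together if the uploaded key and the signed key name the same object. AWSStorage is the project's own wrapper; below is a hedged boto3 sketch of what that pair of calls is assumed to amount to, with the 172800-second expiry matching the 48 hours promised in the email body:

import boto3

def upload_and_presign(zip_file_path: str, bucket: str, s3_path: str) -> str:
    # Assumed boto3 equivalent of AWSStorage.upload_file + generatePresignedUrl.
    s3 = boto3.client('s3')
    # Upload under exactly the pre-defined key; if the uploaded key and the
    # signed key differ, the emailed link points at a missing object.
    s3.upload_file(zip_file_path, bucket, s3_path)
    # 172800 seconds = 48 hours.
    return s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': bucket, 'Key': s3_path},
        ExpiresIn=172800,
    )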
@@ -264,7 +260,12 @@ def export_data_in_bg(response, download_type, course_name, s3_path):
return "No admin emails found. Email not sent."

# send email to admins
subject = "UIUC.chat Data Export Complete for " + course_name
if download_type == "documents":
subject = "UIUC.chat Documents Export Complete for " + course_name
elif download_type == "conversations":
subject = "UIUC.chat Conversation History Export Complete for " + course_name
else:
subject = "UIUC.chat Export Complete for " + course_name
body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours."
email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails)
print("email_status: ", email_status)
@@ -273,4 +274,4 @@ def export_data_in_bg(response, download_type, course_name, s3_path):

except Exception as e:
print(e)
return "Error: " + str(e)
return "Error: " + str(e)
5 changes: 0 additions & 5 deletions ai_ta_backend/utils/filtering_contexts.py
@@ -137,8 +137,6 @@
# langsmith_prompt_obj = filter_unrelated_contexts_zephyr
# posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com')

# print("NUM ACTIVE THREADS (top of filtering_contexts):", threading.active_count())

# max_concurrency = min(100, len(contexts))
# print("max_concurrency is max of 100, or len(contexts), whichever is less ---- Max concurrency:", max_concurrency)
# print("Num contexts to filter:", len(contexts))
@@ -153,14 +151,11 @@
# timeout=timeout,
# fetch_local=False)

# print("NUM ACTIVE THREADS (before cleanup filtering_contexts):", threading.active_count())
# # Cleanup
# for task in in_progress:
# ray.cancel(task)
# results = ray.get(done_tasks)
# print("NUM ACTIVE THREADS (before kill filtering_contexts):", threading.active_count())
# ray.kill(actor)
# print("NUM ACTIVE THREADS (after kill filtering_contexts):", threading.active_count())

# best_contexts_to_keep = [
# r['context'] for r in results if r and 'context' in r and 'completion' in r and parse_result(r['completion'])
