
Commit a0fd946

Export Convo History: Add markdown and excel export types for human-readability (#289)

* Add markdown and excel to conversation export with image uploads, if any

* Formatting ONLY

* Improvements to prints

* Formatting improvements
rohan-uiuc authored Aug 5, 2024
1 parent 4f8b3fb commit a0fd946
Showing 5 changed files with 452 additions and 9 deletions.
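For orientation, here is a minimal sketch of how a client might call the new export endpoint; the base URL and course name are hypothetical, and the three branches mirror the responses produced by the route added to `ai_ta_backend/main.py` below.

```python
import requests

BASE_URL = "http://localhost:8000"  # hypothetical; depends on where the backend is deployed

# Request a conversation-history export for a course (date filters are optional).
resp = requests.get(
    f"{BASE_URL}/export-convo-history",
    params={"course_name": "CS101", "from_date": "", "to_date": ""},  # example values
)
resp.raise_for_status()  # surfaces the 400 returned for a missing course_name

if resp.status_code == 204:
    # The service found no conversations between the given dates.
    print("No data found between the given dates.")
elif resp.headers.get("Content-Type", "").startswith("application/json"):
    # Exports of more than 500 conversations are processed in the background and
    # uploaded to S3; course admins are emailed a presigned download link.
    print("Large export; will be available at S3 path:", resp.json()["s3_path"])
else:
    # Smaller exports come back directly as a zip attachment.
    with open("convos_extended.zip", "wb") as f:
        f.write(resp.content)
```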
4 changes: 1 addition & 3 deletions ai_ta_backend/executors/process_pool_executor.py
@@ -23,9 +23,7 @@ def __init__(self, max_workers=None):
self.executor = ProcessPoolExecutor(max_workers=max_workers)

def submit(self, fn, *args, **kwargs):
-    raise NotImplementedError(
-        "ProcessPoolExecutorAdapter does not support 'submit' directly due to its nature. Use 'map' or other methods as needed."
-    )
+    return self.executor.submit(fn, *args, **kwargs)

def map(self, fn, *iterables, timeout=None, chunksize=1):
return self.executor.map(fn, *iterables, timeout=timeout, chunksize=chunksize)
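One behavioral caveat with the now-supported `submit`: `ProcessPoolExecutor` pickles the callable and its arguments to ship them to worker processes, which is why the tasks used with it elsewhere in this commit (`_task_method`, `export_data_in_bg_extended`) are module-level functions rather than methods. A minimal sketch of the constraint, independent of this codebase:

```python
from concurrent.futures import ProcessPoolExecutor

def double(x):
    # Module-level functions are picklable and safe to submit.
    return 2 * x

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(pool.submit(double, 21).result())  # prints 42
        # By contrast, submitting a lambda or a bound method of an
        # unpicklable object fails with a pickling error -- the same
        # issue noted in the comment at the end of export_service.py.
```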
42 changes: 38 additions & 4 deletions ai_ta_backend/main.py
@@ -204,7 +204,6 @@ def nomic_map(service: NomicService):
# response.headers.add('Access-Control-Allow-Origin', '*')
# return response


# @app.route('/createConversationMap', methods=['GET'])
# def createConversationMap(service: NomicService):
# course_name: str = request.args.get('course_name', default='', type=str)
@@ -219,7 +218,6 @@ def nomic_map(service: NomicService):
# response.headers.add('Access-Control-Allow-Origin', '*')
# return response


# @app.route('/logToConversationMap', methods=['GET'])
# def logToConversationMap(service: NomicService, flaskExecutor: ExecutorInterface):
# course_name: str = request.args.get('course_name', default='', type=str)
@@ -290,6 +288,42 @@ def export_convo_history(service: ExportService):
return response


@app.route('/test-process', methods=['GET'])
def test_process(service: ExportService):
service.test_process()
return jsonify({"response": "success"})


@app.route('/export-convo-history', methods=['GET'])
def export_convo_history_v2(service: ExportService):
course_name: str = request.args.get('course_name', default='', type=str)
from_date: str = request.args.get('from_date', default='', type=str)
to_date: str = request.args.get('to_date', default='', type=str)

if course_name == '':
abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`")

export_status = service.export_convo_history(course_name, from_date, to_date)
print("Export processing response: ", export_status)

if export_status['response'] == "No data found between the given dates.":
response = Response(status=204)
response.headers.add('Access-Control-Allow-Origin', '*')

elif export_status['response'] == "Download from S3":
response = jsonify({"response": "Download from S3", "s3_path": export_status['s3_path']})
response.headers.add('Access-Control-Allow-Origin', '*')

else:
response = make_response(
send_from_directory(export_status['response'][2], export_status['response'][1], as_attachment=True))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers["Content-Disposition"] = f"attachment; filename={export_status['response'][1]}"
os.remove(export_status['response'][0])

return response


@app.route('/export-conversations-custom', methods=['GET'])
def export_conversations_custom(service: ExportService):
course_name: str = request.args.get('course_name', default='', type=str)
@@ -481,6 +515,8 @@ def run_flow(service: WorkflowService) -> Response:


def configure(binder: Binder) -> None:
+  binder.bind(ThreadPoolExecutorInterface, to=ThreadPoolExecutorAdapter, scope=SingletonScope)
+  binder.bind(ProcessPoolExecutorInterface, to=ProcessPoolExecutorAdapter, scope=SingletonScope)
binder.bind(RetrievalService, to=RetrievalService, scope=RequestScope)
binder.bind(PosthogService, to=PosthogService, scope=SingletonScope)
binder.bind(SentryService, to=SentryService, scope=SingletonScope)
@@ -491,8 +527,6 @@ def configure(binder: Binder) -> None:
binder.bind(SQLDatabase, to=SQLDatabase, scope=SingletonScope)
binder.bind(AWSStorage, to=AWSStorage, scope=SingletonScope)
binder.bind(ExecutorInterface, to=FlaskExecutorAdapter(executor), scope=SingletonScope)
-  binder.bind(ThreadPoolExecutorInterface, to=ThreadPoolExecutorAdapter, scope=SingletonScope)
-  binder.bind(ProcessPoolExecutorInterface, to=ProcessPoolExecutorAdapter, scope=SingletonScope)


FlaskInjector(app=app, modules=[configure])
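`configure` now binds the executor interfaces alongside the other singletons so that `ExportService`, whose constructor asks for a `ProcessPoolExecutorAdapter` (see the next file), can be resolved. A condensed, self-contained sketch of how `injector` wires a constructor dependency, using stand-in classes rather than the real ones:

```python
from injector import Binder, Injector, SingletonScope, inject

class Executor:
    """Stand-in for ProcessPoolExecutorAdapter."""

class ExportService:
    @inject
    def __init__(self, executor: Executor):
        self.executor = executor

def configure(binder: Binder) -> None:
    # Bind the executor as a singleton; all dependents share one instance.
    binder.bind(Executor, to=Executor, scope=SingletonScope)

injector = Injector([configure])
service = injector.get(ExportService)
assert service.executor is injector.get(Executor)  # same singleton instance
```

Note that `injector` resolves bindings lazily at lookup time, so the reordering of the `binder.bind` calls in `configure` is organizational rather than functional.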
208 changes: 206 additions & 2 deletions ai_ta_backend/service/export_service.py
@@ -3,25 +3,49 @@
import os
import uuid
import zipfile
from concurrent.futures import ProcessPoolExecutor
from urllib.parse import urlparse

import pandas as pd
import requests
import xlsxwriter
from injector import inject

from ai_ta_backend.database.aws import AWSStorage
from ai_ta_backend.database.sql import SQLDatabase
from ai_ta_backend.executors.process_pool_executor import ProcessPoolExecutorAdapter
from ai_ta_backend.service.sentry_service import SentryService
from ai_ta_backend.utils.email.send_transactional_email import send_email
from ai_ta_backend.utils.export_utils import (
_cleanup,
_create_zip,
_initialize_excel,
_initialize_file_paths,
_process_conversation,
)


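# NOTE: _task_method is defined at module level (rather than on ExportService) so
# it can be pickled and shipped to worker processes; see the pickling note at the
# end of this file.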
def _task_method(index):
print(f"Task {index} is running in process {os.getpid()}", flush=True)
return index


class ExportService:

@inject
-  def __init__(self, sql: SQLDatabase, s3: AWSStorage, sentry: SentryService):
+  def __init__(self, sql: SQLDatabase, s3: AWSStorage, sentry: SentryService, executor: ProcessPoolExecutorAdapter):
self.sql = sql
self.s3 = s3
self.sentry = sentry
self.executor = executor

def test_process(self):
"""
This function is used to test the process.
"""
futures = [self.executor.submit(_task_method, i) for i in range(5)]
results = [future.result() for future in futures]
print(results)
return {"response": "Test process successful.", "results": results}

def export_documents_json(self, course_name: str, from_date='', to_date=''):
"""
@@ -228,6 +252,186 @@ def export_conversations(self, course_name: str, from_date: str, to_date: str, e
else:
return {"response": "No data found between the given dates."}

def export_convo_history(self, course_name: str, from_date='', to_date=''):
"""
This function exports the conversation history to a zip file containing markdown files, an Excel file, and a JSONL file.
Args:
course_name (str): The name of the course.
from_date (str, optional): The start date for the data export. Defaults to ''.
to_date (str, optional): The end date for the data export. Defaults to ''.
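    Returns:
        dict: either an error message, "No data found between the given dates.",
        {"response": "Download from S3", "s3_path": ...} for exports of more than
        500 conversations, or {"response": (zip_file_path, zip_filename, cwd)} for
        direct download by the route.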
"""
print(
f"Exporting extended conversation history for course: {course_name}, from_date: {from_date}, to_date: {to_date}"
)
error_log = []

try:
response = self.sql.getDocumentsBetweenDates(course_name, from_date, to_date, 'llm-convo-monitor')
responseCount = response.count or 0
print(f"Received request to export: {responseCount} conversations")
except Exception as e:
error_log.append(f"Error fetching documents: {str(e)}")
print(f"Error fetching documents: {str(e)}")
return {"response": "Error fetching documents!"}

if responseCount > 500:
filename = course_name[0:10] + '-' + str(generate_short_id()) + '_convos_extended.zip'
s3_filepath = f"courses/{course_name}/{filename}"
print(
f"Response count greater than 500, processing in background. Filename: {filename}, S3 filepath: {s3_filepath}"
)
self.executor.submit(export_data_in_bg_extended, response, "conversations", course_name, s3_filepath)
return {"response": 'Download from S3', "s3_path": s3_filepath}

if responseCount > 0:
try:
first_id = response.data[0]['id']
last_id = response.data[-1]['id']
total_count = response.count or 0
print(f"Processing conversations. First ID: {first_id}, Last ID: {last_id}, Total count: {total_count}")

file_paths = _initialize_file_paths(course_name)
# print(f"Initialized file paths: {file_paths}")
workbook, worksheet, wrap_format = _initialize_excel(file_paths['excel'])
# print(f"Initialized Excel workbook at path: {file_paths['excel']}")
except Exception as e:
error_log.append(f"Error initializing file paths or Excel: {str(e)}")
print(f"Error initializing file paths or Excel: {str(e)}")
return {"response": "Error initializing file paths or Excel!"}

curr_count = 0
row_num = 1

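    # Keyset pagination: fetch conversations in batches ordered by id, advancing
    # the first_id cursor past the last row of each batch until total_count rows
    # have been processed.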
while curr_count < total_count:
try:
print(f"Fetching conversations from ID: {first_id} to {last_id}")
response = self.sql.getAllConversationsBetweenIds(course_name, first_id, last_id)
curr_count += len(response.data)
# print(f"Fetched {len(response.data)} conversations, current count: {curr_count}")

for convo in response.data:
# print(f"Processing conversation ID: {convo['convo_id']}")
_process_conversation(self.s3, convo, course_name, file_paths, worksheet, row_num, error_log, wrap_format)
row_num += len(convo['convo']['messages'])

if len(response.data) > 0:
first_id = response.data[-1]['id'] + 1
# print(f"Updated first ID to: {first_id}")
except Exception as e:
error_log.append(f"Error processing conversations: {str(e)}")
print(f"Error processing conversations: {str(e)}")
break

print(f"Processed {curr_count} conversations, ready to finalize export.")

try:
workbook.close()
print(f"Closed Excel workbook.")
zip_file_path = _create_zip(file_paths, error_log)
# print(f"Created zip file at path: {zip_file_path}")
_cleanup(file_paths)
# print(f"Cleaned up temporary files.")
except Exception as e:
error_log.append(f"Error finalizing export: {str(e)}")
print(f"Error finalizing export: {str(e)}")
return {"response": "Error finalizing export!"}

return {"response": (zip_file_path, file_paths['zip'], os.getcwd())}
else:
print("No data found between the given dates.")
return {"response": "No data found between the given dates."}


def export_data_in_bg_extended(response, download_type, course_name, s3_path):
"""
This function is called to upload the extended conversation history to S3.
Args:
response (dict): The response from the Supabase query.
download_type (str): The type of download - 'documents' or 'conversations'.
course_name (str): The name of the course.
s3_path (str): The S3 path where the file will be uploaded.
"""
print(f"Starting export in background for course: {course_name}, download_type: {download_type}, s3_path: {s3_path}")
s3 = AWSStorage()
sql = SQLDatabase()

total_doc_count = response.count
first_id = response.data[0]['id']
curr_doc_count = 0

file_paths = _initialize_file_paths(course_name)
workbook, worksheet, wrap_format = _initialize_excel(file_paths['excel'])
print(f"Initialized Excel workbook at path: {file_paths['excel']}")

row_num = 1
error_log = []
# Process conversations in batches
while curr_doc_count < total_doc_count:
try:
response = sql.getAllFromTableForDownloadType(course_name, download_type, first_id)
curr_doc_count += len(response.data)

for convo in response.data:
print(f"Processing conversation ID: {convo['convo_id']}")
_process_conversation(s3, convo, course_name, file_paths, worksheet, row_num, error_log, wrap_format)
row_num += len(convo['convo']['messages'])

# Update first_id for the next batch
if len(response.data) > 0:
first_id = response.data[-1]['id'] + 1
# print(f"Updated first ID to: {first_id}")
except Exception as e:
error_log.append(f"Error processing conversations: {str(e)}")
print(f"Error processing conversations: {str(e)}")
break

print(f"Processed {curr_doc_count} conversations, ready to finalize export.")

try:
workbook.close()
print(f"Closed Excel workbook.")
zip_file_path = _create_zip(file_paths, error_log)
print(f"Created zip file at path: {zip_file_path}")
_cleanup(file_paths)
print(f"Cleaned up temporary files.")

# Upload the zip file to S3
s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_path)
os.remove(zip_file_path)
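    # 172800 seconds == 48 hours, matching the expiry quoted in the email body below.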
s3_url = s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_path, 172800)

# Fetch course metadata to get admin emails
headers = {"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}", "Content-Type": "application/json"}
hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name
response = requests.get(hget_url, headers=headers)
course_metadata = response.json()
course_metadata = json.loads(course_metadata['result'])
admin_emails = course_metadata['course_admins']
bcc_emails = []

# Handle specific email cases
if '[email protected]' in admin_emails:
admin_emails.remove('[email protected]')
bcc_emails.append('[email protected]')

admin_emails.append(course_metadata['course_owner'])
admin_emails = list(set(admin_emails))

if len(admin_emails) == 0:
return "No admin emails found. Email not sent."

# Send email notification to course admins
subject = "UIUC.chat Conversation History Export Complete for " + course_name
body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours."
email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails)
print("email_status: ", email_status)

return "File uploaded to S3. Email sent to admins."

except Exception as e:
error_log.append(f"Error finalizing export: {str(e)}")
print(f"Error finalizing export: {str(e)}")
return {"response": "Error finalizing export!"}
# Encountered pickling error while running the background task. So, moved the function outside the class.


