diff --git a/ai_ta_backend/executors/process_pool_executor.py b/ai_ta_backend/executors/process_pool_executor.py
index 81b4860c..8b92bb12 100644
--- a/ai_ta_backend/executors/process_pool_executor.py
+++ b/ai_ta_backend/executors/process_pool_executor.py
@@ -23,9 +23,7 @@ def __init__(self, max_workers=None):
     self.executor = ProcessPoolExecutor(max_workers=max_workers)
 
   def submit(self, fn, *args, **kwargs):
-    raise NotImplementedError(
-        "ProcessPoolExecutorAdapter does not support 'submit' directly due to its nature. Use 'map' or other methods as needed."
-    )
+    return self.executor.submit(fn, *args, **kwargs)
 
   def map(self, fn, *iterables, timeout=None, chunksize=1):
     return self.executor.map(fn, *iterables, timeout=timeout, chunksize=chunksize)
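With submit() now delegating to the underlying ProcessPoolExecutor, anything passed to it must be picklable. A minimal sketch of the intended usage follows (the _double helper is hypothetical, not part of this PR); this constraint is also why _task_method and export_data_in_bg_extended further below are module-level functions rather than instance methods:

from ai_ta_backend.executors.process_pool_executor import ProcessPoolExecutorAdapter


def _double(x):
  # Module-level function: it can be pickled and sent to a worker process.
  return x * 2


if __name__ == '__main__':
  executor = ProcessPoolExecutorAdapter(max_workers=2)
  futures = [executor.submit(_double, i) for i in range(4)]
  print([f.result() for f in futures])  # [0, 2, 4, 6]
  # executor.submit(lambda x: x * 2, 1) would fail: lambdas and bound methods cannot be pickled.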
diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py
index 4c91317d..fecfd3be 100644
--- a/ai_ta_backend/main.py
+++ b/ai_ta_backend/main.py
@@ -204,7 +204,6 @@ def nomic_map(service: NomicService):
 #   response.headers.add('Access-Control-Allow-Origin', '*')
 #   return response
 
-
 # @app.route('/createConversationMap', methods=['GET'])
 # def createConversationMap(service: NomicService):
 #   course_name: str = request.args.get('course_name', default='', type=str)
@@ -219,7 +218,6 @@ def nomic_map(service: NomicService):
 #   response.headers.add('Access-Control-Allow-Origin', '*')
 #   return response
 
-
 # @app.route('/logToConversationMap', methods=['GET'])
 # def logToConversationMap(service: NomicService, flaskExecutor: ExecutorInterface):
 #   course_name: str = request.args.get('course_name', default='', type=str)
@@ -290,6 +288,42 @@ def export_convo_history(service: ExportService):
   return response
 
 
+@app.route('/test-process', methods=['GET'])
+def test_process(service: ExportService):
+  service.test_process()
+  return jsonify({"response": "success"})
+
+
+@app.route('/export-convo-history', methods=['GET'])
+def export_convo_history_v2(service: ExportService):
+  course_name: str = request.args.get('course_name', default='', type=str)
+  from_date: str = request.args.get('from_date', default='', type=str)
+  to_date: str = request.args.get('to_date', default='', type=str)
+
+  if course_name == '':
+    abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`")
+
+  export_status = service.export_convo_history(course_name, from_date, to_date)
+  print("Export processing response: ", export_status)
+
+  if export_status['response'] == "No data found between the given dates.":
+    response = Response(status=204)
+    response.headers.add('Access-Control-Allow-Origin', '*')
+
+  elif export_status['response'] == "Download from S3":
+    response = jsonify({"response": "Download from S3", "s3_path": export_status['s3_path']})
+    response.headers.add('Access-Control-Allow-Origin', '*')
+
+  else:
+    response = make_response(
+        send_from_directory(export_status['response'][2], export_status['response'][1], as_attachment=True))
+    response.headers.add('Access-Control-Allow-Origin', '*')
+    response.headers["Content-Disposition"] = f"attachment; filename={export_status['response'][1]}"
+    os.remove(export_status['response'][0])
+
+  return response
+
+
 @app.route('/export-conversations-custom', methods=['GET'])
 def export_conversations_custom(service: ExportService):
   course_name: str = request.args.get('course_name', default='', type=str)
@@ -481,6 +515,8 @@ def run_flow(service: WorkflowService) -> Response:
 
 
 def configure(binder: Binder) -> None:
+  binder.bind(ThreadPoolExecutorInterface, to=ThreadPoolExecutorAdapter, scope=SingletonScope)
+  binder.bind(ProcessPoolExecutorInterface, to=ProcessPoolExecutorAdapter, scope=SingletonScope)
   binder.bind(RetrievalService, to=RetrievalService, scope=RequestScope)
   binder.bind(PosthogService, to=PosthogService, scope=SingletonScope)
   binder.bind(SentryService, to=SentryService, scope=SingletonScope)
@@ -491,8 +527,6 @@ def configure(binder: Binder) -> None:
   binder.bind(SQLDatabase, to=SQLDatabase, scope=SingletonScope)
   binder.bind(AWSStorage, to=AWSStorage, scope=SingletonScope)
   binder.bind(ExecutorInterface, to=FlaskExecutorAdapter(executor), scope=SingletonScope)
-  binder.bind(ThreadPoolExecutorInterface, to=ThreadPoolExecutorAdapter, scope=SingletonScope)
-  binder.bind(ProcessPoolExecutorInterface, to=ProcessPoolExecutorAdapter, scope=SingletonScope)
 
 
 FlaskInjector(app=app, modules=[configure])
diff --git a/ai_ta_backend/service/export_service.py b/ai_ta_backend/service/export_service.py
index 75b3b272..45cfe97c 100644
--- a/ai_ta_backend/service/export_service.py
+++ b/ai_ta_backend/service/export_service.py
@@ -3,25 +3,49 @@
 import os
 import uuid
 import zipfile
-from concurrent.futures import ProcessPoolExecutor
+from urllib.parse import urlparse
 
 import pandas as pd
 import requests
+import xlsxwriter
 from injector import inject
 
 from ai_ta_backend.database.aws import AWSStorage
 from ai_ta_backend.database.sql import SQLDatabase
+from ai_ta_backend.executors.process_pool_executor import ProcessPoolExecutorAdapter
 from ai_ta_backend.service.sentry_service import SentryService
 from ai_ta_backend.utils.email.send_transactional_email import send_email
+from ai_ta_backend.utils.export_utils import (
+    _cleanup,
+    _create_zip,
+    _initialize_excel,
+    _initialize_file_paths,
+    _process_conversation,
+)
+
+
+def _task_method(index):
+  print(f"Task {index} is running in process {os.getpid()}", flush=True)
+  return index
 
 
 class ExportService:
 
   @inject
-  def __init__(self, sql: SQLDatabase, s3: AWSStorage, sentry: SentryService):
+  def __init__(self, sql: SQLDatabase, s3: AWSStorage, sentry: SentryService, executor: ProcessPoolExecutorAdapter):
     self.sql = sql
     self.s3 = s3
     self.sentry = sentry
+    self.executor = executor
+
+  def test_process(self):
+    """
+    Smoke-test the process pool executor by running a few trivial tasks in separate worker processes.
+ """ + futures = [self.executor.submit(_task_method, i) for i in range(5)] + results = [future.result() for future in futures] + print(results) + return {"response": "Test process successful.", "results": results} def export_documents_json(self, course_name: str, from_date='', to_date=''): """ @@ -228,6 +252,186 @@ def export_conversations(self, course_name: str, from_date: str, to_date: str, e else: return {"response": "No data found between the given dates."} + def export_convo_history(self, course_name: str, from_date='', to_date=''): + """ + This function exports the conversation history to a zip file containing markdown files, an Excel file, and a JSONL file. + Args: + course_name (str): The name of the course. + from_date (str, optional): The start date for the data export. Defaults to ''. + to_date (str, optional): The end date for the data export. Defaults to ''. + """ + print( + f"Exporting extended conversation history for course: {course_name}, from_date: {from_date}, to_date: {to_date}" + ) + error_log = [] + + try: + response = self.sql.getDocumentsBetweenDates(course_name, from_date, to_date, 'llm-convo-monitor') + responseCount = response.count or 0 + print(f"Received request to export: {responseCount} conversations") + except Exception as e: + error_log.append(f"Error fetching documents: {str(e)}") + print(f"Error fetching documents: {str(e)}") + return {"response": "Error fetching documents!"} + + if responseCount > 500: + filename = course_name[0:10] + '-' + str(generate_short_id()) + '_convos_extended.zip' + s3_filepath = f"courses/{course_name}/{filename}" + print( + f"Response count greater than 500, processing in background. Filename: {filename}, S3 filepath: {s3_filepath}" + ) + self.executor.submit(export_data_in_bg_extended, response, "conversations", course_name, s3_filepath) + return {"response": 'Download from S3', "s3_path": s3_filepath} + + if responseCount > 0: + try: + first_id = response.data[0]['id'] + last_id = response.data[-1]['id'] + total_count = response.count or 0 + print(f"Processing conversations. 
+
+        file_paths = _initialize_file_paths(course_name)
+        # print(f"Initialized file paths: {file_paths}")
+        workbook, worksheet, wrap_format = _initialize_excel(file_paths['excel'])
+        # print(f"Initialized Excel workbook at path: {file_paths['excel']}")
+      except Exception as e:
+        error_log.append(f"Error initializing file paths or Excel: {str(e)}")
+        print(f"Error initializing file paths or Excel: {str(e)}")
+        return {"response": "Error initializing file paths or Excel!"}
+
+      curr_count = 0
+      row_num = 1
+
+      while curr_count < total_count:
+        try:
+          print(f"Fetching conversations from ID: {first_id} to {last_id}")
+          response = self.sql.getAllConversationsBetweenIds(course_name, first_id, last_id)
+          curr_count += len(response.data)
+          # print(f"Fetched {len(response.data)} conversations, current count: {curr_count}")
+
+          for convo in response.data:
+            # print(f"Processing conversation ID: {convo['convo_id']}")
+            _process_conversation(self.s3, convo, course_name, file_paths, worksheet, row_num, error_log, wrap_format)
+            row_num += len(convo['convo']['messages'])
+
+          if len(response.data) > 0:
+            first_id = response.data[-1]['id'] + 1
+            # print(f"Updated first ID to: {first_id}")
+        except Exception as e:
+          error_log.append(f"Error processing conversations: {str(e)}")
+          print(f"Error processing conversations: {str(e)}")
+          break
+
+      print(f"Processed {curr_count} conversations, ready to finalize export.")
+
+      try:
+        workbook.close()
+        print(f"Closed Excel workbook.")
+        zip_file_path = _create_zip(file_paths, error_log)
+        # print(f"Created zip file at path: {zip_file_path}")
+        _cleanup(file_paths)
+        # print(f"Cleaned up temporary files.")
+      except Exception as e:
+        error_log.append(f"Error finalizing export: {str(e)}")
+        print(f"Error finalizing export: {str(e)}")
+        return {"response": "Error finalizing export!"}
+
+      return {"response": (zip_file_path, file_paths['zip'], os.getcwd())}
+    else:
+      print("No data found between the given dates.")
+      return {"response": "No data found between the given dates."}
+
+
+def export_data_in_bg_extended(response, download_type, course_name, s3_path):
+  """
+  This function is called to upload the extended conversation history to S3.
+  Args:
+      response (dict): The response from the Supabase query.
+      download_type (str): The type of download - 'documents' or 'conversations'.
+      course_name (str): The name of the course.
+      s3_path (str): The S3 path where the file will be uploaded.
+ """ + print(f"Starting export in background for course: {course_name}, download_type: {download_type}, s3_path: {s3_path}") + s3 = AWSStorage() + sql = SQLDatabase() + + total_doc_count = response.count + first_id = response.data[0]['id'] + curr_doc_count = 0 + + file_paths = _initialize_file_paths(course_name) + workbook, worksheet, wrap_format = _initialize_excel(file_paths['excel']) + print(f"Initialized Excel workbook at path: {file_paths['excel']}") + + row_num = 1 + error_log = [] + # Process conversations in batches + while curr_doc_count < total_doc_count: + try: + response = sql.getAllFromTableForDownloadType(course_name, download_type, first_id) + curr_doc_count += len(response.data) + + for convo in response.data: + print(f"Processing conversation ID: {convo['convo_id']}") + _process_conversation(s3, convo, course_name, file_paths, worksheet, row_num, error_log, wrap_format) + row_num += len(convo['convo']['messages']) + + # Update first_id for the next batch + if len(response.data) > 0: + first_id = response.data[-1]['id'] + 1 + # print(f"Updated first ID to: {first_id}") + except Exception as e: + error_log.append(f"Error processing conversations: {str(e)}") + print(f"Error processing conversations: {str(e)}") + break + + print(f"Processed {curr_doc_count} conversations, ready to finalize export.") + + try: + workbook.close() + print(f"Closed Excel workbook.") + zip_file_path = _create_zip(file_paths, error_log) + print(f"Created zip file at path: {zip_file_path}") + _cleanup(file_paths) + print(f"Cleaned up temporary files.") + + # Upload the zip file to S3 + s3.upload_file(zip_file_path, os.environ['S3_BUCKET_NAME'], s3_path) + os.remove(zip_file_path) + s3_url = s3.generatePresignedUrl('get_object', os.environ['S3_BUCKET_NAME'], s3_path, 172800) + + # Fetch course metadata to get admin emails + headers = {"Authorization": f"Bearer {os.environ['VERCEL_READ_ONLY_API_KEY']}", "Content-Type": "application/json"} + hget_url = str(os.environ['VERCEL_BASE_URL']) + "course_metadatas/" + course_name + response = requests.get(hget_url, headers=headers) + course_metadata = response.json() + course_metadata = json.loads(course_metadata['result']) + admin_emails = course_metadata['course_admins'] + bcc_emails = [] + + # Handle specific email cases + if 'kvday2@illinois.edu' in admin_emails: + admin_emails.remove('kvday2@illinois.edu') + bcc_emails.append('kvday2@illinois.edu') + + admin_emails.append(course_metadata['course_owner']) + admin_emails = list(set(admin_emails)) + + if len(admin_emails) == 0: + return "No admin emails found. Email not sent." + + # Send email notification to course admins + subject = "UIUC.chat Conversation History Export Complete for " + course_name + body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours." + email_status = send_email(subject, body_text, os.environ['EMAIL_SENDER'], admin_emails, bcc_emails) + print("email_status: ", email_status) + + return "File uploaded to S3. Email sent to admins." + + except Exception as e: + error_log.append(f"Error finalizing export: {str(e)}") + print(f"Error finalizing export: {str(e)}") + return {"response": "Error finalizing export!"} # Encountered pickling error while running the background task. So, moved the function outside the class. 
diff --git a/ai_ta_backend/utils/export_utils.py b/ai_ta_backend/utils/export_utils.py
new file mode 100644
index 00000000..395337e9
--- /dev/null
+++ b/ai_ta_backend/utils/export_utils.py
@@ -0,0 +1,206 @@
+import json
+import os
+import zipfile
+from urllib.parse import urlparse
+
+import xlsxwriter
+
+
+def _initialize_file_paths(course_name):
+  base_name = course_name[0:15] + '-conversation-export'
+  file_paths = {
+      'zip': base_name + '.zip',
+      'excel': base_name + '.xlsx',
+      'jsonl': base_name + '.jsonl',
+      'markdown_dir': os.path.join(os.getcwd(), 'markdown export'),
+      'media_dir': os.path.join(os.getcwd(), 'media_files')
+  }
+  os.makedirs(file_paths['markdown_dir'], exist_ok=True)
+  os.makedirs(file_paths['media_dir'], exist_ok=True)
+  print(f"Initialized directories: {file_paths['markdown_dir']}, {file_paths['media_dir']}")
+  return file_paths
+
+
+def _initialize_excel(excel_file_path):
+  workbook = xlsxwriter.Workbook(excel_file_path)
+  worksheet = workbook.add_worksheet()
+  worksheet.autofit()
+  wrap_format = workbook.add_format()
+  wrap_format.set_text_wrap()
+  worksheet.set_column('G:G', 100)
+  worksheet.set_column('A:A', 35)
+  worksheet.set_column('B:E', 20)
+  worksheet.set_column('F:F', 10)
+  worksheet.set_column('H:H', 15)
+  headers = [
+      "Conversation ID", "User Email", "Course Name", "Message ID", "Timestamp", "User Role", "Message Content",
+      "Had Images(Yes/No)"
+  ]
+  for col_num, header in enumerate(headers):
+    worksheet.write(0, col_num, header)
+  print(f"Initialized Excel headers: {headers}")
+  return workbook, worksheet, wrap_format
+
+
+def _process_conversation(s3, convo, course_name, file_paths, worksheet, row_num, error_log, wrap_format):
+  try:
+    convo_id = convo['convo_id']
+    convo_data = convo['convo']
+    user_email = convo['user_email']
+    timestamp = convo['created_at']
+    messages = convo_data['messages']
+    if isinstance(messages[0]['content'], list) and messages[0]['role'] == 'user':
+      convo_name = messages[0]['content'][0]['text'][:15]
+    else:
+      convo_name = messages[0]['content'][:15]
+    print(f"Processing conversation ID: {convo_id}, User email: {user_email}")
+
+    _create_markdown(s3, convo_id, messages, file_paths['markdown_dir'], file_paths['media_dir'], user_email, error_log,
+                     timestamp, convo_name)
+    # print(f"Created markdown for conversation ID: {convo_id}")
+    _write_to_excel(convo_id, course_name, messages, worksheet, row_num, user_email, timestamp, error_log, wrap_format)
+    # print(f"Wrote to Excel for conversation ID: {convo_id}")
+    _append_to_jsonl(convo_data, file_paths['jsonl'], error_log)
+    # print(f"Appended to JSONL for conversation ID: {convo_id}")
+    print(f"Processed conversation ID: {convo_id}")
+  except Exception as e:
+    print(f"Error processing conversation ID {convo['convo_id']}: {str(e)}")
+    error_log.append(f"Error processing conversation ID {convo['convo_id']}: {str(e)}")
+
+
+def _create_markdown(s3, convo_id, messages, markdown_dir, media_dir, user_email, error_log, timestamp, convo_name):
+  try:
+    markdown_filename = f"{timestamp.split('T')[0]}-{convo_name}.md"
+    markdown_file_path = os.path.join(markdown_dir, markdown_filename)
+    with open(markdown_file_path, 'w') as md_file:
+      md_file.write(f"## Conversation ID: {convo_id}\n")
+      md_file.write(f"## **User Email**: {user_email}\n\n")
+      md_file.write(f"### **Timestamp**: {timestamp}\n\n")
+
+      for message in messages:
+        role = "User" if message['role'] == 'user' else "Assistant"
+        content = _process_message_content(s3, message['content'], convo_id, media_dir, error_log)
md_file.write(f"### {role}:\n") + md_file.write(f"{content}\n\n") + md_file.write("---\n\n") # Separator for each message for better readability + + print(f"Created markdown file at path: {markdown_file_path}") + except Exception as e: + print(f"Error creating markdown for conversation ID {convo_id}: {str(e)}") + error_log.append(f"Error creating markdown for conversation ID {convo_id}: {str(e)}") + + +def _process_message_content(s3, content, convo_id, media_dir, error_log): + try: + if isinstance(content, list): + flattened_content = [] + for item in content: + if item['type'] == 'text': + flattened_content.append(item['text']) + elif item['type'] == 'image_url': + # Use only the UUID part of the image URL for the filename + image_filename = f"{item['image_url']['url'].split('/')[-1].split('?')[0]}" + image_file_path = os.path.join(media_dir, image_filename) + image_s3_path = _extract_path_from_url(item['image_url']['url']) + # Save the image to the media directory + s3.download_file(image_s3_path, os.environ['S3_BUCKET_NAME'], image_file_path) + # Adjust the path to be relative from the markdown file's perspective + relative_image_path = os.path.join('..', media_dir.split('/')[-1], image_filename) + flattened_content.append(f"![Image]({relative_image_path})") + print(f"Processed message content for conversation ID: {convo_id}") + return ' '.join(flattened_content) + else: + return content + except Exception as e: + print(f"Error processing message content for conversation ID {convo_id}: {str(e)}") + error_log.append(f"Error processing message content for conversation ID {convo_id}: {str(e)}") + return content + + +def _extract_path_from_url(url: str) -> str: + urlObject = urlparse(url) + path = urlObject.path + if path.startswith('/'): + path = path[1:] + return path + + +def _write_to_excel(convo_id, course_name, messages, worksheet, row_num, user_email, timestamp, error_log, wrap_format): + try: + start_row = row_num + for message_id, message in enumerate(messages): + if message_id == 0: + # if convo_id == '3f1827f5-3d6c-4743-b467-12ef4b2059c5': + # print(f"Message: {message}") + worksheet.write(row_num, 0, convo_id) + worksheet.write(row_num, 1, user_email) + worksheet.write(row_num, 2, course_name, wrap_format) + worksheet.write(row_num, 3, message_id) # Add message ID as the index of the message + worksheet.write(row_num, 4, timestamp, wrap_format) + worksheet.write(row_num, 5, message['role'], wrap_format) + if message['role'] == 'user' and isinstance(message['content'], list): + # if convo_id == '3f1827f5-3d6c-4743-b467-12ef4b2059c5': + # print(f"Message content: {message['content']}") + content = ' '.join([item['text'] for item in message['content'] if item['type'] == 'text']) + contains_image = any(item['type'] == 'image_url' and 'url' in item['image_url'] for item in message['content']) + if contains_image: + worksheet.write(row_num, 7, 'Yes') + else: + worksheet.write(row_num, 7, 'No') + else: + content = message['content'] + worksheet.write(row_num, 6, content, wrap_format) + row_num += 1 + + # Merge the rows in the first column for the same convo_id + if row_num > start_row + 1: + worksheet.merge_range(start_row, 0, row_num - 1, 0, convo_id, wrap_format) + + print(f"Wrote messages to Excel for conversation ID: {convo_id}") + except Exception as e: + print(f"Error writing to Excel for conversation ID {convo_id}: {str(e)}") + error_log.append(f"Error writing to Excel for conversation ID {convo_id}: {str(e)}") + + +def _append_to_jsonl(convo_data, jsonl_file_path, error_log): 
+  try:
+    with open(jsonl_file_path, 'a') as jsonl_file:
+      jsonl_file.write(json.dumps(convo_data) + '\n')
+    print(f"Appended conversation data to JSONL file at path: {jsonl_file_path}")
+  except Exception as e:
+    print(f"Error appending to JSONL for conversation ID {convo_data['convo_id']}: {str(e)}")
+    error_log.append(f"Error appending to JSONL for conversation ID {convo_data['convo_id']}: {str(e)}")
+
+
+def _create_zip(file_paths, error_log):
+  zip_file_path = os.path.join(os.getcwd(), file_paths['zip'])
+  error_log_path = os.path.join(os.getcwd(), 'error.log')
+  with open(error_log_path, 'w') as log_file:
+    for error in error_log:
+      log_file.write(error + '\n')
+  with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
+    for root, _, files in os.walk(file_paths['markdown_dir']):
+      for file in files:
+        zipf.write(
+            os.path.join(root, file),
+            os.path.join('markdown export', os.path.relpath(os.path.join(root, file), file_paths['markdown_dir'])))
+    for root, _, files in os.walk(file_paths['media_dir']):
+      for file in files:
+        zipf.write(
+            os.path.join(root, file),
+            os.path.join(file_paths['media_dir'].split('/')[-1],
+                         os.path.relpath(os.path.join(root, file), file_paths['media_dir'])))
+    zipf.write(file_paths['excel'], os.path.basename(file_paths['excel']))
+    zipf.write(file_paths['jsonl'], os.path.basename(file_paths['jsonl']))
+    zipf.write(error_log_path, 'error.log')
+  print(f"Created zip file at path: {zip_file_path}")
+  return zip_file_path
+
+
+def _cleanup(file_paths):
+  os.remove(file_paths['excel'])
+  os.remove(file_paths['jsonl'])
+  import shutil
+  shutil.rmtree(file_paths['markdown_dir'])
+  shutil.rmtree(file_paths['media_dir'])
+  print(f"Cleaned up files: {file_paths}")
diff --git a/requirements.txt b/requirements.txt
index d228546a..8d5dd65b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,6 +24,7 @@ python-dotenv==1.0.0
 pydantic==1.10.13 # pydantic v1 works better for ray
 flask-executor==1.0.0
 retry==0.9.2
+XlsxWriter==3.2.0
 
 # AI & core services
 nomic==2.0.14
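For completeness, a hedged sketch of inspecting the finished archive on the client side; actual file names carry the course-name prefix from _initialize_file_paths, and per _create_zip above the archive contains the 'markdown export' and 'media_files' directories, the .xlsx, the .jsonl, and error.log:

import json
import zipfile

# 'conversation-export.zip' is a placeholder name for the downloaded archive.
with zipfile.ZipFile("conversation-export.zip") as zf:
  print(zf.namelist())
  jsonl_names = [name for name in zf.namelist() if name.endswith(".jsonl")]
  with zf.open(jsonl_names[0]) as jsonl_file:
    conversations = [json.loads(line) for line in jsonl_file]
  print(f"{len(conversations)} conversations in the export")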