From ef4c016cfe6fcc0b5a1b6dc6c5dc3ab5ebbf44c6 Mon Sep 17 00:00:00 2001 From: Asmita Dabholkar Date: Thu, 22 Feb 2024 12:05:36 -0600 Subject: [PATCH 1/6] BCC recipients in Document export email (#224) * added BCC receipients in send_email() * added export endpoint again, it dissapeared during merge --- ai_ta_backend/emails.py | 21 +++++++++++++++------ ai_ta_backend/export_data.py | 19 +++++++++++++++++-- ai_ta_backend/main.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/ai_ta_backend/emails.py b/ai_ta_backend/emails.py index a2b3d18c..2f17dce0 100644 --- a/ai_ta_backend/emails.py +++ b/ai_ta_backend/emails.py @@ -4,16 +4,25 @@ from email.mime.multipart import MIMEMultipart -def send_email(subject, body_text, sender, receipients): +def send_email(subject: str, body_text: str, sender: str, receipients: list, bcc_receipients: list): + """ + Send an email using the AWS SES service + :param subject: The subject of the email + :param body_text: The body of the email + :param sender: The email address of the sender + :param receipients: A list of email addresses to send the email to + :param bcc_receipients: A list of email addresses to send the email to as BCC + :return: A string indicating the result of the email send operation + + """ # Create message content message = MIMEMultipart("alternative") message["Subject"] = subject message["From"] = sender + message["To"] = ", ".join(receipients) - if len(receipients) == 1: - message["To"] = receipients[0] - else: - message["To"] = ", ".join(receipients) + if len(bcc_receipients) > 0: + message["Bcc"] = ", ".join(bcc_receipients) # Add plain text part part1 = MIMEText(body_text, "plain") @@ -24,6 +33,6 @@ def send_email(subject, body_text, sender, receipients): # Connect to SMTP server with smtplib.SMTP_SSL(os.getenv('SES_HOST'), os.getenv('SES_PORT')) as server: # type: ignore server.login(os.getenv('USERNAME_SMTP'), os.getenv('PASSWORD_SMTP')) # type: ignore - server.sendmail(sender, receipients, message.as_string()) + server.sendmail(sender, receipients + bcc_receipients, message.as_string()) return "Email sent successfully!" \ No newline at end of file diff --git a/ai_ta_backend/export_data.py b/ai_ta_backend/export_data.py index d03f53dd..299b3435 100644 --- a/ai_ta_backend/export_data.py +++ b/ai_ta_backend/export_data.py @@ -19,7 +19,9 @@ def export_documents_json(course_name: str, from_date='', to_date=''): """ - This function exports the documents to a csv file. + This function exports the documents to a json file. + 1. If the number of documents is greater than 1000, it calls a background task to upload the documents to S3. + 2. If the number of documents is less than 1000, it fetches the documents and zips them. Args: course_name (str): The name of the course. from_date (str, optional): The start date for the data export. Defaults to ''. 
@@ -198,14 +200,27 @@ def export_data_in_bg(response, download_type, course_name, s3_path): course_metadata = response.json() course_metadata = json.loads(course_metadata['result']) admin_emails = course_metadata['course_admins'] + bcc_emails = [] + + # check for Kastan's email and move to bcc + if 'kvday2@illinois.edu' in admin_emails: + admin_emails.remove('kvday2@illinois.edu') + bcc_emails.append('kvday2@illinois.edu') + + # add course owner email to admin_emails admin_emails.append(course_metadata['course_owner']) admin_emails = list(set(admin_emails)) print("admin_emails: ", admin_emails) + print("bcc_emails: ", bcc_emails) + + # add a check for emails, don't send email if no admin emails + if len(admin_emails) == 0: + return "No admin emails found. Email not sent." # send email to admins subject = "UIUC.chat Data Export Complete for " + course_name body_text = "The data export for " + course_name + " is complete.\n\nYou can download the file from the following link: \n\n" + s3_url + "\n\nThis link will expire in 48 hours." - email_status = send_email(subject, body_text, os.getenv('EMAIL_SENDER'), admin_emails) + email_status = send_email(subject, body_text, os.getenv('EMAIL_SENDER'), admin_emails, bcc_emails) print("email_status: ", email_status) return "File uploaded to S3. Email sent to admins." diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index 4a5a140e..60d423a8 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -631,6 +631,35 @@ def export_convo_history(): return response +@app.route('/exportDocuments', methods=['GET']) +def exportDocuments(): + course_name: str = request.args.get('course_name', default='', type=str) + from_date: str = request.args.get('from_date', default='', type=str) + to_date: str = request.args.get('to_date', default='', type=str) + + if course_name == '': + # proper web error "400 Bad request" + abort(400, description=f"Missing required parameter: 'course_name' must be provided. 
Course name: `{course_name}`") + + export_status = export_documents_json(course_name, from_date, to_date) + print("EXPORT FILE LINKS: ", export_status) + + if export_status['response'] == "No data found between the given dates.": + response = Response(status=204) + response.headers.add('Access-Control-Allow-Origin', '*') + + elif export_status['response'] == "Download from S3": + response = jsonify({"response": "Download from S3", "s3_path": export_status['s3_path']}) + response.headers.add('Access-Control-Allow-Origin', '*') + + else: + response = make_response(send_from_directory(export_status['response'][2], export_status['response'][1], as_attachment=True)) + response.headers.add('Access-Control-Allow-Origin', '*') + response.headers["Content-Disposition"] = f"attachment; filename={export_status['response'][1]}" + os.remove(export_status['response'][0]) + + return response + @app.route('/getTopContextsWithMQR', methods=['GET']) def getTopContextsWithMQR() -> Response: From 20e048c59eef78715d9e02307a2d49497a1a845e Mon Sep 17 00:00:00 2001 From: star-nox Date: Wed, 28 Feb 2024 11:20:59 -0600 Subject: [PATCH 2/6] /onResponseCompletion fix --- ai_ta_backend/main.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index 60d423a8..dae7ef0e 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -578,12 +578,9 @@ def createDocumentMap(): @app.route('/onResponseCompletion', methods=['POST']) def logToNomic(): - # data = request.get_json() - # course_name = data['course_name'] - # conversation = data['conversation'] - - course_name: str = request.args.get('course_name', default='', type=str) - conversation: str = request.args.get('conversation', default='', type=str) + data = request.get_json() + course_name = data['course_name'] + conversation = data['conversation'] if course_name == '' or conversation == '': # proper web error "400 Bad request" @@ -595,8 +592,7 @@ def logToNomic(): print(f"In /onResponseCompletion for course: {course_name}") # background execution of tasks!! 
- #response = executor.submit(log_convo_to_nomic, course_name, data) - response = executor.submit(log_convo_to_nomic, course_name, conversation) + response = executor.submit(log_convo_to_nomic, course_name, data) response = jsonify({'outcome': 'success'}) response.headers.add('Access-Control-Allow-Origin', '*') return response From f8f6c6e8cbb3534999ab1e6312410deff58a2f06 Mon Sep 17 00:00:00 2001 From: star-nox Date: Wed, 28 Feb 2024 11:35:45 -0600 Subject: [PATCH 3/6] testing data type in log function --- ai_ta_backend/nomic_logging.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ai_ta_backend/nomic_logging.py b/ai_ta_backend/nomic_logging.py index 146b572c..39c24705 100644 --- a/ai_ta_backend/nomic_logging.py +++ b/ai_ta_backend/nomic_logging.py @@ -80,6 +80,7 @@ def log_convo_to_nomic(course_name: str, conversation) -> str: print(f"in log_convo_to_nomic() for course: {course_name}") print("type of conversation:", type(conversation)) + print("conversation:", conversation) #conversation = json.loads(conversation) messages = conversation['conversation']['messages'] user_email = conversation['conversation']['user_email'] From 902edf83b80efa1bd062d4930afc67ee34c3baaa Mon Sep 17 00:00:00 2001 From: star-nox Date: Wed, 28 Feb 2024 13:05:13 -0600 Subject: [PATCH 4/6] added condition when user email is unavailable --- ai_ta_backend/nomic_logging.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ai_ta_backend/nomic_logging.py b/ai_ta_backend/nomic_logging.py index 39c24705..72e385cd 100644 --- a/ai_ta_backend/nomic_logging.py +++ b/ai_ta_backend/nomic_logging.py @@ -83,7 +83,10 @@ def log_convo_to_nomic(course_name: str, conversation) -> str: print("conversation:", conversation) #conversation = json.loads(conversation) messages = conversation['conversation']['messages'] - user_email = conversation['conversation']['user_email'] + if 'user_email' not in conversation['conversation']: + user_email = "NULL" + else: + user_email = conversation['conversation']['user_email'] conversation_id = conversation['conversation']['id'] # we have to upload whole conversations From eeff121ca5858b4d545c3a66248219699fc55a43 Mon Sep 17 00:00:00 2001 From: star-nox Date: Wed, 28 Feb 2024 13:15:55 -0600 Subject: [PATCH 5/6] removed test print statements --- ai_ta_backend/nomic_logging.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ai_ta_backend/nomic_logging.py b/ai_ta_backend/nomic_logging.py index 72e385cd..cf5bc699 100644 --- a/ai_ta_backend/nomic_logging.py +++ b/ai_ta_backend/nomic_logging.py @@ -80,7 +80,6 @@ def log_convo_to_nomic(course_name: str, conversation) -> str: print(f"in log_convo_to_nomic() for course: {course_name}") print("type of conversation:", type(conversation)) - print("conversation:", conversation) #conversation = json.loads(conversation) messages = conversation['conversation']['messages'] if 'user_email' not in conversation['conversation']: From 882da25db86f15e30988c3787b6e79388eee13b2 Mon Sep 17 00:00:00 2001 From: Kastan Day Date: Tue, 5 Mar 2024 11:14:27 -0800 Subject: [PATCH 6/6] Remove ray from app. 
TBD refactor to asyncio --- ai_ta_backend/filtering_contexts.py | 409 +++++++++++++--------------- ai_ta_backend/main.py | 17 +- ai_ta_backend/vector_database.py | 165 +++++------ requirements.txt | 2 +- run.sh | 2 +- 5 files changed, 291 insertions(+), 304 deletions(-) diff --git a/ai_ta_backend/filtering_contexts.py b/ai_ta_backend/filtering_contexts.py index 8d9d7131..476df3d0 100644 --- a/ai_ta_backend/filtering_contexts.py +++ b/ai_ta_backend/filtering_contexts.py @@ -1,213 +1,196 @@ -import json -import os -import threading -import time -from typing import Optional - -import openai -import ray -import requests -# from langchain import hub -# import replicate -from posthog import Posthog -import sentry_sdk - -# from dotenv import load_dotenv -# load_dotenv(override=True) -# from transformers import AutoTokenizer -# tokenizer = AutoTokenizer.from_pretrained("HuggingFaceH4/zephyr-7b-beta") - -filter_unrelated_contexts_zephyr = """<|system|> -You are an expert at determining if a passage is relevant and helpful for answering a question. -To be valuable, a passage must have at least some amount of useful and meaningful information with more than a passing mention of the topic. -As part of your thinking process, you first write a few sentences evaluating the utility of the passage, given the question we're trying to answer. Limit yourself to writing only a sentence or two, no more. -Finally, you must submit your final answer by adding two newline characters then "Yes." or "No." or "I don't know.". Provide a single answer only. Providing multiple final results will disqualify you. -Here's a template code snippet of how it should work (with placeholder variables): -``` -Passage: -Question: -Your evaluation of the utility of the passage: - - -Final answer: -<|user|> -Passage: {context} -Question: {user_query} -Your evaluation of the utility of the passage: -<|assistant|>""" - - -@ray.remote -class AsyncActor: - - def filter_context(self, context, user_query, langsmith_prompt_obj): - final_prompt = str(langsmith_prompt_obj.format(context=context, user_query=user_query)) - # print(f"-------\nfinal_prompt:\n{final_prompt}\n^^^^^^^^^^^^^") - try: - # completion = run_caii_hosted_llm(final_prompt) - # completion = run_replicate(final_prompt) - completion = run_anyscale(final_prompt) - return {"completion": completion, "context": context} - except Exception as e: - sentry_sdk.capture_exception(e) - print(f"Error: {e}") - - -def run_caii_hosted_llm(prompt, max_tokens=300, temp=0.3, **kwargs): - """ - Local LLMs USAGE DOCS: https://kastanday.notion.site/LLM-Serving-on-prem-OpenAI-Clone-bb06028266d842b0872465f552684177 ## - """ - - url = "http://api.kastan.ai/v1/completions?model=HuggingFaceH4/zephyr-7b-alpha" - headers = {'Content-Type': 'application/json'} - data = {"prompt": prompt, "max_tokens": max_tokens, "temperature": temp, **kwargs} - - response = None - try: - response = requests.post(url, headers=headers, data=json.dumps(data), timeout=180) - return response.json()['choices'][0]['text'] - except Exception as e: - sentry_sdk.capture_exception(e) - # Probably cuda OOM error. - response_content = response.json() if response else "No response" - raise ValueError( - f"🚫🚫🚫 Failed inference attempt. 
Response: {response_content}\nError: {e}\nPromt that caused error: {prompt}" - ) from e - - -def run_replicate(prompt): - output = None - # output = replicate.run("tomasmcm/zephyr-7b-beta:961cd6665b811d0c43c0b9488b6dfa85ff5c7bfb875e93b4533e4c7f96c7c526", - # input={ - # "top_k": 50, - # "top_p": 0.95, - # "prompt": prompt, - # "temperature": 0.3, - # "max_new_tokens": 250, - # "presence_penalty": 1 - # }) - print(output) - return output - - -def run_anyscale(prompt, model_name="HuggingFaceH4/zephyr-7b-beta"): - start_time = time.monotonic() - ret = openai.ChatCompletion.create( - api_base="https://api.endpoints.anyscale.com/v1", - api_key=os.environ["ANYSCALE_ENDPOINT_TOKEN"], - api_type="openai", - # model="mistralai/Mistral-7B-Instruct-v0.1", - model="HuggingFaceH4/zephyr-7b-beta", - messages=[{ - "role": "system", - "content": "You are a helpful assistant." - }, { - "role": "user", - "content": prompt - }], - temperature=0.3, - max_tokens=250, - ) - - output = ret["choices"][0]["message"]["content"] # type: ignore - print("Response from Anyscale:", output[:150]) - - # input_length = len(tokenizer.encode(prompt)) - # output_length = len(tokenizer.encode(output)) - # Input tokens {input_length}, output tokens: {output_length}" - print(f"^^^^ one anyscale call Runtime: {(time.monotonic() - start_time):.2f} seconds.") - return output - - -def parse_result(result: str): - lines = result.split('\n') - for line in lines: - if 'Final answer' in line: - return 'yes' in line.lower() - return False - - -def filter_top_contexts(contexts, - user_query: str, - timeout: Optional[float] = None, - max_concurrency: Optional[int] = 180): - - print("⏰⏰⏰ Starting filter_top_contexts() ⏰⏰⏰") - - timeout = timeout or float(os.environ["FILTER_TOP_CONTEXTS_TIMEOUT_SECONDS"]) - # langsmith_prompt_obj = hub.pull("kastanday/filter-unrelated-contexts-zephyr") # TOO UNSTABLE, service offline - langsmith_prompt_obj = filter_unrelated_contexts_zephyr - posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com') - - print("NUM ACTIVE THREADS (top of filtering_contexts):", threading.active_count()) - - max_concurrency = min(100, len(contexts)) - print("max_concurrency is max of 100, or len(contexts), whichever is less ---- Max concurrency:", max_concurrency) - print("Num contexts to filter:", len(contexts)) - - # START TASKS - actor = AsyncActor.options(max_concurrency=max_concurrency, num_cpus=0.001).remote() # type: ignore - result_futures = [actor.filter_context.remote(c, user_query, langsmith_prompt_obj) for c in contexts] - - start_time = time.monotonic() - done_tasks, in_progress = ray.wait(result_futures, - num_returns=len(result_futures), - timeout=timeout, - fetch_local=False) - - print("NUM ACTIVE THREADS (before cleanup filtering_contexts):", threading.active_count()) - # Cleanup - for task in in_progress: - ray.cancel(task) - results = ray.get(done_tasks) - print("NUM ACTIVE THREADS (before kill filtering_contexts):", threading.active_count()) - ray.kill(actor) - print("NUM ACTIVE THREADS (after kill filtering_contexts):", threading.active_count()) - - best_contexts_to_keep = [ - r['context'] for r in results if r and 'context' in r and 'completion' in r and parse_result(r['completion']) - ] - - print("🧠🧠 TOTAL DOCS PROCESSED BY ANYSCALE FILTERING:", len(results)) - print("🧠🧠 TOTAL DOCS KEPT, AFTER FILTERING:", len(best_contexts_to_keep)) - mqr_runtime = round(time.monotonic() - start_time, 2) - print(f"⏰ Total elapsed time: {mqr_runtime} seconds") - - 
posthog.capture('distinct_id_of_the_user', - event='filter_top_contexts', - properties={ - 'user_query': user_query, - 'course_name': contexts[0].metadata.get('course_name', None), - 'percent_kept': len(best_contexts_to_keep) / max(1, len(results)), - 'total_docs_processed': len(results), - 'total_docs_kept': len(best_contexts_to_keep), - 'MQR_total_runtime_sec': mqr_runtime, - }) - posthog.shutdown() - return best_contexts_to_keep - - -def run_main(): - start_time = time.monotonic() - # final_passage_list = filter_top_contexts(contexts=CONTEXTS * 2, user_query=USER_QUERY) - # print("✅✅✅ TOTAL included in results: ", len(final_passage_list)) - print(f"⏰⏰⏰ Runtime: {(time.monotonic() - start_time):.2f} seconds") - # print("Total contexts:", len(CONTEXTS) * 2) - - -# ! CONDA ENV: llm-serving -if __name__ == "__main__": - run_main() +# import json +# import os +# import threading +# import time +# from typing import Optional + +# import openai +# import ray +# import requests +# from posthog import Posthog +# import sentry_sdk + +# filter_unrelated_contexts_zephyr = """<|system|> +# You are an expert at determining if a passage is relevant and helpful for answering a question. +# To be valuable, a passage must have at least some amount of useful and meaningful information with more than a passing mention of the topic. +# As part of your thinking process, you first write a few sentences evaluating the utility of the passage, given the question we're trying to answer. Limit yourself to writing only a sentence or two, no more. +# Finally, you must submit your final answer by adding two newline characters then "Yes." or "No." or "I don't know.". Provide a single answer only. Providing multiple final results will disqualify you. +# Here's a template code snippet of how it should work (with placeholder variables): +# ``` +# Passage: +# Question: +# Your evaluation of the utility of the passage: + +# Final answer: +# <|user|> +# Passage: {context} +# Question: {user_query} +# Your evaluation of the utility of the passage: +# <|assistant|>""" + +# @ray.remote +# class AsyncActor: + +# def filter_context(self, context, user_query, langsmith_prompt_obj): +# final_prompt = str(langsmith_prompt_obj.format(context=context, user_query=user_query)) +# # print(f"-------\nfinal_prompt:\n{final_prompt}\n^^^^^^^^^^^^^") +# try: +# # completion = run_caii_hosted_llm(final_prompt) +# # completion = run_replicate(final_prompt) +# completion = run_anyscale(final_prompt) +# return {"completion": completion, "context": context} +# except Exception as e: +# sentry_sdk.capture_exception(e) +# print(f"Error: {e}") + +# def run_caii_hosted_llm(prompt, max_tokens=300, temp=0.3, **kwargs): +# """ +# Local LLMs USAGE DOCS: https://kastanday.notion.site/LLM-Serving-on-prem-OpenAI-Clone-bb06028266d842b0872465f552684177 ## +# """ + +# url = "http://api.kastan.ai/v1/completions?model=HuggingFaceH4/zephyr-7b-alpha" +# headers = {'Content-Type': 'application/json'} +# data = {"prompt": prompt, "max_tokens": max_tokens, "temperature": temp, **kwargs} + +# response = None +# try: +# response = requests.post(url, headers=headers, data=json.dumps(data), timeout=180) +# return response.json()['choices'][0]['text'] +# except Exception as e: +# sentry_sdk.capture_exception(e) +# # Probably cuda OOM error. +# response_content = response.json() if response else "No response" +# raise ValueError( +# f"🚫🚫🚫 Failed inference attempt. 
Response: {response_content}\nError: {e}\nPromt that caused error: {prompt}" +# ) from e + +# def run_replicate(prompt): +# output = None +# # output = replicate.run("tomasmcm/zephyr-7b-beta:961cd6665b811d0c43c0b9488b6dfa85ff5c7bfb875e93b4533e4c7f96c7c526", +# # input={ +# # "top_k": 50, +# # "top_p": 0.95, +# # "prompt": prompt, +# # "temperature": 0.3, +# # "max_new_tokens": 250, +# # "presence_penalty": 1 +# # }) +# print(output) +# return output + +# def run_anyscale(prompt, model_name="HuggingFaceH4/zephyr-7b-beta"): +# start_time = time.monotonic() +# ret = openai.ChatCompletion.create( +# api_base="https://api.endpoints.anyscale.com/v1", +# api_key=os.environ["ANYSCALE_ENDPOINT_TOKEN"], +# api_type="openai", +# # model="mistralai/Mistral-7B-Instruct-v0.1", +# model="HuggingFaceH4/zephyr-7b-beta", +# messages=[{ +# "role": "system", +# "content": "You are a helpful assistant." +# }, { +# "role": "user", +# "content": prompt +# }], +# temperature=0.3, +# max_tokens=250, +# ) + +# output = ret["choices"][0]["message"]["content"] # type: ignore +# print("Response from Anyscale:", output[:150]) + +# # input_length = len(tokenizer.encode(prompt)) +# # output_length = len(tokenizer.encode(output)) +# # Input tokens {input_length}, output tokens: {output_length}" +# print(f"^^^^ one anyscale call Runtime: {(time.monotonic() - start_time):.2f} seconds.") +# return output + +# def parse_result(result: str): +# lines = result.split('\n') +# for line in lines: +# if 'Final answer' in line: +# return 'yes' in line.lower() +# return False + +# def filter_top_contexts(contexts, +# user_query: str, +# timeout: Optional[float] = None, +# max_concurrency: Optional[int] = 180): + +# print("⏰⏰⏰ Starting filter_top_contexts() ⏰⏰⏰") + +# timeout = timeout or float(os.environ["FILTER_TOP_CONTEXTS_TIMEOUT_SECONDS"]) +# # langsmith_prompt_obj = hub.pull("kastanday/filter-unrelated-contexts-zephyr") # TOO UNSTABLE, service offline +# langsmith_prompt_obj = filter_unrelated_contexts_zephyr +# posthog = Posthog(sync_mode=True, project_api_key=os.environ['POSTHOG_API_KEY'], host='https://app.posthog.com') + +# print("NUM ACTIVE THREADS (top of filtering_contexts):", threading.active_count()) + +# max_concurrency = min(100, len(contexts)) +# print("max_concurrency is max of 100, or len(contexts), whichever is less ---- Max concurrency:", max_concurrency) +# print("Num contexts to filter:", len(contexts)) + +# # START TASKS +# actor = AsyncActor.options(max_concurrency=max_concurrency, num_cpus=0.001).remote() # type: ignore +# result_futures = [actor.filter_context.remote(c, user_query, langsmith_prompt_obj) for c in contexts] + +# start_time = time.monotonic() +# done_tasks, in_progress = ray.wait(result_futures, +# num_returns=len(result_futures), +# timeout=timeout, +# fetch_local=False) + +# print("NUM ACTIVE THREADS (before cleanup filtering_contexts):", threading.active_count()) +# # Cleanup +# for task in in_progress: +# ray.cancel(task) +# results = ray.get(done_tasks) +# print("NUM ACTIVE THREADS (before kill filtering_contexts):", threading.active_count()) +# ray.kill(actor) +# print("NUM ACTIVE THREADS (after kill filtering_contexts):", threading.active_count()) + +# best_contexts_to_keep = [ +# r['context'] for r in results if r and 'context' in r and 'completion' in r and parse_result(r['completion']) +# ] + +# print("🧠🧠 TOTAL DOCS PROCESSED BY ANYSCALE FILTERING:", len(results)) +# print("🧠🧠 TOTAL DOCS KEPT, AFTER FILTERING:", len(best_contexts_to_keep)) +# mqr_runtime = round(time.monotonic() - 
start_time, 2) +# print(f"⏰ Total elapsed time: {mqr_runtime} seconds") + +# posthog.capture('distinct_id_of_the_user', +# event='filter_top_contexts', +# properties={ +# 'user_query': user_query, +# 'course_name': contexts[0].metadata.get('course_name', None), +# 'percent_kept': len(best_contexts_to_keep) / max(1, len(results)), +# 'total_docs_processed': len(results), +# 'total_docs_kept': len(best_contexts_to_keep), +# 'MQR_total_runtime_sec': mqr_runtime, +# }) +# posthog.shutdown() +# return best_contexts_to_keep + +# def run_main(): +# start_time = time.monotonic() +# # final_passage_list = filter_top_contexts(contexts=CONTEXTS * 2, user_query=USER_QUERY) +# # print("✅✅✅ TOTAL included in results: ", len(final_passage_list)) +# print(f"⏰⏰⏰ Runtime: {(time.monotonic() - start_time):.2f} seconds") +# # print("Total contexts:", len(CONTEXTS) * 2) + +# # ! CONDA ENV: llm-serving +# if __name__ == "__main__": +# run_main() diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index dae7ef0e..3c87f04b 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -6,7 +6,6 @@ import requests from threading import Thread - from dotenv import load_dotenv from flask import ( Flask, @@ -21,7 +20,7 @@ from flask_cors import CORS from flask_executor import Executor from posthog import Posthog -import ray +# import ray import sentry_sdk from ai_ta_backend.canvas import CanvasAPI @@ -50,7 +49,7 @@ # load API keys from globally-availabe .env file load_dotenv() -ray.init() +# ray.init() print("NUM ACTIVE THREADS (top of main):", threading.active_count()) @@ -561,6 +560,7 @@ def nomic_map(): response.headers.add('Access-Control-Allow-Origin', '*') return response + @app.route('/createDocumentMap', methods=['GET']) def createDocumentMap(): course_name: str = request.args.get('course_name', default='', type=str) @@ -620,13 +620,15 @@ def export_convo_history(): response.headers.add('Access-Control-Allow-Origin', '*') else: - response = make_response(send_from_directory(export_status['response'][2], export_status['response'][1], as_attachment=True)) + response = make_response( + send_from_directory(export_status['response'][2], export_status['response'][1], as_attachment=True)) response.headers.add('Access-Control-Allow-Origin', '*') response.headers["Content-Disposition"] = f"attachment; filename={export_status['response'][1]}" os.remove(export_status['response'][0]) - + return response + @app.route('/exportDocuments', methods=['GET']) def exportDocuments(): course_name: str = request.args.get('course_name', default='', type=str) @@ -649,11 +651,12 @@ def exportDocuments(): response.headers.add('Access-Control-Allow-Origin', '*') else: - response = make_response(send_from_directory(export_status['response'][2], export_status['response'][1], as_attachment=True)) + response = make_response( + send_from_directory(export_status['response'][2], export_status['response'][1], as_attachment=True)) response.headers.add('Access-Control-Allow-Origin', '*') response.headers["Content-Disposition"] = f"attachment; filename={export_status['response'][1]}" os.remove(export_status['response'][0]) - + return response diff --git a/ai_ta_backend/vector_database.py b/ai_ta_backend/vector_database.py index daab6c6e..739d5b16 100644 --- a/ai_ta_backend/vector_database.py +++ b/ai_ta_backend/vector_database.py @@ -42,13 +42,12 @@ from pydub import AudioSegment from qdrant_client import QdrantClient, models from qdrant_client.models import PointStruct -from langchain.schema.output_parser import StrOutputParser from 
ai_ta_backend.aws import upload_data_files_to_s3 from ai_ta_backend.extreme_context_stuffing import OpenAIAPIProcessor from ai_ta_backend.utils_tokenization import count_tokens_and_cost -from ai_ta_backend.context_parent_doc_padding import context_parent_doc_padding -from ai_ta_backend.filtering_contexts import filter_top_contexts +# from ai_ta_backend.context_parent_doc_padding import context_parent_doc_padding +# from ai_ta_backend.filtering_contexts import filter_top_contexts from ai_ta_backend.nomic_logging import log_to_document_map, delete_from_document_map MULTI_QUERY_PROMPT = hub.pull("langchain-ai/rag-fusion-query-generation") @@ -1356,85 +1355,87 @@ def getTopContextsWithMQR(self, 4. [CANCELED BEC POINTLESS] Rank the docs based on the relevance score. 5. Parent-doc-retrieval: Pad just the top 5 docs with expanded context from the original document. """ - try: - top_n_per_query = 40 # HARD CODE TO ENSURE WE HIT THE MAX TOKENS - start_time_overall = time.monotonic() - mq_start_time = time.monotonic() - - # 1. GENERATE MULTIPLE QUERIES - generate_queries = ( - MULTI_QUERY_PROMPT | self.llm | StrOutputParser() | (lambda x: x.split("\n")) | - (lambda x: list(filter(None, x))) # filter out non-empty strings - ) - - generated_queries = generate_queries.invoke({"original_query": search_query}) - print("generated_queries", generated_queries) - - # 2. VECTOR SEARCH FOR EACH QUERY - batch_found_docs_nested: list[list[Document]] = self.batch_vector_search(search_queries=generated_queries, - course_name=course_name, - top_n=top_n_per_query) - - # 3. RANK REMAINING DOCUMENTS -- good for parent doc padding of top 5 at the end. - found_docs = self.reciprocal_rank_fusion(batch_found_docs_nested) - found_docs = [doc for doc, score in found_docs] - print(f"Num docs after re-ranking: {len(found_docs)}") - if len(found_docs) == 0: - return [] - print(f"⏰ Total multi-query processing runtime: {(time.monotonic() - mq_start_time):.2f} seconds") - - # 4. FILTER DOCS - filtered_docs = filter_top_contexts(contexts=found_docs, user_query=search_query, timeout=30, max_concurrency=180) - if len(filtered_docs) == 0: - return [] - - # 5. TOP DOC CONTEXT PADDING // parent document retriever - final_docs = context_parent_doc_padding(filtered_docs, search_query, course_name) - print(f"Number of final docs after context padding: {len(final_docs)}") - - pre_prompt = "Please answer the following question. Use the context below, called your documents, only if it's helpful and don't use parts that are very irrelevant. It's good to quote from your documents directly, when you do always use Markdown footnotes for citations. Use react-markdown superscript to number the sources at the end of sentences (1, 2, 3...) and use react-markdown Footnotes to list the full document names for each number. Use ReactMarkdown aka 'react-markdown' formatting for super script citations, use semi-formal style. Feel free to say you don't know. 
\nHere's a few passages of the high quality documents:\n" - token_counter, _ = count_tokens_and_cost(pre_prompt + '\n\nNow please respond to my query: ' + - search_query) # type: ignore - - valid_docs = [] - num_tokens = 0 - for doc in final_docs: - doc_string = f"Document: {doc['readable_filename']}{', page: ' + str(doc['pagenumber']) if doc['pagenumber'] else ''}\n{str(doc['text'])}\n" - num_tokens, prompt_cost = count_tokens_and_cost(doc_string) # type: ignore - - print(f"token_counter: {token_counter}, num_tokens: {num_tokens}, max_tokens: {token_limit}") - if token_counter + num_tokens <= token_limit: - token_counter += num_tokens - valid_docs.append(doc) - else: - # filled our token size, time to return - break - - print(f"Total tokens used: {token_counter} Used {len(valid_docs)} of total unique docs {len(found_docs)}.") - print(f"Course: {course_name} ||| search_query: {search_query}") - print(f"⏰ ^^ Runtime of getTopContextsWithMQR: {(time.monotonic() - start_time_overall):.2f} seconds") - - if len(valid_docs) == 0: - return [] - - self.posthog.capture('distinct_id_of_the_user', - event='filter_top_contexts_succeeded', - properties={ - 'user_query': search_query, - 'course_name': course_name, - 'token_limit': token_limit, - 'total_tokens_used': token_counter, - 'total_contexts_used': len(valid_docs), - 'total_unique_docs_retrieved': len(found_docs), - }) - - return self.format_for_json_mqr(valid_docs) - except Exception as e: - # return full traceback to front end - err: str = f"ERROR: In /getTopContextsWithMQR. Course: {course_name} ||| search_query: {search_query}\nTraceback: {traceback.format_exc()}❌❌ Error in {inspect.currentframe().f_code.co_name}:\n{e}" # type: ignore - print(err) - sentry_sdk.capture_exception(e) - return err + return 'fail' + + # try: + # top_n_per_query = 40 # HARD CODE TO ENSURE WE HIT THE MAX TOKENS + # start_time_overall = time.monotonic() + # mq_start_time = time.monotonic() + + # # 1. GENERATE MULTIPLE QUERIES + # generate_queries = ( + # MULTI_QUERY_PROMPT | self.llm | StrOutputParser() | (lambda x: x.split("\n")) | + # (lambda x: list(filter(None, x))) # filter out non-empty strings + # ) + + # generated_queries = generate_queries.invoke({"original_query": search_query}) + # print("generated_queries", generated_queries) + + # # 2. VECTOR SEARCH FOR EACH QUERY + # batch_found_docs_nested: list[list[Document]] = self.batch_vector_search(search_queries=generated_queries, + # course_name=course_name, + # top_n=top_n_per_query) + + # # 3. RANK REMAINING DOCUMENTS -- good for parent doc padding of top 5 at the end. + # found_docs = self.reciprocal_rank_fusion(batch_found_docs_nested) + # found_docs = [doc for doc, score in found_docs] + # print(f"Num docs after re-ranking: {len(found_docs)}") + # if len(found_docs) == 0: + # return [] + # print(f"⏰ Total multi-query processing runtime: {(time.monotonic() - mq_start_time):.2f} seconds") + + # # 4. FILTER DOCS + # filtered_docs = filter_top_contexts(contexts=found_docs, user_query=search_query, timeout=30, max_concurrency=180) + # if len(filtered_docs) == 0: + # return [] + + # # 5. TOP DOC CONTEXT PADDING // parent document retriever + # final_docs = context_parent_doc_padding(filtered_docs, search_query, course_name) + # print(f"Number of final docs after context padding: {len(final_docs)}") + + # pre_prompt = "Please answer the following question. Use the context below, called your documents, only if it's helpful and don't use parts that are very irrelevant. 
It's good to quote from your documents directly, when you do always use Markdown footnotes for citations. Use react-markdown superscript to number the sources at the end of sentences (1, 2, 3...) and use react-markdown Footnotes to list the full document names for each number. Use ReactMarkdown aka 'react-markdown' formatting for super script citations, use semi-formal style. Feel free to say you don't know. \nHere's a few passages of the high quality documents:\n" + # token_counter, _ = count_tokens_and_cost(pre_prompt + '\n\nNow please respond to my query: ' + + # search_query) # type: ignore + + # valid_docs = [] + # num_tokens = 0 + # for doc in final_docs: + # doc_string = f"Document: {doc['readable_filename']}{', page: ' + str(doc['pagenumber']) if doc['pagenumber'] else ''}\n{str(doc['text'])}\n" + # num_tokens, prompt_cost = count_tokens_and_cost(doc_string) # type: ignore + + # print(f"token_counter: {token_counter}, num_tokens: {num_tokens}, max_tokens: {token_limit}") + # if token_counter + num_tokens <= token_limit: + # token_counter += num_tokens + # valid_docs.append(doc) + # else: + # # filled our token size, time to return + # break + + # print(f"Total tokens used: {token_counter} Used {len(valid_docs)} of total unique docs {len(found_docs)}.") + # print(f"Course: {course_name} ||| search_query: {search_query}") + # print(f"⏰ ^^ Runtime of getTopContextsWithMQR: {(time.monotonic() - start_time_overall):.2f} seconds") + + # if len(valid_docs) == 0: + # return [] + + # self.posthog.capture('distinct_id_of_the_user', + # event='filter_top_contexts_succeeded', + # properties={ + # 'user_query': search_query, + # 'course_name': course_name, + # 'token_limit': token_limit, + # 'total_tokens_used': token_counter, + # 'total_contexts_used': len(valid_docs), + # 'total_unique_docs_retrieved': len(found_docs), + # }) + + # return self.format_for_json_mqr(valid_docs) + # except Exception as e: + # # return full traceback to front end + # err: str = f"ERROR: In /getTopContextsWithMQR. Course: {course_name} ||| search_query: {search_query}\nTraceback: {traceback.format_exc()}❌❌ Error in {inspect.currentframe().f_code.co_name}:\n{e}" # type: ignore + # print(err) + # sentry_sdk.capture_exception(e) + # return err def format_for_json_mqr(self, found_docs) -> List[Dict]: """ diff --git a/requirements.txt b/requirements.txt index acc2eed2..f4503824 100644 --- a/requirements.txt +++ b/requirements.txt @@ -53,7 +53,7 @@ unstructured==0.10.29 # causes huge ~5.3 GB of installs. Probbably from onnx: ht # Not currently supporting coursera ingest # cs-dlp @ git+https://github.com/raffaem/cs-dlp.git@0.12.0b0 # previously called coursera-dl pydantic==1.10.13 # pydantic v1 works better for ray -ray==2.8.1 posthog==3.1.0 sentry-sdk==1.39.1 +# ray==2.8.1 # newrelic==9.3.0 \ No newline at end of file diff --git a/run.sh b/run.sh index 20a23b02..0d77691a 100755 --- a/run.sh +++ b/run.sh @@ -3,6 +3,6 @@ # Docs https://docs.gunicorn.org/en/stable/settings.html#workers # 200 MB object store memory.. necessary to statically allocate or will crash in Railway env restrictions. -ray start --head --num-cpus 6 --object-store-memory 300000000 +# ray start --head --num-cpus 6 --object-store-memory 300000000 export PYTHONPATH=${PYTHONPATH}:$(pwd)/ai_ta_backend exec gunicorn --workers=6 --threads=20000 --worker-class=gthread ai_ta_backend.main:app --timeout 1800