diff --git a/ai_ta_backend/executors/flask_executor.py b/ai_ta_backend/executors/flask_executor.py
new file mode 100644
index 00000000..b9a78540
--- /dev/null
+++ b/ai_ta_backend/executors/flask_executor.py
@@ -0,0 +1,23 @@
+from flask_executor import Executor
+from injector import inject
+
+
+class ExecutorInterface:
+
+  def submit(self, fn, *args, **kwargs):
+    raise NotImplementedError
+
+
+class FlaskExecutorAdapter(ExecutorInterface):
+  """
+  Adapter for Flask Executor, suitable for I/O-bound tasks that benefit from asynchronous execution.
+  Use this executor for tasks that involve waiting for I/O operations (e.g., network requests, file I/O),
+  where the overhead of creating new threads or processes is justified by the time spent waiting.
+  """
+
+  @inject
+  def __init__(self, executor: Executor):
+    self.executor = executor
+
+  def submit(self, fn, *args, **kwargs):
+    return self.executor.submit(fn, *args, **kwargs)
diff --git a/ai_ta_backend/executors/process_pool_executor.py b/ai_ta_backend/executors/process_pool_executor.py
new file mode 100644
index 00000000..08464017
--- /dev/null
+++ b/ai_ta_backend/executors/process_pool_executor.py
@@ -0,0 +1,33 @@
+from concurrent.futures import ProcessPoolExecutor
+
+from injector import inject
+
+
+class ProcessPoolExecutorInterface:
+
+  def submit(self, fn, *args, **kwargs):
+    raise NotImplementedError
+
+
+class ProcessPoolExecutorAdapter(ProcessPoolExecutorInterface):
+  """
+  Adapter for Python's ProcessPoolExecutor, suitable for CPU-bound tasks that benefit from parallel execution.
+  Use this executor for tasks that require significant computation and can be efficiently parallelized across multiple CPUs.
+  Not for I/O-bound tasks like database queries, file I/O, or network requests, as the overhead of creating and managing processes can outweigh the benefits.
+
+  This executor is ideal for scenarios where the task execution time would significantly benefit from being distributed
+  across multiple processes, thereby bypassing the GIL (Global Interpreter Lock) and utilizing multiple CPU cores.
+
+  Note: ProcessPoolExecutor is best used with tasks that are relatively heavy and can be executed independently of each other.
+  """
+
+  def __init__(self, max_workers=None):
+    self.executor = ProcessPoolExecutor(max_workers=max_workers)
+
+  def submit(self, fn, *args, **kwargs):
+    raise NotImplementedError(
+        "ProcessPoolExecutorAdapter does not support 'submit' directly due to its nature. Use 'map' or other methods as needed."
+    )
+
+  def map(self, fn, *iterables, timeout=None, chunksize=1):
+    return self.executor.map(fn, *iterables, timeout=timeout, chunksize=chunksize)
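
A brief usage sketch may help justify the submit/map split in the process-pool adapter. This is not part of the diff: the nth_fibonacci helper and the worker count are hypothetical, and the sketch only assumes the ProcessPoolExecutorAdapter defined in the new module above.

    # Hypothetical sketch: CPU-bound work dispatched through ProcessPoolExecutorAdapter.map.
    from ai_ta_backend.executors.process_pool_executor import ProcessPoolExecutorAdapter

    def nth_fibonacci(n: int) -> int:
      # Pure computation with no I/O, so threads would be serialized by the GIL;
      # separate worker processes can run these calls truly in parallel.
      a, b = 0, 1
      for _ in range(n):
        a, b = b, a + b
      return a

    if __name__ == '__main__':  # guard is required when worker processes are spawned
      pool = ProcessPoolExecutorAdapter(max_workers=4)
      # map() fans the inputs out across worker processes and yields results in input order.
      for n, value in zip([10, 20, 30], pool.map(nth_fibonacci, [10, 20, 30])):
        print(n, value)
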
diff --git a/ai_ta_backend/executors/thread_pool_executor.py b/ai_ta_backend/executors/thread_pool_executor.py
new file mode 100644
index 00000000..124ac2b3
--- /dev/null
+++ b/ai_ta_backend/executors/thread_pool_executor.py
@@ -0,0 +1,31 @@
+from concurrent.futures import ThreadPoolExecutor
+
+from injector import inject
+
+
+class ThreadPoolExecutorInterface:
+
+  def submit(self, fn, *args, **kwargs):
+    raise NotImplementedError
+
+
+class ThreadPoolExecutorAdapter(ThreadPoolExecutorInterface):
+  """
+  Adapter for Python's ThreadPoolExecutor, suitable for I/O-bound tasks that can be performed concurrently.
+  Use this executor for tasks that are largely waiting on I/O operations, such as database queries or file reads,
+  where the GIL (Global Interpreter Lock) does not become a bottleneck.
+
+  Not for CPU-bound tasks like heavy computation, as the GIL would prevent true parallel execution.
+
+  This executor is particularly useful when you want more control over the number of concurrent threads
+  than what Flask Executor provides, or when you're not working within a Flask application context.
+  """
+
+  def __init__(self, max_workers=None):
+    self.executor = ThreadPoolExecutor(max_workers=max_workers)
+
+  def submit(self, fn, *args, **kwargs):
+    return self.executor.submit(fn, *args, **kwargs)
+
+  def map(self, fn, *iterables, timeout=None, chunksize=1):
+    return self.executor.map(fn, *iterables, timeout=timeout, chunksize=chunksize)
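
For contrast, a thread-pool sketch (again not part of the diff; the URLs and the fetch_length helper are made up) shows the I/O-bound case this adapter targets, used outside any Flask application context:

    # Hypothetical sketch: overlapping network waits with ThreadPoolExecutorAdapter.
    from urllib.request import urlopen

    from ai_ta_backend.executors.thread_pool_executor import ThreadPoolExecutorAdapter

    def fetch_length(url: str) -> int:
      # The thread spends most of its time blocked on the network, during which
      # the GIL is released, so several fetches can proceed concurrently.
      with urlopen(url, timeout=10) as resp:
        return len(resp.read())

    pool = ThreadPoolExecutorAdapter(max_workers=8)
    urls = ['https://example.com', 'https://example.org']  # placeholder inputs
    for url, size in zip(urls, pool.map(fetch_length, urls)):
      print(url, size)
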
diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py
index d72442af..c5d9cb46 100644
--- a/ai_ta_backend/main.py
+++ b/ai_ta_backend/main.py
@@ -21,6 +21,18 @@
 from ai_ta_backend.database.aws import AWSStorage
 from ai_ta_backend.database.sql import SQLDatabase
 from ai_ta_backend.database.vector import VectorDatabase
+from ai_ta_backend.executors.flask_executor import (
+    ExecutorInterface,
+    FlaskExecutorAdapter,
+)
+from ai_ta_backend.executors.process_pool_executor import (
+    ProcessPoolExecutorAdapter,
+    ProcessPoolExecutorInterface,
+)
+from ai_ta_backend.executors.thread_pool_executor import (
+    ThreadPoolExecutorAdapter,
+    ThreadPoolExecutorInterface,
+)
 from ai_ta_backend.service.export_service import ExportService
 from ai_ta_backend.service.nomic_service import NomicService
 from ai_ta_backend.service.posthog_service import PosthogService
@@ -130,7 +142,7 @@ def getAll(service: RetrievalService) -> Response:
 
 
 @app.route('/delete', methods=['DELETE'])
-def delete(service: RetrievalService):
+def delete(service: RetrievalService, flaskExecutor: ExecutorInterface):
   """
   Delete a single file from all our database: S3, Qdrant, and Supabase (for now).
   Note, of course, we still have parts of that file in our logs.
@@ -149,7 +161,7 @@ def delete(service: RetrievalService):
   start_time = time.monotonic()
 
   # background execution of tasks!!
-  executor.submit(service.delete_data, course_name, s3_path, source_url)
+  flaskExecutor.submit(service.delete_data, course_name, s3_path, source_url)
   print(f"From {course_name}, deleted file: {s3_path}")
   print(f"⏰ Runtime of FULL delete func: {(time.monotonic() - start_time):.2f} seconds")
   # we need instant return. Delets are "best effort" assume always successful... sigh :(
@@ -191,7 +203,7 @@ def createDocumentMap(service: NomicService):
 
 
 @app.route('/onResponseCompletion', methods=['POST'])
-def logToNomic(service: NomicService):
+def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface):
   data = request.get_json()
   course_name = data['course_name']
   conversation = data['conversation']
@@ -206,7 +218,7 @@
   print(f"In /onResponseCompletion for course: {course_name}")
 
   # background execution of tasks!!
-  response = executor.submit(service.log_convo_to_nomic, course_name, data)
+  response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data)
   response = jsonify({'outcome': 'success'})
   response.headers.add('Access-Control-Allow-Origin', '*')
   return response
@@ -313,6 +325,9 @@ def configure(binder: Binder) -> None:
   binder.bind(VectorDatabase, to=VectorDatabase, scope=SingletonScope)
   binder.bind(SQLDatabase, to=SQLDatabase, scope=SingletonScope)
   binder.bind(AWSStorage, to=AWSStorage, scope=SingletonScope)
+  binder.bind(ExecutorInterface, to=FlaskExecutorAdapter(executor), scope=SingletonScope)
+  binder.bind(ThreadPoolExecutorInterface, to=ThreadPoolExecutorAdapter, scope=SingletonScope)
+  binder.bind(ProcessPoolExecutorInterface, to=ProcessPoolExecutorAdapter, scope=SingletonScope)
 
 
 FlaskInjector(app=app, modules=[configure])
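
One design note on the bindings above: ExecutorInterface is bound to an already-constructed FlaskExecutorAdapter(executor) wrapping the app's existing Flask-Executor instance, while the thread- and process-pool interfaces are bound to their adapter classes, so injector builds them (with the default max_workers) when first requested. Any view can then declare the interface it needs as a parameter, as /delete and /onResponseCompletion now do. Below is a minimal sketch of another such route, with a hypothetical path and task, assuming it sits in main.py alongside the existing views:

    # Hypothetical sketch: a new view receiving the bound executor via Flask-Injector.
    @app.route('/logDemoConversation', methods=['POST'])
    def logDemoConversation(service: NomicService, flaskExecutor: ExecutorInterface):
      data = request.get_json()
      # Fire-and-forget: hand the slow call to the executor and return immediately,
      # mirroring the pattern used in /onResponseCompletion above.
      flaskExecutor.submit(service.log_convo_to_nomic, data['course_name'], data)
      response = jsonify({'outcome': 'submitted'})
      response.headers.add('Access-Control-Allow-Origin', '*')
      return response
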