Added executors for async operations
rohan-uiuc committed Mar 7, 2024
1 parent f6a787e commit a14cc44
Showing 4 changed files with 106 additions and 4 deletions.
23 changes: 23 additions & 0 deletions ai_ta_backend/executors/flask_executor.py
@@ -0,0 +1,23 @@
from flask_executor import Executor
from injector import inject


class ExecutorInterface:

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError


class FlaskExecutorAdapter(ExecutorInterface):
    """
    Adapter for Flask Executor, suitable for I/O-bound tasks that benefit from asynchronous execution.
    Use this executor for tasks that involve waiting for I/O operations (e.g., network requests, file I/O),
    where the overhead of creating new threads or processes is justified by the time spent waiting.
    """

    @inject
    def __init__(self, executor: Executor):
        self.executor = executor

    def submit(self, fn, *args, **kwargs):
        return self.executor.submit(fn, *args, **kwargs)
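
A minimal usage sketch (not part of this commit), assuming a throwaway Flask app and a hypothetical slow_io_task, showing how the adapter wraps flask-executor's submit for I/O-bound work:

from flask import Flask
from flask_executor import Executor

from ai_ta_backend.executors.flask_executor import FlaskExecutorAdapter

app = Flask(__name__)  # hypothetical demo app, not the real ai_ta_backend app
adapter = FlaskExecutorAdapter(Executor(app))


def slow_io_task(url: str) -> str:
    # stand-in for blocking I/O such as a network request or file read
    return f"fetched {url}"


with app.app_context():  # flask-executor propagates the Flask context to its workers
    future = adapter.submit(slow_io_task, "https://example.com")
    print(future.result())  # blocks only because this demo wants the value

The point of the adapter is that route handlers can depend on the small ExecutorInterface rather than on flask-executor directly, which keeps the executor swappable in tests.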
33 changes: 33 additions & 0 deletions ai_ta_backend/executors/process_pool_executor.py
@@ -0,0 +1,33 @@
from concurrent.futures import ProcessPoolExecutor

from injector import inject


class ProcessPoolExecutorInterface:

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError


class ProcessPoolExecutorAdapter(ProcessPoolExecutorInterface):
    """
    Adapter for Python's ProcessPoolExecutor, suitable for CPU-bound tasks that benefit from parallel execution.
    Use this executor for tasks that involve significant computation and can be efficiently parallelized across
    multiple CPU cores, bypassing the GIL (Global Interpreter Lock).
    Not for I/O-bound tasks like database queries, file I/O, or network requests, where the overhead of creating
    and managing processes can outweigh the benefits.
    Note: ProcessPoolExecutor is best used with tasks that are relatively heavy and can be executed independently of each other.
    """

    def __init__(self, max_workers=None):
        self.executor = ProcessPoolExecutor(max_workers=max_workers)

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError(
            "ProcessPoolExecutorAdapter does not support 'submit'; use 'map' for parallelizable batch work instead."
        )

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        return self.executor.map(fn, *iterables, timeout=timeout, chunksize=chunksize)
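
Because submit is deliberately blocked, callers go through map. A minimal sketch (not part of this commit) with a hypothetical cpu_heavy function; the function must live at module level and the pool must start under a __main__ guard so callables and arguments stay picklable and worker processes can re-import the module safely:

from ai_ta_backend.executors.process_pool_executor import ProcessPoolExecutorAdapter


def cpu_heavy(n: int) -> int:
    # stand-in for real computation; must be module-level to be picklable
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    adapter = ProcessPoolExecutorAdapter(max_workers=4)
    results = list(adapter.map(cpu_heavy, [10_000, 20_000, 30_000]))
    print(results)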
31 changes: 31 additions & 0 deletions ai_ta_backend/executors/thread_pool_executor.py
@@ -0,0 +1,31 @@
from concurrent.futures import ThreadPoolExecutor

from injector import inject


class ThreadPoolExecutorInterface:

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError


class ThreadPoolExecutorAdapter(ThreadPoolExecutorInterface):
    """
    Adapter for Python's ThreadPoolExecutor, suitable for I/O-bound tasks that can be performed concurrently.
    Use this executor for tasks that are largely waiting on I/O operations, such as database queries or file reads,
    where the GIL (Global Interpreter Lock) does not become a bottleneck.
    Not for CPU-bound tasks like heavy computation, as the GIL would prevent true parallel execution.
    This executor is particularly useful when you want more control over the number of concurrent threads
    than what Flask Executor provides, or when you're not working within a Flask application context.
    """

    def __init__(self, max_workers=None):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn, *args, **kwargs):
        return self.executor.submit(fn, *args, **kwargs)

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        return self.executor.map(fn, *iterables, timeout=timeout, chunksize=chunksize)
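
A minimal sketch (not part of this commit) with a hypothetical fetch_status helper; threads fit here because the GIL is released while each call blocks on the network:

from urllib.request import urlopen

from ai_ta_backend.executors.thread_pool_executor import ThreadPoolExecutorAdapter


def fetch_status(url: str) -> int:
    # blocking network I/O; the GIL is released while waiting
    with urlopen(url) as resp:
        return resp.status


adapter = ThreadPoolExecutorAdapter(max_workers=8)
future = adapter.submit(fetch_status, "https://example.com")
print(future.result())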
23 changes: 19 additions & 4 deletions ai_ta_backend/main.py
@@ -21,6 +21,18 @@
from ai_ta_backend.database.aws import AWSStorage
from ai_ta_backend.database.sql import SQLDatabase
from ai_ta_backend.database.vector import VectorDatabase
+from ai_ta_backend.executors.flask_executor import (
+    ExecutorInterface,
+    FlaskExecutorAdapter,
+)
+from ai_ta_backend.executors.process_pool_executor import (
+    ProcessPoolExecutorAdapter,
+    ProcessPoolExecutorInterface,
+)
+from ai_ta_backend.executors.thread_pool_executor import (
+    ThreadPoolExecutorAdapter,
+    ThreadPoolExecutorInterface,
+)
from ai_ta_backend.service.export_service import ExportService
from ai_ta_backend.service.nomic_service import NomicService
from ai_ta_backend.service.posthog_service import PosthogService
@@ -130,7 +142,7 @@ def getAll(service: RetrievalService) -> Response:


@app.route('/delete', methods=['DELETE'])
-def delete(service: RetrievalService):
+def delete(service: RetrievalService, flaskExecutor: ExecutorInterface):
    """
    Delete a single file from all our databases: S3, Qdrant, and Supabase (for now).
    Note, of course, we still have parts of that file in our logs.
@@ -149,7 +161,7 @@ def delete(service: RetrievalService):

    start_time = time.monotonic()
    # background execution of tasks!!
-    executor.submit(service.delete_data, course_name, s3_path, source_url)
+    flaskExecutor.submit(service.delete_data, course_name, s3_path, source_url)
    print(f"From {course_name}, deleted file: {s3_path}")
    print(f"⏰ Runtime of FULL delete func: {(time.monotonic() - start_time):.2f} seconds")
    # we need an instant return. Deletes are "best effort"; assume always successful... sigh :(
@@ -191,7 +203,7 @@ def createDocumentMap(service: NomicService):


@app.route('/onResponseCompletion', methods=['POST'])
-def logToNomic(service: NomicService):
+def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface):
    data = request.get_json()
    course_name = data['course_name']
    conversation = data['conversation']
@@ -206,7 +218,7 @@ def logToNomic(service: NomicService):
    print(f"In /onResponseCompletion for course: {course_name}")

    # background execution of tasks!!
-    response = executor.submit(service.log_convo_to_nomic, course_name, data)
+    response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data)
    response = jsonify({'outcome': 'success'})
    response.headers.add('Access-Control-Allow-Origin', '*')
    return response
@@ -313,6 +325,9 @@ def configure(binder: Binder) -> None:
    binder.bind(VectorDatabase, to=VectorDatabase, scope=SingletonScope)
    binder.bind(SQLDatabase, to=SQLDatabase, scope=SingletonScope)
    binder.bind(AWSStorage, to=AWSStorage, scope=SingletonScope)
+    binder.bind(ExecutorInterface, to=FlaskExecutorAdapter(executor), scope=SingletonScope)
+    binder.bind(ThreadPoolExecutorInterface, to=ThreadPoolExecutorAdapter, scope=SingletonScope)
+    binder.bind(ProcessPoolExecutorInterface, to=ProcessPoolExecutorAdapter, scope=SingletonScope)


FlaskInjector(app=app, modules=[configure])
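
For context on the wiring above: ExecutorInterface is bound to an already-constructed FlaskExecutorAdapter(executor) instance, while the other two interfaces are bound to classes that injector instantiates on first use. A hedged sketch (not part of this commit) of how flask-injector then resolves a binding into a route handler; the /ping route and its body are hypothetical:

from flask import Flask, jsonify
from flask_injector import FlaskInjector
from injector import Binder, SingletonScope

from ai_ta_backend.executors.thread_pool_executor import (
    ThreadPoolExecutorAdapter,
    ThreadPoolExecutorInterface,
)

app = Flask(__name__)  # hypothetical demo app


@app.route('/ping')
def ping(executor: ThreadPoolExecutorInterface):
    future = executor.submit(lambda: 'pong')  # runs off the request thread
    return jsonify({'outcome': future.result()})


def configure(binder: Binder) -> None:
    # binding to the class lets injector construct the singleton on demand
    binder.bind(ThreadPoolExecutorInterface, to=ThreadPoolExecutorAdapter, scope=SingletonScope)


FlaskInjector(app=app, modules=[configure])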
