diff --git a/requirements.txt b/requirements.txt index 253266a..e975720 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,4 @@ prometheus-client==0.20.0 tf-models-official==2.14.1 xlsxwriter==3.2.0 six +Pillow==10.3.0 diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index 63e4d55..08b2d02 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -99,6 +99,8 @@ def setup_gpu_environment(gpus: str) -> List[tf.config.PhysicalDevice]: logging.info("Using CPU") tf.config.set_visible_devices(active_gpus, 'GPU') + for gpu in active_gpus: + tf.config.experimental.set_memory_growth(gpu, True) return active_gpus diff --git a/src/api/image_preparator.py b/src/api/image_preparator.py index d78dd9d..ddda421 100644 --- a/src/api/image_preparator.py +++ b/src/api/image_preparator.py @@ -1,6 +1,7 @@ # Imports # > Standard library +from concurrent.futures import ThreadPoolExecutor import json import logging import multiprocessing @@ -10,6 +11,8 @@ import uuid # > Third-party dependencies +import numpy as np +from PIL import Image, ImageOps import tensorflow as tf @@ -308,51 +311,6 @@ def fetch_and_prepare_images(request_queue: multiprocessing.Queue, return num_channels, current_model, metadata, old_whitelist -def prepare_image(image_bytes: bytes, - num_channels: int) -> tf.Tensor: - """ - Prepare a single raw image for batch processing. - - Decodes, resizes, normalizes, pads, and transposes the image for further - processing. - - Parameters - ---------- - image_bytes : bytes - Raw bytes of the image. - num_channels : int - Number of channels desired for the output image (e.g., 1 for grayscale, - 3 for RGB, 4 for RGBA). - - Returns - ------- - tf.Tensor - Prepared image tensor. - """ - - image = tf.io.decode_image(image_bytes, channels=num_channels, - expand_animations=False) - - # Resize while preserving aspect ratio - target_height = 64 - aspect_ratio = tf.shape(image)[1] / tf.shape(image)[0] - target_width = tf.cast(target_height * aspect_ratio, tf.int32) - image = tf.image.resize(image, - [target_height, target_width]) - - image = tf.image.resize_with_pad(image, - target_height, - target_width + 50) - - # Normalize the image and something else - image = 0.5 - (image / 255) - - # Transpose the image - image = tf.transpose(image, perm=[1, 0, 2]) - - return image - - def fetch_metadata(whitelist: list, model_path: str): """ Fetch metadata values based on the whitelist keys from a JSON configuration @@ -415,6 +373,105 @@ def search_key(data, key): return values +def prepare_image(image_bytes: bytes, + num_channels: int) -> np.ndarray: + """ + Prepare a single raw image for batch processing. + + Decodes, resizes, normalizes, pads, and transposes the image for further + processing. + + Parameters + ---------- + image_bytes : bytes + Raw bytes of the image. + num_channels : int + Number of channels desired for the output image (e.g., 1 for grayscale, + 3 for RGB, 4 for RGBA). + + Returns + ------- + np.ndarray + Prepared image array. + """ + + # Load the image using TensorFlow + image = tf.image.decode_image(image_bytes, channels=num_channels, + expand_animations=False) + image = tf.image.convert_image_dtype(image, tf.float32) + + # Convert TensorFlow tensor to PIL Image + if num_channels == 1: + image = tf.squeeze(image, axis=-1) # Remove the channel dimension + image = Image.fromarray( + (image.numpy() * 255).astype(np.uint8), mode='L') + else: + image = Image.fromarray((image.numpy() * 255).astype(np.uint8)) + + # Convert to desired number of channels + if num_channels == 1: + image = image.convert("L") + elif num_channels == 3: + image = image.convert("RGB") + elif num_channels == 4: + image = image.convert("RGBA") + + # Resize while preserving aspect ratio + target_height = 64 + aspect_ratio = image.width / image.height + target_width = int(target_height * aspect_ratio) + image = image.resize((target_width, target_height), Image.BILINEAR) + + # Calculate padding sizes + padding_height = target_height + padding_width = target_width + 50 + + # Create new image with padding, centering the original image + padded_image = ImageOps.pad( + image, (padding_width, padding_height), color=0, centering=(0.5, 0.5)) + + # Normalize the image + image_array = np.array(padded_image) / 255.0 + image_array = 0.5 - image_array + + # Ensure the image has the right number of channels + if num_channels == 1: + image_array = np.expand_dims(image_array, axis=-1) + + # Transpose the image + image_array = np.transpose(image_array, (1, 0, 2)) + + return image_array + + +def pad_images(images: list, num_channels: int) -> np.ndarray: + """ + Pad a list of images to have the same dimensions. + + Parameters + ---------- + images : list + List of image arrays to be padded. + num_channels : int + Number of channels in the images. + + Returns + ------- + np.ndarray + Padded batch of images. + """ + max_height = max(image.shape[0] for image in images) + max_width = max(image.shape[1] for image in images) + + padded_batch = np.full((len(images), max_height, max_width, num_channels), + -10.0, dtype=np.float32) + + for i, image in enumerate(images): + padded_batch[i, :image.shape[0], :image.shape[1], ...] = image + + return padded_batch + + def pad_and_queue_batch(model_path: str, batch_images: list, batch_groups: list, @@ -449,25 +506,16 @@ def pad_and_queue_batch(model_path: str, # Generate a unique identifier for the batch batch_id = str(uuid.uuid4()) - # Convert the batch to a TensorFlow dataset - dataset = tf.data.Dataset.from_tensor_slices(batch_images) - - # Preprocess the images - dataset = dataset.map(lambda image: prepare_image(image, channels), - num_parallel_calls=tf.data.experimental.AUTOTUNE) + # Prepare images in parallel using ThreadPoolExecutor + with ThreadPoolExecutor() as executor: + prepared_images = list(executor.map( + lambda image: prepare_image(image, channels), batch_images)) # Pad the batch - dataset = dataset.padded_batch(len(batch_images), - padded_shapes=( - [None, None, channels]), - padding_values=( - tf.constant(-10, dtype=tf.float32))) - - # Convert the dataset to a padded batch - padded_batch = tf.data.Dataset.get_single_element(dataset).numpy() + padded_images = pad_images(prepared_images, channels) # Push the prepared batch to the prepared_queue - prepared_queue.put((padded_batch, batch_groups, batch_identifiers, + prepared_queue.put((padded_images, batch_groups, batch_identifiers, batch_metadata, model_path, batch_id)) logging.info("Prepared batch %s (%s items) for " "prediction", batch_id, len(batch_images)) diff --git a/src/api/routes.py b/src/api/routes.py index 89a9d4b..3cad251 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -72,7 +72,7 @@ def predict() -> flask.Response: try: app.request_queue.put((image_file, group_id, identifier, - model, whitelist), block=True, timeout=3) + model, whitelist)) except Full: response = jsonify({ "status": "error", @@ -86,9 +86,6 @@ def predict() -> flask.Response: response.status_code = 429 - logger.warning("Request queue is full. Maybe one of the workers has " - "died?") - return response response = jsonify({