Merge branch 'master' into master
rvankoert committed Jun 5, 2024
2 parents 2bd9abd + 38e396a commit b08ceb9
Showing 4 changed files with 112 additions and 64 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -14,3 +14,4 @@ prometheus-client==0.20.0
tf-models-official==2.14.1
xlsxwriter==3.2.0
six
Pillow==10.3.0
2 changes: 2 additions & 0 deletions src/api/batch_predictor.py
@@ -99,6 +99,8 @@ def setup_gpu_environment(gpus: str) -> List[tf.config.PhysicalDevice]:
logging.info("Using CPU")

tf.config.set_visible_devices(active_gpus, 'GPU')
for gpu in active_gpus:
tf.config.experimental.set_memory_growth(gpu, True)

return active_gpus
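
For context, the added set_memory_growth call tells TensorFlow to allocate GPU memory on demand rather than reserving the whole device at startup. A minimal standalone sketch (not part of this repository) of the same pattern, including a check that the setting took effect:

import tensorflow as tf

gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    # Must be set before the GPUs are initialized by any op
    tf.config.experimental.set_memory_growth(gpu, True)
    print(gpu.name, tf.config.experimental.get_memory_growth(gpu))  # expected: True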

168 changes: 108 additions & 60 deletions src/api/image_preparator.py
@@ -1,6 +1,7 @@
# Imports

# > Standard library
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import multiprocessing
@@ -10,6 +11,8 @@
import uuid

# > Third-party dependencies
import numpy as np
from PIL import Image, ImageOps
import tensorflow as tf


@@ -308,51 +311,6 @@ def fetch_and_prepare_images(request_queue: multiprocessing.Queue,
return num_channels, current_model, metadata, old_whitelist


def prepare_image(image_bytes: bytes,
num_channels: int) -> tf.Tensor:
"""
Prepare a single raw image for batch processing.
Decodes, resizes, normalizes, pads, and transposes the image for further
processing.
Parameters
----------
image_bytes : bytes
Raw bytes of the image.
num_channels : int
Number of channels desired for the output image (e.g., 1 for grayscale,
3 for RGB, 4 for RGBA).
Returns
-------
tf.Tensor
Prepared image tensor.
"""

image = tf.io.decode_image(image_bytes, channels=num_channels,
expand_animations=False)

# Resize while preserving aspect ratio
target_height = 64
aspect_ratio = tf.shape(image)[1] / tf.shape(image)[0]
target_width = tf.cast(target_height * aspect_ratio, tf.int32)
image = tf.image.resize(image,
[target_height, target_width])

image = tf.image.resize_with_pad(image,
target_height,
target_width + 50)

# Normalize the image to the [-0.5, 0.5] range
image = 0.5 - (image / 255)

# Transpose the image
image = tf.transpose(image, perm=[1, 0, 2])

return image


def fetch_metadata(whitelist: list, model_path: str):
"""
Fetch metadata values based on the whitelist keys from a JSON configuration
@@ -415,6 +373,105 @@ def search_key(data, key):
return values


def prepare_image(image_bytes: bytes,
num_channels: int) -> np.ndarray:
"""
Prepare a single raw image for batch processing.
Decodes, resizes, normalizes, pads, and transposes the image for further
processing.
Parameters
----------
image_bytes : bytes
Raw bytes of the image.
num_channels : int
Number of channels desired for the output image (e.g., 1 for grayscale,
3 for RGB, 4 for RGBA).
Returns
-------
np.ndarray
Prepared image array.
"""

# Load the image using TensorFlow
image = tf.image.decode_image(image_bytes, channels=num_channels,
expand_animations=False)
image = tf.image.convert_image_dtype(image, tf.float32)

# Convert TensorFlow tensor to PIL Image
if num_channels == 1:
image = tf.squeeze(image, axis=-1) # Remove the channel dimension
image = Image.fromarray(
(image.numpy() * 255).astype(np.uint8), mode='L')
else:
image = Image.fromarray((image.numpy() * 255).astype(np.uint8))

# Convert to desired number of channels
if num_channels == 1:
image = image.convert("L")
elif num_channels == 3:
image = image.convert("RGB")
elif num_channels == 4:
image = image.convert("RGBA")

# Resize while preserving aspect ratio
target_height = 64
aspect_ratio = image.width / image.height
target_width = int(target_height * aspect_ratio)
image = image.resize((target_width, target_height), Image.BILINEAR)

# Calculate padding sizes
padding_height = target_height
padding_width = target_width + 50

# Create new image with padding, centering the original image
padded_image = ImageOps.pad(
image, (padding_width, padding_height), color=0, centering=(0.5, 0.5))

# Normalize the image
image_array = np.array(padded_image) / 255.0
image_array = 0.5 - image_array

# Ensure the image has the right number of channels
if num_channels == 1:
image_array = np.expand_dims(image_array, axis=-1)

# Transpose the image
image_array = np.transpose(image_array, (1, 0, 2))

return image_array
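
A brief usage sketch of the reworked prepare_image (the file name below is hypothetical): after the final transpose the array is width-major with a fixed height of 64, and the normalization maps pixel values into [-0.5, 0.5]:

with open("line_strip.png", "rb") as f:      # hypothetical sample image
    raw_bytes = f.read()

prepared = prepare_image(raw_bytes, num_channels=1)
print(prepared.shape)                        # (resized_width + 50, 64, 1)
print(prepared.min(), prepared.max())        # both fall within [-0.5, 0.5]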


def pad_images(images: list, num_channels: int) -> np.ndarray:
"""
Pad a list of images to have the same dimensions.
Parameters
----------
images : list
List of image arrays to be padded.
num_channels : int
Number of channels in the images.
Returns
-------
np.ndarray
Padded batch of images.
"""
max_height = max(image.shape[0] for image in images)
max_width = max(image.shape[1] for image in images)

padded_batch = np.full((len(images), max_height, max_width, num_channels),
-10.0, dtype=np.float32)

for i, image in enumerate(images):
padded_batch[i, :image.shape[0], :image.shape[1], ...] = image

return padded_batch
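
A short standalone illustration (made-up shapes) of how pad_images equalizes the width-major dimension and fills the remainder with -10.0:

narrow = np.zeros((120, 64, 1), dtype=np.float32)   # two prepared line images with
wide = np.zeros((200, 64, 1), dtype=np.float32)     # different widths after transposing

batch = pad_images([narrow, wide], num_channels=1)
print(batch.shape)           # (2, 200, 64, 1)
print(batch[0, 150, 0, 0])   # -10.0: padding outside the narrower image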


def pad_and_queue_batch(model_path: str,
batch_images: list,
batch_groups: list,
@@ -449,25 +506,16 @@ def pad_and_queue_batch(model_path: str,
# Generate a unique identifier for the batch
batch_id = str(uuid.uuid4())

# Convert the batch to a TensorFlow dataset
dataset = tf.data.Dataset.from_tensor_slices(batch_images)

# Preprocess the images
dataset = dataset.map(lambda image: prepare_image(image, channels),
num_parallel_calls=tf.data.experimental.AUTOTUNE)
# Prepare images in parallel using ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
prepared_images = list(executor.map(
lambda image: prepare_image(image, channels), batch_images))

# Pad the batch
dataset = dataset.padded_batch(len(batch_images),
padded_shapes=(
[None, None, channels]),
padding_values=(
tf.constant(-10, dtype=tf.float32)))

# Convert the dataset to a padded batch
padded_batch = tf.data.Dataset.get_single_element(dataset).numpy()
padded_images = pad_images(prepared_images, channels)

# Push the prepared batch to the prepared_queue
prepared_queue.put((padded_batch, batch_groups, batch_identifiers,
prepared_queue.put((padded_images, batch_groups, batch_identifiers,
batch_metadata, model_path, batch_id))
logging.info("Prepared batch %s (%s items) for "
"prediction", batch_id, len(batch_images))
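Worth noting: the switch to ThreadPoolExecutor implicitly relies on executor.map returning results in input order, so each prepared image stays aligned with its group and identifier. A tiny standalone sketch of that guarantee:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(lambda s: s.upper(), ["a", "b", "c"]))
print(results)   # ['A', 'B', 'C'] -- input order is preserved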
5 changes: 1 addition & 4 deletions src/api/routes.py
@@ -72,7 +72,7 @@ def predict() -> flask.Response:

try:
app.request_queue.put((image_file, group_id, identifier,
model, whitelist), block=True, timeout=3)
model, whitelist))
except Full:
response = jsonify({
"status": "error",
@@ -86,9 +86,6 @@

response.status_code = 429

logger.warning("Request queue is full. Maybe one of the workers has "
"died?")

return response

response = jsonify({
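For context on this change, a standalone sketch (unrelated to this repository's queue configuration) of how multiprocessing.Queue.put behaves with and without a timeout: a timed put raises queue.Full once the wait expires, while an untimed put simply blocks until space frees up:

import multiprocessing
from queue import Full

q = multiprocessing.Queue(maxsize=1)
q.put("first")                               # returns immediately
try:
    q.put("second", block=True, timeout=1)   # bounded wait, then raises Full
except Full:
    print("queue is full")
# q.put("second") with no timeout would instead block until a consumer
# calls q.get() and frees a slot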
