diff --git a/requirements.txt b/requirements.txt index fa402560..253266a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ flask==3.0.2 -gunicorn==21.2.0 +gunicorn==22.0.0 numpy==1.26.4 editdistance==0.8.1 tensorflow==2.14.1 diff --git a/src/api/app_utils.py b/src/api/app_utils.py index bc10160e..e165e0bf 100644 --- a/src/api/app_utils.py +++ b/src/api/app_utils.py @@ -110,6 +110,11 @@ def extract_request_data() -> Tuple[bytes, str, str, str, list]: image_content = image_file.read() + # Check if the image content is empty or None + if image_content is None or len(image_content) == 0: + raise ValueError( + "The uploaded image is empty. Please upload a valid image file.") + # Extract other form data group_id = request.form.get('group_id') if not group_id: diff --git a/src/api/batch_predictor.py b/src/api/batch_predictor.py index c3b89132..63e4d557 100644 --- a/src/api/batch_predictor.py +++ b/src/api/batch_predictor.py @@ -20,6 +20,7 @@ sys.path.append(parent_path) from model.management import load_model_from_directory # noqa: E402 +from setup.environment import initialize_strategy # noqa: E402 def create_model(model_path: str, strategy: tf.distribute.Strategy) \ @@ -61,7 +62,7 @@ def create_model(model_path: str, strategy: tf.distribute.Strategy) \ return model -def setup_gpu_environment(gpus: str) -> bool: +def setup_gpu_environment(gpus: str) -> List[tf.config.PhysicalDevice]: """ Set up the environment for batch prediction. @@ -72,8 +73,8 @@ def setup_gpu_environment(gpus: str) -> bool: Returns: -------- - bool - True if all GPUs support mixed precision, otherwise False. + List[tf.config.PhysicalDevice] + List of active GPUs. """ # Set the GPU @@ -99,24 +100,7 @@ def setup_gpu_environment(gpus: str) -> bool: tf.config.set_visible_devices(active_gpus, 'GPU') - # Check if all GPUs support mixed precision - gpus_support_mixed_precision = bool(active_gpus) - for device in active_gpus: - tf.config.experimental.set_memory_growth(device, True) - if tf.config.experimental.\ - get_device_details(device)['compute_capability'][0] < 7: - gpus_support_mixed_precision = False - - # If all GPUs support mixed precision, enable it - if gpus_support_mixed_precision: - tf.keras.mixed_precision.set_global_policy('mixed_float16') - logging.debug("Mixed precision set to 'mixed_float16'") - else: - logging.debug( - "Not all GPUs support efficient mixed precision. Running in " - "standard mode.") - - return gpus_support_mixed_precision + return active_gpus def batch_prediction_worker(prepared_queue: multiprocessing.Queue, @@ -151,8 +135,11 @@ def batch_prediction_worker(prepared_queue: multiprocessing.Queue, logging.info("Batch prediction process started") # If all GPUs support mixed precision, enable it - setup_gpu_environment(gpus) - strategy = tf.distribute.MirroredStrategy() + active_gpus = setup_gpu_environment(gpus) + + # Set up the strategy + strategy = initialize_strategy(use_float32=False, + active_gpus=active_gpus) # Create the model and utilities model = create_model(model_path, strategy) diff --git a/src/api/routes.py b/src/api/routes.py index 4a97c705..89a9d4bb 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -51,7 +51,19 @@ def predict() -> flask.Response: # Add incoming request to queue # Here, we're just queuing the raw data. - image_file, group_id, identifier, model, whitelist = extract_request_data() + try: + image_file, group_id, identifier, model, whitelist = extract_request_data() + except ValueError as e: + response = jsonify({ + "status": "error", + "code": 400, + "message": str(e), + "timestamp": datetime.datetime.now().isoformat() + }) + + response.status_code = 400 + logger.error("Error processing request: %s", str(e)) + return response logger.debug("Data received: %s, %s", group_id, identifier) logger.debug("Adding %s to queue", identifier) diff --git a/src/data/manager.py b/src/data/manager.py index 9fceca6d..c912097d 100644 --- a/src/data/manager.py +++ b/src/data/manager.py @@ -477,4 +477,6 @@ def _create_dataset(self, .prefetch(AUTOTUNE))\ .apply(tf.data.experimental.assert_cardinality(num_batches)) - return dataset + # Distribute the dataset if needed + strategy = tf.distribute.get_strategy() + return strategy.experimental_distribute_dataset(dataset) diff --git a/src/modes/training.py b/src/modes/training.py index 11b00e84..a74157f9 100644 --- a/src/modes/training.py +++ b/src/modes/training.py @@ -79,6 +79,13 @@ def train_model(model: tf.keras.Model, ) callbacks.append(early_stopping) + # Determine the number of steps per epoch + cardinality = training_dataset.cardinality().numpy() \ + if isinstance(training_dataset, tf.data.Dataset) \ + else training_dataset.cardinality + steps_per_epoch = config["steps_per_epoch"] \ + if config["steps_per_epoch"] else cardinality + # Train the model history = model.fit( training_dataset, @@ -88,7 +95,7 @@ def train_model(model: tf.keras.Model, shuffle=True, workers=num_workers, max_queue_size=config["max_queue_size"], - steps_per_epoch=config["steps_per_epoch"], + steps_per_epoch=steps_per_epoch, verbose=config["training_verbosity_mode"] ) return history diff --git a/src/setup/environment.py b/src/setup/environment.py index abdd4985..cffe90a0 100644 --- a/src/setup/environment.py +++ b/src/setup/environment.py @@ -111,7 +111,7 @@ def setup_environment(config: Config) -> tf.distribute.Strategy: tf.config.set_visible_devices(active_gpus, 'GPU') # Initialize the strategy - strategy = initialize_strategy(config["use_float32"], config["gpu"]) + strategy = initialize_strategy(config["use_float32"], active_gpus) return strategy @@ -143,7 +143,7 @@ def setup_logging() -> None: def initialize_strategy(use_float32: bool, - gpu: str) -> tf.distribute.Strategy: + active_gpus: list[str]) -> tf.distribute.Strategy: """ Initializes the TensorFlow distribution strategy and sets the mixed precision policy. @@ -152,9 +152,8 @@ def initialize_strategy(use_float32: bool, ---------- use_float32 : bool Flag indicating whether to use float32 precision. - gpu : str - A string indicating the GPU configuration. A value of "-1" indicates - CPU-only mode. + active_gpus : list[str] + A list of active GPU devices. Returns ------- @@ -170,13 +169,32 @@ def initialize_strategy(use_float32: bool, """ # Set the strategy for distributed training - strategy = tf.distribute.MirroredStrategy() + if len(active_gpus) > 1: + strategy = tf.distribute.MirroredStrategy() + logging.info("Detected multiple GPUs, using MirroredStrategy") + else: + strategy = tf.distribute.get_strategy() + logging.info("Using default strategy for single GPU/CPU") # Set mixed precision policy - if not use_float32 and gpu != "-1": - policy = tf.keras.mixed_precision.Policy('mixed_float16') - tf.keras.mixed_precision.set_global_policy(policy) - logging.info("Using mixed_float16 precision") + if not use_float32 and len(active_gpus) > 0: + # Check if all GPUs support mixed precision + gpus_support_mixed_precision = bool(active_gpus) + for device in active_gpus: + tf.config.experimental.set_memory_growth(device, True) + if tf.config.experimental.\ + get_device_details(device)['compute_capability'][0] < 7: + gpus_support_mixed_precision = False + + # If all GPUs support mixed precision, enable it + if gpus_support_mixed_precision: + policy = tf.keras.mixed_precision.Policy('mixed_float16') + tf.keras.mixed_precision.set_global_policy(policy) + logging.info("Mixed precision set to 'mixed_float16'") + else: + logging.warning( + "Not all GPUs support efficient mixed precision. Running in " + "standard mode.") else: logging.info("Using float32 precision")