From 2172196f50bdefc14ba2c4c8f43b5df500a9540d Mon Sep 17 00:00:00 2001
From: prashantkul
Date: Thu, 18 Jul 2024 17:02:07 +0000
Subject: [PATCH 1/4] changes to support call_n pred

---
 usl_models/usl_models/flood_ml/dataset.py |  5 +++
 usl_models/usl_models/flood_ml/model.py   | 37 ++++++++++++++++++-----
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/usl_models/usl_models/flood_ml/dataset.py b/usl_models/usl_models/flood_ml/dataset.py
index 048b0550..22a6d019 100644
--- a/usl_models/usl_models/flood_ml/dataset.py
+++ b/usl_models/usl_models/flood_ml/dataset.py
@@ -87,6 +87,7 @@ def generator():
                     ),
                     dtype=tf.float32,
                 ),
+                n=tf.TensorSpec(shape=(), dtype=tf.int32),
             ),
             tf.TensorSpec(
                 shape=(None, constants.MAP_HEIGHT, constants.MAP_WIDTH),
                 dtype=tf.float32,
             ),
@@ -241,6 +242,9 @@ def _iter_model_inputs(
             firestore_client, storage_client, sim_name
         )
 
+    rainfall_config = metastore.get_temporal_feature_metadata(firestore_client, sim_name)
+    rainfall = rainfall_config["rainfall_duration"]
+
     for i, (geospatial, labels) in enumerate(feature_label_gen):
         if max_chunks is not None and i >= max_chunks:
             return
@@ -256,6 +260,7 @@ def _iter_model_inputs(
                     1,
                 )
             ),
+            n=rainfall,
         )
         yield model_input, labels
 
diff --git a/usl_models/usl_models/flood_ml/model.py b/usl_models/usl_models/flood_ml/model.py
index 01100a75..bb6b3801 100644
--- a/usl_models/usl_models/flood_ml/model.py
+++ b/usl_models/usl_models/flood_ml/model.py
@@ -34,6 +34,7 @@ class Input(TypedDict):
         geospatial: tf.Tensor
         temporal: tf.Tensor
         spatiotemporal: tf.Tensor
+        n: int
 
     def __init__(
         self,
@@ -292,7 +293,7 @@ def __init__(
             name="output_cnn",
         )
 
-    def call(self, input: FloodModel.Input) -> tf.Tensor:
+    def call_v1(self, input: FloodModel.Input) -> tf.Tensor:
         """Makes a single forward pass on a batch of data.
 
         The forward pass represents a single prediction on an input batch
@@ -350,7 +351,7 @@ def call(self, input: FloodModel.Input) -> tf.Tensor:
 
         return output
 
-    def call_n(self, full_input: FloodModel.Input, n: int = 1) -> tf.Tensor:
+    def call(self, full_input: FloodModel.Input) -> tf.Tensor:
         """Runs the entire autoregressive model.
 
         Args:
-            full_input: A dictionary of input tensors.
-                While `call` expects only input data for a single context window,
-                `call_n` requires the full temporal tensor.
-            n: Number of autoregressive iterations to run.
+            full_input: A dictionary of input tensors, including the full
+                temporal tensor and `n`, the number of autoregressive
+                iterations to run.
 
         Returns:
             A tensor of all the flood predictions: [B, n, H, W].
         """
         spatiotemporal = full_input["spatiotemporal"]
         geospatial = full_input["geospatial"]
         temporal = full_input["temporal"]
+        n = full_input["n"]
+        n = tf.squeeze(n)  # Remove any extra dimensions
+        n = tf.cast(n, tf.int32)  # Ensure it's an integer
 
         B = spatiotemporal.shape[0]
         C = 1  # Channel dimension for spatiotemporal tensor
         F = constants.GEO_FEATURES
         N, M = self._params.n_flood_maps, self._params.m_rainfall
         T_MAX = constants.MAX_RAINFALL_DURATION
         H, W = self._spatial_height, self._spatial_width
 
         tf.ensure_shape(spatiotemporal, (B, N, H, W, C))
         tf.ensure_shape(geospatial, (B, H, W, F))
         tf.ensure_shape(temporal, (B, T_MAX, M))
 
         # This array stores the n predictions.
         predictions = tf.TensorArray(tf.float32, size=n)
 
         # We use 1-indexing for simplicity. Time step t represents the t-th flood
         # prediction.
         for t in range(1, n + 1):
             input = FloodModel.Input(
                 geospatial=geospatial,
                 temporal=self._get_temporal_window(temporal, t, N),
                 spatiotemporal=spatiotemporal,
+                n=n,
             )
-            prediction = self.call(input)
+            prediction = self.call_v1(input)
             predictions = predictions.write(t - 1, prediction)
 
             # Append new predictions along time axis, drop the first.
             spatiotemporal = tf.concat(
                 [spatiotemporal, tf.expand_dims(prediction, axis=1)], axis=1
             )[:, 1:]
 
         # Gather dense tensor out of TensorArray along the time axis.
         predictions = tf.stack(tf.unstack(predictions.stack()), axis=1)
         # Drop channels dimension.
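         # (predictions.stack() is [n, B, H, W, C]; the unstack/stack pair
         # moves the time axis behind the batch axis, giving [B, n, H, W, C].)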
         return tf.squeeze(predictions, axis=-1)
 
     @staticmethod
     def _get_temporal_window(temporal: tf.Tensor, t: int, n: int) -> tf.Tensor:
@@ -413,11 +419,28 @@ def _get_temporal_window(temporal: tf.Tensor, t: int, n: int) -> tf.Tensor:
 
         Returns:
             Returns a zero-padded n-sized window at timestep t of shape (B, n, M)
         """
-        B, _, M = temporal.shape
-        return tf.concat(
+        B = tf.shape(temporal)[0]
+        T_MAX = tf.shape(temporal)[1]
+        M = tf.shape(temporal)[2]
+
+        zero_padding_shape = (B, tf.maximum(n - t, 0), M)
+        temporal_slice_start = tf.maximum(t - n, 0)
+        temporal_slice_end = tf.minimum(t, T_MAX)
+        temporal_slice = temporal[:, temporal_slice_start:temporal_slice_end]
+
+        result = tf.concat(
             [
-                tf.zeros(shape=(B, tf.maximum(n - t, 0), M)),
-                temporal[:, tf.maximum(t - n, 0) : t],
+                tf.zeros(shape=zero_padding_shape),
+                temporal_slice,
             ],
             axis=1,
         )
+        return result

From 1346ed0a8c53e69fb6c6db27c56c2b0cd080e839 Mon Sep 17 00:00:00 2001
From: prashantkul
Date: Thu, 18 Jul 2024 17:39:27 +0000
Subject: [PATCH 2/4] changes to call stack/unstack tf operations

---
 usl_models/usl_models/flood_ml/model.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/usl_models/usl_models/flood_ml/model.py b/usl_models/usl_models/flood_ml/model.py
index bb6b3801..378e573a 100644
--- a/usl_models/usl_models/flood_ml/model.py
+++ b/usl_models/usl_models/flood_ml/model.py
@@ -395,6 +395,7 @@ def call(self, full_input: FloodModel.Input) -> tf.Tensor:
             )
             prediction = self.call_v1(input)
             predictions = predictions.write(t - 1, prediction)
+
 
             # Append new predictions along time axis, drop the first.
             spatiotemporal = tf.concat(
@@ -402,9 +403,12 @@ def call(self, full_input: FloodModel.Input) -> tf.Tensor:
             )[:, 1:]
 
         # Gather dense tensor out of TensorArray along the time axis.
-        predictions = tf.stack(tf.unstack(predictions.stack()), axis=1)
+        predictions = predictions.gather(tf.range(0, n))
+        predictions = tf.transpose(predictions, perm=[1, 0, 2, 3, 4])
         # Drop channels dimension.
-        return tf.squeeze(predictions, axis=-1)
+        predictions = tf.squeeze(predictions, axis=-1)
+        return predictions
 
 
     @staticmethod

From ef901001dba97c5a51131dd290ed7f7dcc8fc24c Mon Sep 17 00:00:00 2001
From: prashantkul
Date: Thu, 18 Jul 2024 19:40:34 +0000
Subject: [PATCH 3/4] changed call implementation to work with symbolic
 tensors for concrete function export

---
 usl_models/usl_models/flood_ml/model.py | 118 +++++++++++++++++------
 1 file changed, 85 insertions(+), 33 deletions(-)

diff --git a/usl_models/usl_models/flood_ml/model.py b/usl_models/usl_models/flood_ml/model.py
index 378e573a..4d5116f2 100644
--- a/usl_models/usl_models/flood_ml/model.py
+++ b/usl_models/usl_models/flood_ml/model.py
@@ -351,42 +351,90 @@ def call_v1(self, input: FloodModel.Input) -> tf.Tensor:
 
         return output
 
     def call(self, full_input: FloodModel.Input) -> tf.Tensor:
         """Runs the entire autoregressive model.
 
         Args:
             full_input: A dictionary of input tensors, including the full
                 temporal tensor and `n`, the number of autoregressive
                 iterations to run.
 
         Returns:
             A tensor of all the flood predictions: [B, n, H, W].
         """
         spatiotemporal = full_input["spatiotemporal"]
         geospatial = full_input["geospatial"]
         temporal = full_input["temporal"]
         n = full_input["n"]
         n = tf.squeeze(n)  # Remove any extra dimensions
         n = tf.cast(n, tf.int32)  # Ensure it's an integer
 
-        B = spatiotemporal.shape[0]
-        C = 1  # Channel dimension for spatiotemporal tensor
+        # Use tf.shape instead of .shape to get dynamic dimensions
+        B = tf.shape(spatiotemporal)[0]
+        N = tf.shape(spatiotemporal)[1]
+        H = tf.shape(spatiotemporal)[2]
+        W = tf.shape(spatiotemporal)[3]
+        C = tf.shape(spatiotemporal)[4]
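+        # (tf.shape returns a symbolic shape tensor at trace time, so the
+        # exported concrete function can handle an unknown batch size, where
+        # Tensor.shape would have returned None.)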
+
         F = constants.GEO_FEATURES
-        N, M = self._params.n_flood_maps, self._params.m_rainfall
+        M = self._params.m_rainfall
         T_MAX = constants.MAX_RAINFALL_DURATION
-        H, W = self._spatial_height, self._spatial_width
-
-        tf.ensure_shape(spatiotemporal, (B, N, H, W, C))
-        tf.ensure_shape(geospatial, (B, H, W, F))
-        tf.ensure_shape(temporal, (B, T_MAX, M))
+        # Instead of ensure_shape, use tf.debugging.assert_equal for runtime
+        # checks against the dynamic dimensions. (Asserting spatiotemporal
+        # against its own tf.shape would be a no-op, so only the other two
+        # tensors are cross-checked.)
+        tf.debugging.assert_equal(tf.shape(geospatial), [B, H, W, F])
+        tf.debugging.assert_equal(tf.shape(temporal), [B, T_MAX, M])
 
-        # This array stores the n predictions.
-        predictions = tf.TensorArray(tf.float32, size=n)
-
-        # We use 1-indexing for simplicity. Time step t represents the t-th flood
-        # prediction.
-        for t in range(1, n + 1):
-            input = FloodModel.Input(
-                geospatial=geospatial,
-                temporal=self._get_temporal_window(temporal, t, N),
-                spatiotemporal=spatiotemporal,
-                n=n,
-            )
-            prediction = self.call_v1(input)
-            predictions = predictions.write(t - 1, prediction)
-
-
-            # Append new predictions along time axis, drop the first.
-            spatiotemporal = tf.concat(
-                [spatiotemporal, tf.expand_dims(prediction, axis=1)], axis=1
-            )[:, 1:]
+        def loop_body(t, spatiotemporal, predictions):
+            input = FloodModel.Input(
+                geospatial=geospatial,
+                temporal=self._get_temporal_window(temporal, t, N),
+                spatiotemporal=spatiotemporal,
+                n=n,
+            )
+            prediction = self.call_v1(input)
+            predictions = predictions.write(t - 1, prediction)
+
+            # Append the new prediction along the time axis, drop the first.
+            spatiotemporal = tf.concat(
+                [spatiotemporal[:, 1:], tf.expand_dims(prediction, axis=1)],
+                axis=1,
+            )
+            return t + 1, spatiotemporal, predictions
+
+        _, _, predictions = tf.while_loop(
+            lambda t, *_: tf.less_equal(t, n),
+            loop_body,
+            [tf.constant(1), spatiotemporal, tf.TensorArray(tf.float32, size=n)],
+            maximum_iterations=n,
+        )
 
-        # Gather dense tensor out of TensorArray along the time axis.
-        predictions = predictions.gather(tf.range(0, n))
+        # Gather dense tensor out of the TensorArray: [n, B, H, W, C].
+        predictions = predictions.stack()
         predictions = tf.transpose(predictions, perm=[1, 0, 2, 3, 4])
         # Drop channels dimension.
         predictions = tf.squeeze(predictions, axis=-1)
         return predictions

From ca8d2a276ad27af5b55a36d7e44da704c2a1da77 Mon Sep 17 00:00:00 2001
From: prashantkul
Date: Thu, 18 Jul 2024 21:27:51 +0000
Subject: [PATCH 4/4] batch predictor for local testing

---
 usl_models/custom_predictor.py              | 105 ++++++++++
 usl_models/usl_models/flood_ml/predictor.py | 218 ++++++++++++++++++++
 2 files changed, 323 insertions(+)
 create mode 100644 usl_models/custom_predictor.py
 create mode 100644 usl_models/usl_models/flood_ml/predictor.py

diff --git a/usl_models/custom_predictor.py b/usl_models/custom_predictor.py
new file mode 100644
index 00000000..c3fa42ad
--- /dev/null
+++ b/usl_models/custom_predictor.py
@@ -0,0 +1,105 @@
+import json
+
+import numpy as np
+import tensorflow as tf
+from google.cloud import firestore
+from google.cloud import storage
+
+from usl_models.flood_ml.dataset import load_dataset
+from usl_models.flood_ml.predictor import FloodModelPredictor
+
+
+def tensor_to_json_serializable(tensor):
+    if isinstance(tensor, tf.Tensor):
+        return tensor.numpy().tolist()
+    elif isinstance(tensor, np.ndarray):
+        return tensor.tolist()
+    else:
+        return tensor
+
+
+def create_jsonl_file(sim_names: list):
+    batch_size = 0
+    dataset = load_dataset(
+        sim_names,
+        batch_size=batch_size,
+        max_chunks=8,
+        firestore_client=firestore.Client(project='climateiq-test'),
+        storage_client=storage.Client(project='climateiq-test'),
+    )
+
+    inputs, labels = next(iter(dataset))
+    print("Input shapes:")
+    for key, value in inputs.items():
+        print(f"{key}: {value.shape}")
+
+    print("\nLabel shape:", labels.shape)
+
+    outfile = 'batch_pred_6.jsonl'
+
+    # Convert inputs to JSON serializable format
+    json_serializable_inputs = {
+        key: tensor_to_json_serializable(value) for key, value in inputs.items()
+    }
+
+    # Write to JSONL file
+    with open(outfile, 'w') as f:
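+        # NOTE: this writes the whole batch as a single JSON object on one
+        # line; load_jsonl_to_numpy() below reads one JSON record per line.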
+        json.dump(json_serializable_inputs, f)
+
+    print("JSONL file created successfully.")
+
+
+def load_jsonl_to_numpy(file_path):
+    data = {}  # Maps field name -> batched NumPy array
+
+    with open(file_path, 'r') as file:
+        for line in file:  # One JSON record per line
+            item = json.loads(line)
+
+            # Create NumPy arrays and add batch dimension directly
+            for key in ['geospatial', 'temporal', 'spatiotemporal', 'n']:
+                arr = np.array(item[key], dtype=np.float32)
+                arr = np.expand_dims(arr, axis=0)  # Add batch dimension
+
+                if key in data:
+                    data[key] = np.concatenate([data[key], arr], axis=0)
+                else:
+                    data[key] = arr
+
+    # Print shapes for debugging
+    print("Loaded data shapes:")
+    for key, value in data.items():
+        print(f"{key}: {value.shape}")
+
+    return data
+
+
+def main():
+    predictor = FloodModelPredictor()
+    sim_names = ["Manhattan-config_v1/Rainfall_Data_1.txt"]
+    use_local = True
+    # model_gcs_url = "gs://climateiq-vertexai/aiplatform-custom-training-2024-07-16-17:40:15.640/"
+
+    # Create the prediction container:
+    # create_cotainer()
+
+    # Optionally regenerate the local test batch first:
+    # create_jsonl_file(sim_names=sim_names)
+
+    # Load the model; it can live in GCS or locally in the "model" directory.
+    # if not use_local:
+    #     predictor.load(model_gcs_url)
+    # else:
+    predictor.load('model/')
+
+    # Load unbatched input data and batch it; this is typically done by Vertex.
+    batch_data_file = "batch_pred_6.jsonl"
+    instances_dict = load_jsonl_to_numpy(batch_data_file)
+
+    print("Calling prediction..")
+    predictions = predictor.predict_now(instances_dict)
+    print("Predictions successful, shape:", predictions.shape)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/usl_models/usl_models/flood_ml/predictor.py b/usl_models/usl_models/flood_ml/predictor.py
new file mode 100644
index 00000000..e682893f
--- /dev/null
+++ b/usl_models/usl_models/flood_ml/predictor.py
@@ -0,0 +1,218 @@
+import json
+
+import numpy as np
+import tensorflow as tf
+
+from google.cloud.aiplatform.prediction.predictor import Predictor
+from google.cloud import storage
+from usl_models.flood_ml.model import FloodModel, constants
+
+
+class FloodModelPredictor(Predictor):
+    def __init__(self):
+        """Initializes the FloodModelPredictor."""
+        self._model = None
+
+    def load(self, artifacts_uri: str) -> None:
+        """Loads the model artifact.
+
+        Args:
+            artifacts_uri (str):
+                Required. The value of the environment variable AIP_STORAGE_URI.
+        Raises:
+            ValueError: If the required model files are not provided in the
+                artifacts uri.
+        """
+        # Load the saved model.
+        # NOTE: for now this loads from the local "concret_fn" directory;
+        # artifacts_uri is not used yet.
+        loaded_model = tf.saved_model.load('concret_fn')
+
+        self._model = loaded_model
+
+        # Get the serving_default signature
+        serving_signature = self._model.signatures['serving_default']
+
+        # Print model input signature
+        print("Model input signature:", serving_signature.structured_input_signature)
+
+        # Print model output signature
+        print("\nModel output signature:")
+        for output_name, output_tensor in serving_signature.structured_outputs.items():
+            print(f"  {output_name}: {output_tensor.shape}")
+        print("\nModel loaded successfully.")
+
+    def preprocess(self, input_data) -> dict:
+        """Loads and preprocesses data from a GCS URL or a dict of instances.
+
+        Args:
+            input_data: Either a GCS URL string or a dictionary containing an
+                "instances" key with a list of instances.
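+                (e.g. {"instances": [{"geospatial": [...], "temporal": [...],
+                "spatiotemporal": [...]}]}, matching the keys read below).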
+
+        Returns:
+            A dictionary where keys are the field names (e.g., 'geospatial',
+            'temporal', 'spatiotemporal') and values are NumPy arrays with a
+            batch dimension added.
+        """
+        print("Entering preprocess step.")
+        data = {}
+
+        if isinstance(input_data, str) and input_data.startswith("gs://"):
+            # Input is a GCS URL
+            storage_client = storage.Client()
+
+            # Parse the GCS URL
+            bucket_name = input_data[5:].split('/')[0]
+            blob_name = '/'.join(input_data[5:].split('/')[1:])
+
+            # Get the bucket and blob
+            bucket = storage_client.bucket(bucket_name)
+            blob = bucket.blob(blob_name)
+
+            # Download the blob content as a string
+            blob_string = blob.download_as_string().decode('utf-8')
+
+            # Iterate over the JSONL records in the blob, one per line
+            for line in blob_string.splitlines():
+                item = json.loads(line)
+
+                # Create NumPy arrays and add batch dimension directly
+                for key in ['geospatial', 'temporal', 'spatiotemporal']:
+                    arr = np.array(item[key], dtype=np.float32)
+                    arr = np.expand_dims(arr, axis=0)
+
+                    if key in data:
+                        data[key] = np.concatenate([data[key], arr], axis=0)
+                    else:
+                        data[key] = arr
+
+        elif isinstance(input_data, dict) and "instances" in input_data:
+            # Input is a dictionary of instances
+            instances = input_data["instances"]
+
+            # Batch the instances
+            for key in ['geospatial', 'temporal', 'spatiotemporal']:
+                data[key] = np.stack(
+                    [np.array(instance[key], dtype=np.float32) for instance in instances],
+                    axis=0,
+                )
+
+        else:
+            raise ValueError(
+                "Invalid input_data format. Expected a GCS URL or a dictionary"
+                " with 'instances'."
+            )
+
+        # Print shapes for debugging
+        print("Loaded data shapes:")
+        for key, value in data.items():
+            print(f"{key}: {value.shape}")
+
+        return data
+
+    @staticmethod
+    def _get_temporal_window(temporal: tf.Tensor, t: int, n: int) -> tf.Tensor:
+        """Returns a zero-padded n-sized window at timestep t."""
+        B = tf.shape(temporal)[0]
+        M = tf.shape(temporal)[2]
+
+        # Use tf.maximum instead of max so this works on symbolic tensors
+        pad_size = tf.maximum(n - t, 0)
+        start = tf.maximum(t - n, 0)
+
+        return tf.concat(
+            [tf.zeros(shape=(B, pad_size, M), dtype=temporal.dtype),
+             temporal[:, start:t, :]],
+            axis=1,
+        )
+
+    def predict(self, data: dict, n=1):
+        """Runs the entire autoregressive model.
+
+        Args:
+            data: A dictionary of the full input tensors.
+            n: Number of autoregressive iterations to run.
+
+        Returns:
+            A tensor of all the flood predictions: [B, n, H, W].
+        """
+        print("Entering predict step.")
+        if self._model is None:
+            raise ValueError("Model not loaded. Call load() first.")
+
+        try:
+            spatiotemporal = data["spatiotemporal"]
+            geospatial = data["geospatial"]
+            temporal = data["temporal"]
+
+            B = spatiotemporal.shape[0]
+            C = 1  # Channel dimension for spatiotemporal tensor
+            F = constants.GEO_FEATURES
+            N, M = constants.N_FLOOD_MAPS, constants.M_RAINFALL
+            T_MAX = constants.MAX_RAINFALL_DURATION
+            H, W = constants.MAP_HEIGHT, constants.MAP_WIDTH
+
+            tf.ensure_shape(spatiotemporal, (B, N, H, W, C))
+            tf.ensure_shape(geospatial, (B, H, W, F))
+            tf.ensure_shape(temporal, (B, T_MAX, M))
+
+            # This array stores the n predictions.
+            predictions = tf.TensorArray(tf.float32, size=n)
+
+            # We use 1-indexing for simplicity. Time step t represents the
+            # t-th flood prediction.
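+            # Each step feeds the previous prediction back in as the newest
+            # flood map in `spatiotemporal` (autoregressive rollout).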
+            for t in range(1, n + 1):
+                input = FloodModel.Input(
+                    geospatial=geospatial,
+                    temporal=self._get_temporal_window(temporal, t, N),
+                    spatiotemporal=spatiotemporal,
+                )
+                # Get the prediction function from the loaded model
+                predict_fn = self._model.signatures["serving_default"]
+
+                # Make a prediction
+                prediction_dict = predict_fn(**input)
+                prediction = prediction_dict['output_1']
+
+                predictions = predictions.write(t - 1, prediction)
+
+                # Append the new prediction along the time axis, drop the first.
+                spatiotemporal = tf.concat(
+                    [spatiotemporal, tf.expand_dims(prediction, axis=1)], axis=1
+                )[:, 1:]
+
+            # Gather a dense [B, n, H, W, C] tensor out of the TensorArray.
+            predictions = tf.stack(tf.unstack(predictions.stack()), axis=1)
+            print("Prediction shape (before squeeze): ", predictions.shape)
+            print("Prediction completed.")
+            # Drop channels dimension.
+            return tf.squeeze(predictions, axis=-1)
+
+        except Exception as e:
+            raise RuntimeError(f"Error during prediction: {e}")
+
+    def predict_now(self, instances_dict):
+        # Ensure n is a scalar of type int32
+        instances_dict["n"] = tf.cast(tf.squeeze(instances_dict["n"]), tf.int32)
+
+        # Prepare the input in the format expected by the model
+        model_input = {
+            "geospatial": instances_dict["geospatial"],
+            "temporal": instances_dict["temporal"],
+            "spatiotemporal": instances_dict["spatiotemporal"],
+            "n": instances_dict["n"],
+        }
+
+        # Call the model's __call__ method with the correct input format
+        prediction = self._model(model_input, training=False)
+
+        return prediction
+
+    def postprocess(self, prediction_results: np.ndarray) -> dict:
+        """Converts the prediction results to a dict.
+
+        Args:
+            prediction_results (np.ndarray):
+                Required. The prediction results.
+
+        Returns:
+            The postprocessed prediction results.
+        """
+        # Per-pixel max over the time axis, wrapped in a dict as documented.
+        return {"prediction": tf.math.reduce_max(prediction_results, axis=1)}
+
+    def health(self):
+        return 'Healthy', 200, {'Content-Type': 'text/plain'}
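
A minimal local smoke test for the new predictor, mirroring main() in
custom_predictor.py above (a sketch: it assumes the exported SavedModel is
available to load() and that batch_pred_6.jsonl was already produced by
create_jsonl_file()):

    from custom_predictor import load_jsonl_to_numpy
    from usl_models.flood_ml.predictor import FloodModelPredictor

    predictor = FloodModelPredictor()
    predictor.load('model/')  # note: load() currently reads 'concret_fn'
    instances = load_jsonl_to_numpy('batch_pred_6.jsonl')
    predictions = predictor.predict_now(instances)  # -> [B, n, H, W]
    print(predictor.postprocess(predictions))  # per-pixel max over n steps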