changes to support call_n pred #136

Open · wants to merge 4 commits into base: main

Changes from all commits
105 changes: 105 additions & 0 deletions usl_models/custom_predictor.py
@@ -0,0 +1,105 @@
import json

import numpy as np
import tensorflow as tf
from google.cloud import firestore
from google.cloud import storage

from usl_models.flood_ml.dataset import load_dataset
from usl_models.flood_ml.predictor import FloodModelPredictor

def tensor_to_json_serializable(tensor):
    """Converts a tf.Tensor or np.ndarray into nested lists for JSON encoding."""
    if isinstance(tensor, tf.Tensor):
        return tensor.numpy().tolist()
    elif isinstance(tensor, np.ndarray):
        return tensor.tolist()
    else:
        return tensor

def create_jsonl_file(sim_names: list):
    """Builds a JSONL prediction-input file from one batch of the flood dataset."""
    batch_size = 0
    dataset = load_dataset(
        sim_names,
        batch_size=batch_size,
        max_chunks=8,
        firestore_client=firestore.Client(project='climateiq-test'),
        storage_client=storage.Client(project='climateiq-test'),
    )

    inputs, labels = next(iter(dataset))
    print("Input shapes:")
    for key, value in inputs.items():
        print(f"{key}: {value.shape}")

    print("\nLabel shape:", labels.shape)

    outfile = 'batch_pred_6.jsonl'

    # Convert inputs to a JSON-serializable format.
    json_serializable_inputs = {
        key: tensor_to_json_serializable(value)
        for key, value in inputs.items()
    }

    # Write one JSON record per line, as expected by load_jsonl_to_numpy.
    with open(outfile, 'w') as f:
        f.write(json.dumps(json_serializable_inputs) + "\n")

    print("JSONL file created successfully.")


def load_jsonl_to_numpy(file_path):
    """Loads a JSONL file into a dict of batched NumPy arrays."""
    data = {}

    with open(file_path, 'r') as file:
        # Each line is one JSON record, i.e. one batch element.
        for line in file:
            item = json.loads(line)

            # Create NumPy arrays and add the batch dimension directly.
            for key in ['geospatial', 'temporal', 'spatiotemporal', 'n']:
                arr = np.array(item[key], dtype=np.float32)
                arr = np.expand_dims(arr, axis=0)  # Add batch dimension.

                if key in data:
                    data[key] = np.concatenate([data[key], arr], axis=0)
                else:
                    data[key] = arr

    # Print shapes for debugging.
    print("Loaded data shapes:")
    for key, value in data.items():
        print(f"{key}: {value.shape}")

    return data


def main():
    predictor = FloodModelPredictor()
    sim_names = ["Manhattan-config_v1/Rainfall_Data_1.txt"]

    # Optionally build the prediction container first:
    # create_container()
    # The model may live in GCS or in a local "model" directory:
    # predictor.load("gs://climateiq-vertexai/aiplatform-custom-training-2024-07-16-17:40:15.640/")
    # To regenerate the batched input file from simulation data:
    # create_jsonl_file(sim_names=sim_names)
    predictor.load("model/")

    # Load unbatched input data and batch it; this is typically done by Vertex.
    batch_data_file = "batch_pred_6.jsonl"
    instances_dict = load_jsonl_to_numpy(batch_data_file)

    print("Calling prediction...")
    predictions = predictor.predict_now(instances_dict)
    print("Predictions successful, shape:", predictions.shape)


if __name__ == "__main__":
    main()
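The JSONL contract between create_jsonl_file and load_jsonl_to_numpy can be sanity-checked in isolation. A minimal round-trip sketch, assuming load_jsonl_to_numpy from the script above is in scope; the shapes here are illustrative placeholders, not the real model dimensions:

import json
import numpy as np

# Illustrative shapes only; the real tensors come from load_dataset.
fake_inputs = {
    "geospatial": np.zeros((4, 4, 2), dtype=np.float32).tolist(),
    "temporal": np.zeros((6, 3), dtype=np.float32).tolist(),
    "spatiotemporal": np.zeros((2, 4, 4, 1), dtype=np.float32).tolist(),
    "n": 4,
}
with open("roundtrip_test.jsonl", "w") as f:
    f.write(json.dumps(fake_inputs) + "\n")

data = load_jsonl_to_numpy("roundtrip_test.jsonl")
assert data["geospatial"].shape == (1, 4, 4, 2)  # Batch dimension re-added.
assert data["n"].shape == (1,)  # Scalar n becomes a one-element batch.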
5 changes: 5 additions & 0 deletions usl_models/usl_models/flood_ml/dataset.py
@@ -87,6 +87,7 @@ def generator():
),
dtype=tf.float32,
),
n=tf.TensorSpec(shape=(), dtype=tf.int32),
),
tf.TensorSpec(
shape=(None, constants.MAP_HEIGHT, constants.MAP_WIDTH),
@@ -241,6 +242,9 @@ def _iter_model_inputs(
firestore_client, storage_client, sim_name
)

rainfall_config = metastore.get_temporal_feature_metadata(
    firestore_client, sim_name
)
rainfall_duration = rainfall_config["rainfall_duration"]

for i, (geospatial, labels) in enumerate(feature_label_gen):
if max_chunks is not None and i >= max_chunks:
return
@@ -256,6 +260,7 @@
1,
)
),
n=rainfall_duration,
)
yield model_input, labels

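With this hunk, each dataset element's input dict carries the scalar rainfall duration `n` alongside the three feature tensors. A minimal sketch of reading it back out, mirroring the load_dataset call in custom_predictor.py above (same clients and simulation name; running it requires access to that GCP project):

from google.cloud import firestore, storage
from usl_models.flood_ml.dataset import load_dataset

dataset = load_dataset(
    ["Manhattan-config_v1/Rainfall_Data_1.txt"],
    batch_size=0,
    max_chunks=1,
    firestore_client=firestore.Client(project='climateiq-test'),
    storage_client=storage.Client(project='climateiq-test'),
)
inputs, labels = next(iter(dataset))
# int32 tensor holding the simulation's rainfall duration, i.e. the
# number of autoregressive steps the model should run.
print(inputs["n"])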
157 changes: 118 additions & 39 deletions usl_models/usl_models/flood_ml/model.py
@@ -34,6 +34,7 @@ class Input(TypedDict):
    geospatial: tf.Tensor
    temporal: tf.Tensor
    spatiotemporal: tf.Tensor
    n: tf.Tensor  # Scalar int32: number of autoregressive steps to run.

    def __init__(
        self,
@@ -292,7 +293,7 @@ def __init__(
name="output_cnn",
)

def call(self, input: FloodModel.Input) -> tf.Tensor:
    def call_v1(self, input: FloodModel.Input) -> tf.Tensor:
        """Makes a single forward pass on a batch of data.

        The forward pass represents a single prediction on an input batch
@@ -350,56 +351,117 @@ def call(self, input: FloodModel.Input) -> tf.Tensor:

        return output

    def call(self, full_input: FloodModel.Input) -> tf.Tensor:
        """Runs the entire autoregressive model.

        Args:
            full_input: A dictionary of input tensors. While `call_v1` expects
                only input data for a single context window, `call` requires
                the full temporal tensor, plus `n`, the number of
                autoregressive iterations to run.

        Returns:
            A tensor of all the flood predictions: [B, n, H, W].
        """
        spatiotemporal = full_input["spatiotemporal"]
        geospatial = full_input["geospatial"]
        temporal = full_input["temporal"]
        # `n` arrives as a scalar int32 tensor; squeeze away any extra dims.
        n = tf.cast(tf.squeeze(full_input["n"]), tf.int32)

        # Use tf.shape instead of .shape so the batch and spatial dimensions
        # stay dynamic.
        B = tf.shape(spatiotemporal)[0]
        N = tf.shape(spatiotemporal)[1]
        H = tf.shape(spatiotemporal)[2]
        W = tf.shape(spatiotemporal)[3]

        F = constants.GEO_FEATURES
        M = self._params.m_rainfall
        T_MAX = constants.MAX_RAINFALL_DURATION

        # tf.ensure_shape cannot take dynamic dimensions, so use runtime
        # assertions instead. Checking spatiotemporal against its own tf.shape
        # would be a tautology; assert its rank and validate the other inputs
        # against it.
        tf.debugging.assert_rank(spatiotemporal, 5)
        tf.debugging.assert_equal(tf.shape(geospatial), [B, H, W, F])
        tf.debugging.assert_equal(tf.shape(temporal), [B, T_MAX, M])

        # This TensorArray accumulates the n predictions.
        predictions = tf.TensorArray(tf.float32, size=n)

        # We use 1-indexing for simplicity: time step t represents the t-th
        # flood prediction.
        def loop_body(t, spatiotemporal, predictions):
            input = FloodModel.Input(
                geospatial=geospatial,
                temporal=self._get_temporal_window(temporal, t, N),
                spatiotemporal=spatiotemporal,
                n=n,
            )
            prediction = self.call_v1(input)
            predictions = predictions.write(t - 1, prediction)

            # Append the new prediction along the time axis, dropping the
            # oldest flood map to keep the context window size fixed.
            spatiotemporal = tf.concat(
                [spatiotemporal[:, 1:], tf.expand_dims(prediction, axis=1)],
                axis=1,
            )
            return t + 1, spatiotemporal, predictions

        _, _, predictions = tf.while_loop(
            lambda t, *_: tf.less_equal(t, n),
            loop_body,
            [tf.constant(1), spatiotemporal, predictions],
            maximum_iterations=n,
        )

        # Gather a dense tensor out of the TensorArray: stack() yields
        # [n, B, H, W, C]; move time after batch, then drop the channel dim.
        predictions = predictions.stack()
        predictions = tf.transpose(predictions, perm=[1, 0, 2, 3, 4])
        predictions = tf.squeeze(predictions, axis=-1)
        return predictions
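The point of the tf.while_loop/TensorArray rewrite above is that `n` is now a runtime tensor, so a Python `for t in range(1, n + 1)` loop can no longer be traced into a graph. A self-contained toy with the same structure, using a stand-in one-step "model" that just increments its input (all names here are illustrative, not part of the PR):

import tensorflow as tf

@tf.function
def autoregress(state: tf.Tensor, n: tf.Tensor) -> tf.Tensor:
    """Runs n steps of state -> state + 1, collecting each step's output."""
    n = tf.cast(tf.squeeze(n), tf.int32)
    outputs = tf.TensorArray(tf.float32, size=n)

    def loop_body(t, state, outputs):
        state = state + 1.0  # Stand-in for the real one-step model (call_v1).
        outputs = outputs.write(t - 1, state)
        return t + 1, state, outputs

    _, _, outputs = tf.while_loop(
        lambda t, *_: tf.less_equal(t, n),
        loop_body,
        [tf.constant(1), state, outputs],
        maximum_iterations=n,
    )
    # stack() yields [n, B]; transpose to [B, n], as call() does for [B, n, H, W].
    return tf.transpose(outputs.stack(), perm=[1, 0])

print(autoregress(tf.zeros([2]), tf.constant(4)))  # tf.Tensor of shape (2, 4)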


    @staticmethod
    def _get_temporal_window(temporal: tf.Tensor, t: int, n: int) -> tf.Tensor:
@@ -413,11 +475,28 @@ def _get_temporal_window(temporal: tf.Tensor, t: int, n: int) -> tf.Tensor:
        Returns:
            A zero-padded n-sized window at timestep t, of shape (B, n, M).
        """
        B = tf.shape(temporal)[0]
        T_MAX = tf.shape(temporal)[1]
        M = tf.shape(temporal)[2]

        zero_padding_shape = (B, tf.maximum(n - t, 0), M)
        temporal_slice_start = tf.maximum(t - n, 0)
        temporal_slice_end = tf.minimum(t, T_MAX)
        temporal_slice = temporal[:, temporal_slice_start:temporal_slice_end]

        # Left-pad with zeros so the window always has n entries.
        result = tf.concat(
            [
                tf.zeros(shape=zero_padding_shape),
                temporal_slice,
            ],
            axis=1,
        )
        return result
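A quick worked example of the windowing above, with values chosen for illustration (this assumes FloodModel is importable from usl_models.flood_ml.model): with window size n = 3 over the temporal sequence [1, 2, 3, 4], early steps are left-padded with zeros and later steps slide across the sequence.

import tensorflow as tf
from usl_models.flood_ml.model import FloodModel

# temporal: [B=1, T_MAX=4, M=1] holding the values 1..4 for readability.
temporal = tf.reshape(tf.range(1.0, 5.0), (1, 4, 1))

# Expected windows for n=3: t=1 -> [0, 0, 1]; t=2 -> [0, 1, 2]; t=4 -> [2, 3, 4].
for t in [1, 2, 4]:
    window = FloodModel._get_temporal_window(temporal, t, 3)
    print(t, tf.squeeze(window).numpy())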
