Numerous bug fixes in data pipeline. Switch to better AWS management of modis_ndvi_transformed. Remove the requirement to store raw files, which aren't needed after transformation and take up a huge amount of space.
n8layman committed Sep 8, 2024
1 parent 81c70d0 commit 29272a0
Showing 13 changed files with 6,530 additions and 629 deletions.
79 changes: 79 additions & 0 deletions R/AWS_fetch_folder.R
@@ -0,0 +1,79 @@
# Function to download files from S3 only if they don't exist locally, handling pagination
AWS_fetch_folder <- function(local_folder) {

  # Check if AWS credentials and region are set in the environment
  if (any(Sys.getenv(c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")) == "")) {
    msg <- paste(
      "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION environment variables",
      "must all be set to access AWS. Please ensure they are configured correctly,",
      "probably in the .env file or system environment."
    )
    stop(msg)
  }

  # Create an S3 client
  s3 <- paws::s3()

  # List all objects in the S3 bucket, handling pagination
  s3_files <- c() # Initialize an empty vector to hold all keys
  continuation_token <- NULL

  repeat {
    response <- s3$list_objects_v2(
      Bucket = Sys.getenv("AWS_BUCKET_ID"),
      Prefix = local_folder,
      ContinuationToken = continuation_token)

    # Append the keys from this response to the main vector
    s3_files <- c(s3_files, purrr::map_vec(response$Contents, ~.x$Key))

    # Check if there's a continuation token for further pages
    if (!length(response$NextContinuationToken)) break # No more pages to fetch, exit the loop
    continuation_token <- response$NextContinuationToken
  }

  # Check if S3 has files to download
  if (length(s3_files) == 0) {
    cat("No files found in the specified S3 bucket and prefix.\n")
    return(NULL)
  }

  # List local files in your folder
  local_files <- list.files(local_folder, recursive = TRUE, full.names = TRUE)
  downloaded_files <- c()

  # Create an error-safe way to test if a parquet file can be read
  error_safe_read_parquet <- purrr::possibly(arrow::read_parquet, NULL)

  # Loop through S3 files and download if they don't exist locally
  for (file in s3_files) {

    # Check if file already exists locally
    if (!file %in% local_files) {

      # Download the file from S3
      s3_download <- s3$get_object(
        Bucket = Sys.getenv("AWS_BUCKET_ID"),
        Key = file)

      # Make sure the destination folder exists, then write output to file
      dir.create(dirname(file), recursive = TRUE, showWarnings = FALSE)
      writeBin(s3_download$Body, con = file)

      cat("Downloaded:", file, "\n")

      # Check if the transformed file can be loaded.
      # If not, clean it up and also remove it from AWS.
      # It'll be picked up again next run.
      if (is.null(error_safe_read_parquet(file))) {
        file.remove(file)
        s3$delete_object(
          Bucket = Sys.getenv("AWS_BUCKET_ID"),
          Key = file)
      } else {
        downloaded_files <- c(downloaded_files, file)
      }
    }
  }

  downloaded_files
}
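For context, a minimal usage sketch (the folder name below is hypothetical; the function assumes the AWS credentials plus AWS_BUCKET_ID are already set, e.g. loaded from .env with dotenv::load_dot_env()):

  # Hypothetical invocation: sync a transformed-data folder down from S3
  dotenv::load_dot_env() # or set the AWS_* variables any other way

  downloaded <- AWS_fetch_folder("data/modis_ndvi_transformed")
  message(length(downloaded), " new parquet files fetched and validated")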
5 changes: 4 additions & 1 deletion R/download_ecmwf_forecasts.R
@@ -42,7 +42,10 @@ download_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,

  wf_set_key(user = Sys.getenv("ECMWF_USERID"), key = Sys.getenv("ECMWF_TOKEN"), service = "cds")

-  safely(wf_request(user = Sys.getenv("ECMWF_USERID"), request = request, transfer = TRUE, path = download_directory))
+  wf_request(user = Sys.getenv("ECMWF_USERID"),
+             request = request,
+             transfer = TRUE,
+             path = download_directory)

  return(file.path(download_directory, filename))
}
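Worth noting: the removed line wrapped a call's result in safely(), but purrr::safely() wraps a function, so the old code never actually caught errors. If error tolerance were still wanted here, a sketch (same arguments as above) would be:

  # Wrap the function, then call the wrapper; the result is a list(result, error)
  safe_wf_request <- purrr::safely(ecmwfr::wf_request)
  result <- safe_wf_request(user = Sys.getenv("ECMWF_USERID"),
                            request = request,
                            transfer = TRUE,
                            path = download_directory)
  if (!is.null(result$error)) stop(result$error)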
4 changes: 2 additions & 2 deletions R/preprocess_nasa_weather.R
@@ -13,15 +13,15 @@ preprocess_nasa_weather <- function(nasa_weather_downloaded,

  nasa_weather_raw_directory <- unique(dirname(nasa_weather_downloaded))

-  open_dataset(nasa_weather_raw_directory) |>
+  arrow::open_dataset(nasa_weather_raw_directory) |>
    distinct() |>
    rename_all(tolower) |>
    rename(relative_humidity = rh2m, temperature = t2m, precipitation = prectotcorr,
           month = mm, day = dd, x = lon, y = lat, day_of_year = doy) |>
    mutate(across(c(year, month, day, day_of_year), as.integer)) |>
    mutate(date = lubridate::make_date(year, month, day)) |>
    select(x, y, everything(), -yyyymmdd) |> # terra::rast expects x (longitude) first and y (latitude) second
-    group_by(year) |>
+    group_by(year) |> # This group_by is used by arrow to create the folder structure
    write_dataset(nasa_weather_pre_transformed_directory)

  return(list.files(nasa_weather_pre_transformed_directory, full.names = TRUE, recursive = TRUE))
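The trailing group_by(year) matters because arrow::write_dataset() partitions its output by the grouping variables. A standalone sketch of that behavior with toy data (the output directory name is hypothetical):

  library(dplyr)
  library(arrow)

  # Writes one Hive-style subfolder per group:
  # toy_dataset/cyl=4/part-0.parquet, toy_dataset/cyl=6/..., toy_dataset/cyl=8/...
  mtcars |>
    group_by(cyl) |>
    write_dataset("toy_dataset")

  list.files("toy_dataset", recursive = TRUE)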
62 changes: 15 additions & 47 deletions R/set_ecmwf_api_parameter.R
@@ -12,63 +12,31 @@ set_ecmwf_api_parameter <- function(years = 2005:2018,
variables = c("2m_dewpoint_temperature", "2m_temperature", "total_precipitation"),
product_types = c("monthly_mean", "monthly_maximum", "monthly_minimum", "monthly_standard_deviation"),
leadtime_months = c("1", "2", "3", "4", "5", "6")) {

# System 4 covers just sept/oct 2017
sys4 <- tibble(system = 4, year = list(2017), month = list(9:10))

# System 51 covers nov 2022 through present
# Setup to download just 2022 first (download fails when combining end of 2022 with beginning of 2023)
sys51_2022 <- tibble(system = 51, year = list(2022), month = list(11:12))

# Now get 2023 onward
sys51_dates <- seq(ymd("2023-01-01"), Sys.Date(), by = "month")

## Real-time forecasts are released once per month on the 13th at 12UTC
## Check if new forecast has been released this month
current_time <- as.POSIXlt(Sys.time(), tz = "UTC")
current_year <- year(current_time)
current_month <- month(current_time)
update_date <- ymd_hms(paste0(current_year,"-", current_month, "-13 12:00:00"))
if(current_time < update_date) sys51_dates <- sys51_dates[-length(sys51_dates)]

## Split into batches of 5 years for download limits (only will be applicable in 2027)
sys51_years <- unique(year(sys51_dates))
if(length(sys51_years) > 5){
sys51_years <- split(sys51_years, ceiling(sys51_years/5))
}else{
sys51_years <- list(sys51_years)
}
sys51 <- tibble(system = 51,
year = sys51_years,
month = list(unique(month(sys51_dates)))) |>
bind_rows(sys51_2022)

# System 5 covers everything else.
sys5_years <- 2005:2022
sys5 <- tibble(system = 5, year = split(sys5_years, ceiling(sys5_years/ 5)), month = list(1:12))

# Tibble to interate over rowwise for download
seasonal_forecast_parameters <- bind_rows(sys4, sys51, sys5)


# Setup spatial bounds
# N, W, S, E
spatial_bounds <- c("N" = unname(bbox_coords["ymax"]),
"W" = unname(bbox_coords["xmin"]),
"S" = unname(bbox_coords["ymin"]),
"E" = unname(bbox_coords["xmax"]))

# Add all other params
seasonal_forecast_parameters <- seasonal_forecast_parameters |>
# Up till last month.
dates <- seq.Date(ymd("2017-9-01"), floor_date(today() - months(1), "month"), by = "month")

seasonal_forecast_parameters <- tibble(year = year(dates),
month = month(dates)) |>
filter(year > 2017 | month < 11) |> # 2017 is strange
mutate(system = case_when( # Case when is sequential. The first match wins
year > 2021 ~ 51,
year > 2017 ~ 5,
year >= 2017 ~ 4)) |>
group_by(system, year) |>
summarize(month = list(month), .groups = "drop")

seasonal_forecast_parameters |>
mutate(spatial_bounds = list(spatial_bounds)) |>
mutate(variables = list(variables)) |>
mutate(product_types = list(product_types)) |>
mutate(leadtime_months = list(leadtime_months))


# will be easier to split by year
seasonal_forecast_parameters <- seasonal_forecast_parameters |>
unnest(year)

return(seasonal_forecast_parameters)

}
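Because case_when() returns the value of the first condition that matches, the mapping above resolves to system 4 for 2017, system 5 for 2018 through 2021, and system 51 from 2022 onward. A quick check with toy input:

  library(dplyr)

  tibble(year = c(2017, 2018, 2021, 2022, 2024)) |>
    mutate(system = case_when(
      year > 2021 ~ 51,
      year > 2017 ~ 5,
      year >= 2017 ~ 4))
  # 2017 -> 4; 2018, 2021 -> 5; 2022, 2024 -> 51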
87 changes: 57 additions & 30 deletions R/transform_modis_ndvi.R
@@ -3,50 +3,77 @@
#' .. content for \details{} ..
#'
#' @title
-#' @param modis_ndvi_downloaded_subset
+#' @param modis_ndvi_token
+#' @param modis_ndvi_bundle_request
#' @param continent_raster_template
-#' @param modis_ndvi_transformed_directory
-#' @param overwrite
+#' @param local_folder
#' @return
-#' @author Emma Mendelsohn
+#' @author Nathan Layman
#' @export
-transform_modis_ndvi <- function(modis_ndvi_downloaded_subset,
+transform_modis_ndvi <- function(modis_ndvi_token,
+                                 modis_ndvi_bundle_request,
                                 continent_raster_template,
-                                 modis_ndvi_transformed_directory,
-                                 overwrite = FALSE) {
+                                 local_folder) {

+  # Figure out raw file name and path
+  raw_file <- file.path(local_folder, basename(modis_ndvi_bundle_request$file_name))
+
  # Extract start and end dates from the raw downloaded file name
-  filename <- basename(modis_ndvi_downloaded_subset)
-  year_doy <- sub(".*doy(\\d+).*", "\\1", filename)
+  year_doy <- sub(".*doy(\\d+).*", "\\1", basename(raw_file))
  start_date <- as.Date(year_doy, format = "%Y%j") # confirmed this is start date through manual download tests

-  # Set filename for saving
-  save_filename <- glue::glue("transformed_modis_NDVI_{start_date}.gz.parquet")
-  message(paste0("Transforming ", save_filename))
-
-  # Check if file already exists
-  existing_files <- list.files(modis_ndvi_transformed_directory)
-  if(save_filename %in% existing_files & !overwrite){
-    message("file already exists, skipping transform")
-    return(file.path(modis_ndvi_transformed_directory, save_filename))
-  }
+  # Set transformed file name and path for saving
+  transformed_file <- file.path(local_folder, glue::glue("transformed_modis_NDVI_{start_date}.gz.parquet"))
+
+  # Create an error-safe way to test if the parquet file can be read, if it exists
+  error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)
+
+  # Check if the transformed file already exists and can be loaded. If so return its path
+  if(!is.null(error_safe_read_parquet(transformed_file))) return(transformed_file)
+
+  # If not, download the raw file
+  message(paste0("Downloading ", raw_file))
+
+  task_id <- unique(modis_ndvi_bundle_request$task_id)
+  file_id <- modis_ndvi_bundle_request$file_id
+
+  # Write the file to disk
+  response <- httr::GET(paste0("https://appeears.earthdatacloud.nasa.gov/api/bundle/", task_id, "/", file_id),
+                        httr::write_disk(raw_file, overwrite = TRUE),
+                        httr::progress(),
+                        httr::add_headers(Authorization = modis_ndvi_token))
+
+  # Verify rast can open the saved raster file. If not, clean up and return NULL.
+  # Guards against e.g. "MOD13A2.061__1_km_16_days_NDVI_doy2004353_aid0001.tif' not recognized as a supported file format"
+  error_safe_read_rast <- possibly(terra::rast, NULL)
+  raw_raster <- error_safe_read_rast(raw_file)
+
+  if(is.null(raw_raster)) {
+    file.remove(raw_file)
+    return(NULL)
+  }

-  # Transform with template raster
-  transformed_raster <- transform_raster(raw_raster = rast(modis_ndvi_downloaded_subset),
-                                         template = rast(continent_raster_template))
+  # If the rast can be opened, transform it with the template raster
+  message(paste0("Transforming ", transformed_file))

-  # Convert to dataframe
-  dat_out <- as.data.frame(transformed_raster, xy = TRUE) |>
+  transformed_raster <- transform_raster(raw_raster = raw_raster,
+                                         template = rast(continent_raster_template)) |>
+    as.data.frame(xy = TRUE) |>
    as_tibble() |>
    rename(ndvi = 3) |>
    mutate(start_date = start_date)

-  # Save as parquet
-  write_parquet(dat_out, here::here(modis_ndvi_transformed_directory, save_filename), compression = "gzip", compression_level = 5)
+  # Save the transformed raster as parquet
+  arrow::write_parquet(transformed_raster, transformed_file, compression = "gzip", compression_level = 5)

-  return(file.path(modis_ndvi_transformed_directory, save_filename))
+  # Clean up the raw file, which isn't needed after transformation
+  file.remove(raw_file)
+
+  # Test if the parquet file can be loaded. If not, clean up and return NULL
+  if(is.null(error_safe_read_parquet(transformed_file))) {
+    file.remove(transformed_file)
+    return(NULL)
+  }
+
+  # If it can be loaded, return the filename and path of the transformed file
+  return(transformed_file)
}
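transform_raster() is defined elsewhere in this repo; as a rough sketch of what matching a raster to a template typically involves in terra (an assumption about the approach, not the repo's actual implementation):

  # Hypothetical stand-in: reproject to the template's CRS, then resample
  # onto the template's grid so every output shares one geometry
  transform_raster_sketch <- function(raw_raster, template) {
    raw_raster |>
      terra::project(terra::crs(template)) |>
      terra::resample(template)
  }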
1 change: 0 additions & 1 deletion R/validate_forecasts_anomalies.R
@@ -90,7 +90,6 @@ validate_forecasts_anomalies <- function(forecasts_validate_directory,
!!paste0("anomaly_precipitation_scaled_recorded_", end) := (lead_precipitation_mean - historical_precipitation_mean)/historical_precipitation_sd) |>
select(-starts_with("lead"), -starts_with("historical"))


# Now calculate difference forecast v recorded
forecasts_anomalies |>
select(x, y, ends_with(as.character(end))) |>
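The anomaly columns built in this function are standardized (z-score) differences between the recorded lead-period means and historical climatology. The core arithmetic, isolated into a hypothetical helper for clarity (not part of the repo):

  scaled_anomaly <- function(lead_mean, historical_mean, historical_sd) {
    (lead_mean - historical_mean) / historical_sd
  }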