Skip to content

Commit

Permalink
set up branched workflow and include dynamic lags
Browse files Browse the repository at this point in the history
  • Loading branch information
emmamendelsohn committed Sep 25, 2023
1 parent bfb8a44 commit 8f5f378
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 42 deletions.
86 changes: 56 additions & 30 deletions R/process_weather_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,57 +8,83 @@
#' @return
#' @author Emma Mendelsohn
#' @export
process_weather_data <- function(nasa_weather_directory_dataset, nasa_weather_dataset, model_dates) {
process_weather_data <- function(nasa_weather_dataset, # enforce dependency
nasa_weather_directory_dataset,
nasa_weather_anomalies_directory_dataset,
model_dates,
model_dates_selected,
lag_intervals,
overwrite = FALSE) {

weather_dataset <- open_dataset(nasa_weather_directory_dataset) |> to_duckdb(table_name = "weather")
date_selected <- model_dates_selected
save_filename <- glue::glue("{date_selected}.gz.parquet")

existing_files <- list.files(nasa_weather_anomalies_directory_dataset)

message(paste0("Calculating anomalies for ", date_selected))

if(save_filename %in% existing_files & !overwrite) {
message("file already exists, skipping download")
return(file.path(nasa_weather_anomalies_directory_dataset, save_filename))
}

weather_dataset <- open_dataset(nasa_weather_directory_dataset) #|> to_duckdb(table_name = "weather")

# TODO this could go into create_nasa_weather_dataset() to avoid repeating it on each branch
weather_dataset <- weather_dataset |>
mutate(across(c(year, month, day, day_of_year), as.integer)) |>
mutate(year_day_of_year = paste(year, day_of_year, sep = "_")) |>
mutate(date = lubridate::make_date(year, month, day)) |>
select(x, y, date, year, month, day, day_of_year, year_day_of_year, relative_humidity, temperature, precipitation)

# generate the weather dataset - get the lagged anomalies for selected dates
# TODO: do this for each lag internal
outt <- map(model_dates$date[model_dates$select_date], function(date_selected){

row_select <- which(model_dates$date == date_selected)
lag_dates <- model_dates |> slice((row_select - 30):(row_select - 1))
# map over the lag intervals
row_select <- which(model_dates$date == date_selected)

lag_intervals_start <- c(1 , 1+lag_intervals[-length(lag_intervals)])
lag_intervals_end <- lag_intervals

anomalies <- map2(lag_intervals_start, lag_intervals_end, function(start, end){
lag_dates <- model_dates |> slice((row_select - start):(row_select - end))

# lag: calculate mean by pixel for the preceeding 30 days
# lag: calculate mean by pixel for the preceding x days
lagged_means <- weather_dataset |>
filter(date %in% !!lag_dates$date) |>
group_by(x, y) |>
summarize(lag_relative_humidity = mean(relative_humidity),
lag_temperature = mean(temperature),
lag_precipitation = mean(precipitation)) |>
summarize(!!paste0("lag_relative_humidity_", end) := mean(relative_humidity),
!!paste0("lag_temperature_", end) := mean(temperature),
!!paste0("lag_precipitation_", end) := mean(precipitation)) |>
ungroup()

# overall: calculate mean across the full dataset for the days of the year covered by the lag period
# note when 366 is included, we'll have less overall data going into the mean. This is okay since it's one of 30 values
# historical: calculate mean across the full dataset for the days of the year covered by the lag period
# note when 366 is included, we'll have less historical data going into the mean. This is okay since it's one of 30 values
# the same thing would happen if we did this by date (we'd have sparse data for feb-29)
# it would be avoided if we did weighted monthly means
overall_means <- weather_dataset |>
historical_means <- weather_dataset |>
filter(day_of_year %in% !!lag_dates$day_of_year ) |>
group_by(x, y) |>
summarize(overall_relative_humidity = mean(relative_humidity),
overall_temperature = mean(temperature),
overall_precipitation = mean(precipitation)) |>
summarize(!!paste0("historical_relative_humidity_", end) := mean(relative_humidity),
!!paste0("historical_temperature_", end) := mean(temperature),
!!paste0("historical_precipitation_", end) := mean(precipitation)) |>
ungroup()

# anomaly
anomolies <- full_join(lagged_means, overall_means, by = c("x", "y")) |>
mutate(anomaly_relative_humidity = lag_relative_humidity - overall_relative_humidity,
anomaly_temperature = lag_temperature - overall_temperature,
anomaly_precipitation = lag_precipitation - overall_precipitation)

# get selected day info and pull in all calculated data
select_day_data <- weather_dataset |>
filter(date == !!date_selected) |>
full_join(anomolies, by = c("x", "y"))

return(select_day_data)

})
full_join(lagged_means, historical_means, by = c("x", "y")) |>
mutate(!!paste0("anomaly_relative_humidity_", end) := !!sym(paste0("lag_relative_humidity_", end)) - !!sym(paste0("historical_relative_humidity_", end)),
!!paste0("anomaly_temperature_", end) := !!sym(paste0("lag_temperature_", end)) - !!sym(paste0("historical_temperature_", end)),
!!paste0("anomaly_precipitation_", end) := !!sym(paste0("lag_precipitation_", end)) - !!sym(paste0("historical_precipitation_", end)))
}) |>
reduce(left_join, by = c("x", "y"))

# get selected day info and pull in all calculated data
date_selected_all_dat <- weather_dataset |>
filter(date == !!date_selected) |>
full_join(anomalies, by = c("x", "y"))

# Save as parquet
write_dataset(date_selected_all_dat, here::here(nasa_weather_anomalies_directory_dataset, save_filename), compression = "gzip", compression_level = 5)

return(file.path(nasa_weather_anomalies_directory_dataset, save_filename))


}
21 changes: 15 additions & 6 deletions _targets.R
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,24 @@ data_targets <- tar_plan(

tar_target(lag_intervals, c(30, 60, 90)),
tar_target(model_dates, set_model_dates(start_year = 2005, end_year = 2022, n_per_month = 2, lag_intervals, seed = 212)),
tar_target(model_dates_selected, model_dates |> filter(select_date) |> pull(date)),

tar_target(nasa_weather_anomalies_directory,
create_data_directory(directory_path = "data/nasa_weather_anomalies")),

tar_target(nasa_weather_anomalies_directory_dataset,
create_data_directory(directory_path = "data/nasa_weather_anomalies_dataset")),

# TODO take nasa_weather_directory_dataset and do full lag calcs in this function using duckdb, then collect into memory
tar_target(weather_data, process_weather_data(nasa_weather_directory_dataset,
nasa_weather_dataset, # enforce dependency
model_dates)),
tar_target(weather_data, process_weather_data(nasa_weather_dataset, # enforce dependency
nasa_weather_directory_dataset,
nasa_weather_anomalies_directory_dataset,
model_dates,
model_dates_selected,
lag_intervals,
overwrite = FALSE),
pattern = head(model_dates_selected, 2),
format = "file",
repository = "local",
cue = tar_cue("thorough")),

# tar_target(ndvi_data, process_ndvi_data(sentinel_ndvi_directory_dataset, sentinel_ndvi_dataset, model_dates_random_select))

)
Expand Down
15 changes: 9 additions & 6 deletions _targets/meta/meta
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name|type|data|command|depend|seed|path|time|size|bytes|format|repository|iteration|parent|children|seconds|warnings|error
.Random.seed|object|81bdf40acce9cae3|||||||||||||||
.Random.seed|object|6f06c0f055e30ee3|||||||||||||||
all_targets|function|2dda5afbd1f92385|||||||||||||||
aws_bucket|object|d9cf2c5ff7cc1be4|||||||||||||||
cache_aws_branched_target|function|6e2abfa4969de1bf|||||||||||||||
continent_bounding_box|stem|cd4a4ddaa46d634a|bc00ded6218f3e10|d25ebcbef292030f|-1551480776|bucket=open-rvfcast-data*region=NULL*key=_targets/continent_bounding_box*endpoint=TlVMTA*version=|t19517.6854396s||650|qs|aws|vector|||0.001||
continent_polygon|stem|ab9fc6665b7f4674|8fdc308f6950b209|8fda1350e825d6f0|-1018833134|bucket=open-rvfcast-data*region=NULL*key=_targets/continent_polygon*endpoint=TlVMTA*version=|t19517.6854365748s||40351|qs|aws|vector|||0.046||
continent_raster_template|stem|f7447f449dab6127|59215772b6da1296|29b10148e1f5cc06|435814239|bucket=open-rvfcast-data*region=NULL*key=_targets/continent_raster_template*endpoint=TlVMTA*version=|t19613.7039914804s||17991|qs|aws|vector|||0.019|readValues raster has no values|
continent_raster_template|stem|f7447f449dab6127|59215772b6da1296|29b10148e1f5cc06|435814239|bucket=open-rvfcast-data*region=NULL*key=_targets/continent_raster_template*endpoint=TlVMTA*version=|t19625.8040321503s||17991|qs|aws|vector|||0.023|readValues raster has no values|
continent_raster_template_plot|stem|2bb339e1379dddd2|49ecb64805b051b3|0f190d250b943e48|-1126088679|bucket=open-rvfcast-data*region=NULL*key=_targets/continent_raster_template_plot*endpoint=TlVMTA*version=|t19520.796675583s||30|qs|aws|vector|||0.109|ignoring all but the first attribute|
country_bounding_boxes|stem|f3f68e55b6b77d77|30fec9e0e3d1a974|36097b0df0d06db0|1038934427|bucket=open-rvfcast-data*region=NULL*key=_targets/country_bounding_boxes*endpoint=TlVMTA*version=|t19517.695216638s||6273|qs|aws|vector|||0.024||
country_polygons|stem|ece9c72b7e99dba6|cb7eceb109efaff7|a2cdedccfeb044ec|1321015204|bucket=open-rvfcast-data*region=NULL*key=_targets/country_polygons*endpoint=TlVMTA*version=|t19517.6808684524s||17365|qs|aws|vector|||0.901||
Expand All @@ -18,7 +18,7 @@ create_modis_ndvi_dataset|function|bb9bcd506ae906bd|||||||||||||||
create_nasa_weather_dataset|function|c12b134b7be25c25|||||||||||||||
create_raster_template_plot|function|db738156a3247831|||||||||||||||
create_sentinel_ndvi_dataset|function|201d4eaf8c87d0c3|||||||||||||||
data_targets|object|1cebfd4c6eddf2cb|||||||||||||||
data_targets|object|aad6767c991d4096|||||||||||||||
define_bounding_boxes|function|e614caacc0592e73|||||||||||||||
define_country_regions|function|54808365a1bb460e|||||||||||||||
deploy_targets|object|1eb1bc8d77111ded|||||||||||||||
Expand Down Expand Up @@ -183,6 +183,7 @@ lag_intervals|stem|f4c9e8a4d588925c|6b4f81cd41a7b83e|a3dad144c40657ed|1055089432
make_model_data|function|df0e5631ac6d7b53|||||||||||||||
model_dates|stem|27f5b73d53fcb5e3|7646f39f9ba1810e|a8bb283b4980c84f|-204838086|bucket=open-rvfcast-data*region=NULL*key=_targets/model_dates*endpoint=TlVMTA*version=|t19615.7807563168s||80537|qs|aws|vector|||0.052||
model_dates_random_select|stem|ae6d12f9d280efdc|5719b67ece4afca5|41d39307354de6b6|-368262049|bucket=open-rvfcast-data*region=NULL*key=_targets/model_dates_random_select*endpoint=TlVMTA*version=|t19615.75211742s||76313|qs|aws|vector|||0.04||
model_dates_selected|stem|6c3acddd1dd8ab94|56502ed0c5bd7ce1|5ab6e5886391faa2|-1898424287|bucket=open-rvfcast-data*region=NULL*key=_targets/model_dates_selected*endpoint=TlVMTA*version=|t19625.7811853969s||2910|qs|aws|vector|||0.013||
model_targets|object|1eb1bc8d77111ded|||||||||||||||
modis_directory|stem|0404b408f5e5efef|c985137dd9b95cd4|ef46db3751d8e999|-671711443|bucket=project-dtra-ml-main*region=NULL*key=open-rvfcast/_targets/modis_directory*endpoint=TlVMTA*version=qdLdze87LwJZuMPusz2ovPXxe2rabGWb|t19493.6071375984s||55|qs|aws|vector|||0.001||
modis_ndvi_bundle|stem|error|a6c770fab6751fac|9d56e94e8363274f|-1456098296|bucket=open-rvfcast-data*region=NULL*key=_targets/modis_ndvi_bundle*endpoint=TlVMTA*version=|t19605.5645307177s||30|qs|aws|vector|||0.33||object task_id not found
Expand Down Expand Up @@ -3944,7 +3945,7 @@ preprocess_ecmwf_forecasts|function|033bd8a3c45b4d46|||||||||||||||
preprocess_nasa_weather|function|f5c92fafb420500d|||||||||||||||
preprocess_wahis_rvf_outbreaks|function|1739270cf02b72d6|||||||||||||||
process_ndvi_data|function|8a56ce9bd504bbec|||||||||||||||
process_weather_data|function|4742d21165b14ee6|||||||||||||||
process_weather_data|function|62e3a04160d9cd83|||||||||||||||
random_select_model_dates|function|75d79de28b5c2e87|||||||||||||||
read_transform_raster|function|f7518264efa394ed|||||||||||||||
report_targets|object|1eb1bc8d77111ded|||||||||||||||
Expand Down Expand Up @@ -4979,8 +4980,10 @@ test_targets|object|1eb1bc8d77111ded|||||||||||||||
transform_nasa_weather|function|e80c244fb32ef2bd|||||||||||||||
transform_raster|function|47f20ba2b9ef9722|||||||||||||||
transform_sentinel_ndvi|function|92a19330c7f2bff2|||||||||||||||
user_rprof|object|7e34b2a2c6cda37d|||||||||||||||
user_rprof|object|f5d6e573fd1bc8a3|||||||||||||||
wahis_rvf_outbreaks_preprocessed|stem|30ccd988b415d773|3ea98184b5887c93|275a59d310ff2a63|2127878318|bucket=open-rvfcast-data*region=NULL*key=_targets/wahis_rvf_outbreaks_preprocessed*endpoint=TlVMTA*version=|t19517.6952212142s||172965|qs|aws|vector|||0.043||
wahis_rvf_outbreaks_raw|stem|6fc7e6c7238977b3|b988ec4215d4213c|5ed4661ae3efb1aa|1933416983|bucket=open-rvfcast-data*region=NULL*key=_targets/wahis_rvf_outbreaks_raw*endpoint=TlVMTA*version=|t19517.6952047733s||173410|qs|aws|vector|||29.629||
wahis_rvf_query|function|9836433f6f1061fb|||||||||||||||
weather_data|stem|error|a5b7c71ecc8a6031|3160e0777a812aed|655573160|bucket=open-rvfcast-data*region=NULL*key=_targets/weather_data*endpoint=TlVMTA*version=|t19521.765027018s||30|qs|aws|vector|||0.003||unused argument model_dates_random_select
weather_data|pattern|5d7636ed3ef8bce6|494243a125cbc271||655573160||||29371159|file|local|vector||weather_data_94f732f8*weather_data_5e501efa|323.933||
weather_data_5e501efa|branch|7da64b6f42c158f3|650ca452b036d8a5|c5b75a7d4b91bf19|-504004841|data/nasa_weather_anomalies_dataset/2005-04-28.gz.parquet|t19625.8078600598s|839d3c31da852455|14917125|file|local|vector|weather_data||0||
weather_data_94f732f8|branch|ba91c0e4320203ae|650ca452b036d8a5|c9754ab7a773d644|-562021073|data/nasa_weather_anomalies_dataset/2005-04-14.gz.parquet|t19625.8058865201s|57a671ac1fe41409|14454034|file|local|vector|weather_data||0.024||

0 comments on commit 8f5f378

Please sign in to comment.