diff --git a/.gitignore b/.gitignore index 97b79e4..1258a2b 100644 --- a/.gitignore +++ b/.gitignore @@ -41,5 +41,8 @@ vignettes/*.pdf #mac .DS_Store -#results from backend jobs -/results \ No newline at end of file +# test workspace +/test_workspace + +# jobInfos from backend jobs +/jobs \ No newline at end of file diff --git a/R/api.R b/R/api.R index c46acfb..132e16a 100644 --- a/R/api.R +++ b/R/api.R @@ -165,24 +165,187 @@ NULL job = newJob$run() format = job$output + + #TODO: IMPLEMENT CHECK, IF job$results == datacube + # if Datacube: write in desired format as is + # if data.frame (NetCDF): write whole data.frame as NetCDF + + # TODO: harmonize whole file save process do reduce redundancy + if (class(format) == "list") { if (format$title == "Network Common Data Form") { - file = gdalcubes::write_ncdf(job$results) + + # if no error occurs, job$results == datacube + tryCatch( + { + file = gdalcubes::write_ncdf(job$results) + break + }, + error = function(error) + { + message('An Error Occurred.') + message(toString(error)) + }, + warning = function(warning) { + message('A Warning Occurred') + message(toString(warning)) + }) + # when error then proceed with the "data.frame" variant + tryCatch( + { + + # convert data.frame in "spatial dataframe" + data = sf::st_as_sf(job$results) + + file = base::tempfile() + + # save whole file + sf::st_write(data, file, driver = "netCDF") + + res$status = 200 + res$body = readBin(file, "raw", n = file.info(file)$size) + + content_type = plumber:::getContentType(tools::file_ext(file)) + res$setHeader("Content-Type", content_type) + + return(res) + }, + error = function(error) + { + message('An Error Occurred.') + message(toString(error)) + }, + warning = function(warning) { + message('A Warning Occurred') + message(toString(warning)) + }) } else if (format$title == "GeoTiff") { file = gdalcubes::write_tif(job$results) } + else if (format$title == "R Data Set") + { + # THINK ABOUT PRETTIER SOLUTION + tryCatch( + { + job$results = 
gdalcubes::as_json(job$results) + message(class(job$results)) + }, + error = function(error) + { + message('An Error Occurred. Passed data was not of type "datacube".') + message(error) + }, + warning = function(warning) { + message('A Warning Occurred') + message(warning) + }, + finally = { + + # perform rest of the result + temp = base::tempfile() + base::saveRDS(job$results, temp) + res$status = 200 + res$body = readBin(temp, "raw", n = file.info(temp)$size) + + content_type = plumber:::getContentType(tools::file_ext(temp)) + res$setHeader("Content-Type", content_type) + + return(res) + }) + } + else { throwError("FormatUnsupported") } } else { if (format == "NetCDF") { - file = gdalcubes::write_ncdf(job$results) + + # if no error occurs, job$results == datacube + tryCatch( + { + file = gdalcubes::write_ncdf(job$results) + break + }, + error = function(error) + { + message('An Error Occurred.') + message(toString(error)) + }, + warning = function(warning) { + message('A Warning Occurred') + message(toString(warning)) + }) + # when error then proceed with the "data.frame" variant + tryCatch( + { + + # convert data.frame in "spatial dataframe" + data = sf::st_as_sf(job$results) + + file = base::tempfile() + + # save whole file + sf::st_write(data, file, driver = "netCDF") + + res$status = 200 + res$body = readBin(file, "raw", n = file.info(file)$size) + + content_type = plumber:::getContentType(tools::file_ext(file)) + res$setHeader("Content-Type", content_type) + + return(res) + }, + error = function(error) + { + message('An Error Occurred.') + message(toString(error)) + }, + warning = function(warning) { + message('A Warning Occurred') + message(toString(warning)) + }) + + } else if (format == "GTiff") { file = gdalcubes::write_tif(job$results) } + + else if (format == "RDS") { + + # THINK ABOUT PRETTIER SOLUTION + tryCatch( + { + job$results = gdalcubes::as_json(job$results) + message(class(job$results)) + }, + error = function(error) + { + message('An Error 
Occurred. Passed data was not of type "datacube".') + message(error) + }, + warning = function(warning) { + message('A Warning Occurred') + message(warning) + }, + finally = { + + # perform rest of the result + temp = base::tempfile() + base::saveRDS(job$results, temp) + res$status = 200 + res$body = readBin(temp, "raw", n = file.info(temp)$size) + + content_type = plumber:::getContentType(tools::file_ext(temp)) + res$setHeader("Content-Type", content_type) + + return(res) + }) + + } + else { throwError("FormatUnsupported") } @@ -314,5 +477,6 @@ addEndpoint = function() { Session$assignProcess(subtract) Session$assignProcess(multiply) Session$assignProcess(divide) - + Session$assignProcess(train_model) + Session$assignProcess(predict_model) } diff --git a/R/processes.R b/R/processes.R index a8028c1..7bdedba 100644 --- a/R/processes.R +++ b/R/processes.R @@ -124,10 +124,18 @@ load_collection <- Process$new( type = "array" ), optional = TRUE + ), + Parameter$new( + name = "resolution", + description = "Specify resolution for spatial resampling.", + schema = list( + type = "integer" + ), + optional = TRUE ) ), returns = eo_datacube, - operation = function(id, spatial_extent, crs = 4326, temporal_extent, bands = NULL, job) { + operation = function(id, spatial_extent, crs = 4326, temporal_extent, bands = NULL, resolution = 30, job) { # temporal extent preprocess t0 <- temporal_extent[[1]] t1 <- temporal_extent[[2]] @@ -190,7 +198,7 @@ load_collection <- Process$new( crs <- c("EPSG", crs) crs <- paste(crs, collapse = ":") v.overview <- gdalcubes::cube_view( - srs = crs, dx = 30, dy = 30, dt = "P15D", + srs = crs, dx = resolution, dy = resolution, dt = "P15D", aggregation = "median", resampling = "average", extent = list( t0 = t0, t1 = t1, @@ -206,6 +214,9 @@ load_collection <- Process$new( cube <- gdalcubes::select_bands(cube, bands) } + message(gdalcubes::dimensions(cube)) + + message("The data cube is created....") message(gdalcubes::as_json(cube)) return(cube) @@ 
-1140,3 +1151,519 @@ save_result <- Process$new( return(data) } ) + +# Train ML Model +train_model <- Process$new( + id = "train_model", + description = "Train a machine learning algorithm based on the provided training data on satellite imagery gathered from a datacube.", + categories = as.array("machine-learning", "cubes"), + summary = "train machine learning model.", + parameters = list( + Parameter$new( + name = "data", + description = "A data cube with bands.", + schema = list( + type = "object", + subtype = "raster-cube" + ) + ), + Parameter$new( + name = "model_type", + description = "Type of the model to be trained. Must be one of the following types: RF.", + schema = list( + type = "string" + ), + ), + Parameter$new( + name = "labeled_polygons", + description = "String containing the GeoJSON with Polygons. These contain class labels used to train the model.", + schema = list( + type = "string", + subtype = "GeoJSON" + ) + ), + Parameter$new( + name = "hyperparameters", + description = "List of Hyperparameters used for the model", + schema = list( + type = "list" + ), + optional = TRUE + ), + Parameter$new( + name = "save_model", + description = "Declare wether the computed model should be saved in the user workspace. Defaults to false.", + schema = list( + type = "boolean" + ), + optional = TRUE + ), + Parameter$new( + name = "model_id", + description = "Id under which the model should be stored. 
Defaults to NULL", + schema = list( + type = "string" + ), + optional = TRUE + ) + + ), + returns = list( + description = "The trained model.", + schema = list(type = "object", subtype = "caret-ml-model") + ), + operation = function(data, model_type, labeled_polygons, hyperparameters = NULL, save_model = FALSE, model_id = NULL, job) + { + # show call stack for debugging + message("train_model called...") + + message("\nCall parameters: ") + message("\ndata: ") + message(gdalcubes::as_json(data)) + message("\nmodel_type: ") + message(model_type) + + tryCatch({ + message("\nlabeled_polygons: ") + + # read GeoJSON data as sf + labeled_polygons = sf::st_read(labeled_polygons, quiet = TRUE) + + # change CRS to cube CRS + labeled_polygons = sf::st_transform(labeled_polygons, crs = gdalcubes::srs(data)) + + message("Training Polygons sucessfully loaded!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + message("\nhyperparameters: ") + if (is.null(hyperparameters)) + { + message("No Hyperparameters passed!") + } + else + { + for (name in names(hyperparameters)) + { + message(paste0(name, ": ", hyperparameters[name])) + } + } + + message("\nsave_model:") + message(save_model) + + message("\nmodel_id:") + print(model_id) # to also show "NULL" + + + # obvios boolean check for mor readibility + if (save_model == TRUE && is.null(model_id)) + { + message("If the model should be safed, a model_id needs to be given!") + stop("") + } + + tryCatch({ + message("\nExtract features...") + + # extract features from cube + features = gdalcubes::extract_geom(data, labeled_polygons) + + message("all features extracted!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + # add FID for merge with 'features' + labeled_polygons$FID = rownames(labeled_polygons) + + tryCatch({ + message("\nMerge features with training data...") + + # this df contains all information from the datacube and the 
labeled_polgons + training_df = merge(labeled_polygons, features, by = "FID") + + message("Merging complete!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + # make copy to filter out values not needed for training + training_df_filtered = training_df + training_df_filtered$time = NULL + training_df_filtered$geometry = NULL + + + #TODO: find reasonable threshold + if (nrow(training_df) > 10000) + { + tryCatch({ + message("\nReduce number of features...") + # data frame for later storage + training_df_reduced = data.frame() + + # from all data with the same FID (same polygon) take only 50% of the + # features for each training polygon as they are assumed to carry similar information + for (i in as.numeric(unique(training_df_filtered$FID))) + { + #TODO: find better "reducing" function + sub_df = training_df_filtered[training_df_filtered$FID == i,] + + # take 50% of sub_df rows + sub_df = sub_df[1:(nrow(sub_df)/2),] + + # append new rows + training_df_reduced = rbind(training_df_reduced, sub_df) + } + + # overwrite filtered df + training_df_filtered = training_df_reduced + + message("Reducing completed!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + } + + # remove FID to not train model on FID + training_df_filtered$FID = NULL + + + tryCatch({ + message("\nSplit training Data...") + + train_row_numbers = caret::createDataPartition( + training_df_filtered$class, p=0.8, list=FALSE + ) + training_data = training_df_filtered[train_row_numbers,] + testing_data = training_df_filtered[-train_row_numbers,] + + message("Data splitting completed!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + # build specific model given by "model_type" + if (model_type == "RF") + { + # set seed for reproducibility while model training + #TODO possibly remove + set.seed(100) + + message("\nChecking hyperparameters for Random Forest...") + + 
if (!all(c("mtry", "ntree") %in% names(hyperparameters))) + { + message("'hyperparameters' has to contain 'mtry' and 'ntree'!") + stop("") + } + + message("hyperparameters for Random Forest checked!") + + # use fixed hyperparams given by the user + # (this may result in a lack of accuracy for the model) + if (!is.null(hyperparameters)) + { + + tryCatch({ + message("\nTrain Model with fixed hyperparameters...") + + # no parameters are tuned + trainCtrl <- caret::trainControl(method = "none", classProbs = TRUE) + + model <- caret::train( + class ~ ., + data = training_data, + method = "rf", + trControl = trainCtrl, + # only one model is passed (fixed hyperparams are given) + tuneGrid = expand.grid(mtry = hyperparameters$mtry), + ntree = hyperparameters$ntree) + + message("Model training finished!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + + } + # else tune model hyperparameters + + } + + # save model to user workspace + if (save_model) + { + tryCatch({ + message("\nSaving model to user workspace...") + + saveRDS(model, paste0(Session$getConfig()$workspace.path, "/", model_id, ".rds")) + + message("Saving complete!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + }) + } + + return(model) + } +) + + +predict_model <- Process$new( + id = "predict_model", + description = "Perform a prediction on a datacube based on the given model.", + categories = as.array("cubes", "machine-learning"), + summary = "Predict data on datacube.", + parameters = list( + Parameter$new( + name = "data", + description = "The data to work with.", + schema = list( + type = "object", + subtype = "raster-cube" + ) + ), + Parameter$new( + name = "model_id", + description = "Id of the model that should be used for prediction. 
The model will be searches in the user workspace.", + schema = list( + type = "string" + ) + ), + Parameter$new( + name = "aoi_extend", + description = "Spatial extend of the area of interest (aoi) to be used for classification. Has to be in EPSG:3857", + schema = list( + list( + title = "Bounding box", + type = "object", + subtype = "bounding-box", + properties = list( + east = list( + description = "East (upper right corner, coordinate axis 1).", + type = "number" + ), + west = list( + description = "West lower left corner, coordinate axis 1).", + type = "number" + ), + north = list( + description = "North (upper right corner, coordinate axis 2).", + type = "number" + ), + south = list( + description = "South (lower left corner, coordinate axis 2).", + type = "number" + ) + ), + required = c("east", "west", "south", "north") + ) + ) + ) + ), + returns = list( + description = "Spatial data frame containing the geometry, class and class probability for each pixel", + schema = list(type = "data.frame") + ), + operation = function(data, model_id, aoi_extend, job) { + # show call stack for debugging + message("predict_model called...") + + message("\nCall parameters: ") + message("\ndata: ") + message(gdalcubes::as_json(data)) + + message("\nmodel_id:") + message(model_id) + + message("\naoi_extend:") + for (name in names(aoi_extend)) + { + message(paste0(name),": ", aoi_extend[name]) + } + + xmin = aoi_extend$west + ymin = aoi_extend$south + xmax = aoi_extend$east + ymax = aoi_extend$north + + + tryCatch({ + message("\nCreate AOI Polygons...") + + aoi_polygon_df = data.frame(x = c(xmin,xmax), y = c(ymin ,ymax)) + + poly <- aoi_polygon_df |> + # create sf_point object + sf::st_as_sf(coords = c("x", "y"), crs = 3857) |> sf::st_transform(gdalcubes::srs(data)) |> + sf::st_bbox() |> + sf::st_as_sfc() |> + # create sf_polygon object + sf::st_as_sf() + + + # get cube resolution + cube_resolution = gdalcubes::dimensions(data)$x$pixel_size + + # grid to rasterize the aoi 
polygon + grid = stars::st_as_stars(sf::st_bbox(poly), dx = cube_resolution, dy = cube_resolution) + + # rasterize the aoi polygon (one polygon per pixel) + aoi_points = poly |> stars::st_rasterize(grid) |> sf::st_as_sf() + + message("AOI Polygons created!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop() + }) + + tryCatch({ + message("\nExtract features...") + + # extract features from cube + features = gdalcubes::extract_geom(data, aoi_points) + + # reset FID to prevent mismatch after extraction + features$FID = NULL + features$FID = rownames(features) + + message("All features extracted!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop() + }) + + tryCatch({ + message("\nAdd spatial information to data.frame...") + + # FID for later merge + aoi_points$FID = rownames(aoi_points) + + # remove old ID + aoi_points$ID = NULL + + message("Spatial information added!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop() + }) + + + tryCatch({ + message("\nMerge data.frame and aoi_points...") + + features = base::merge(features, aoi_points, by = "FID") + + # reset FID to prevent mismatch after merge + features$FID = rownames(features) + + message("Merge completed!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop() + }) + + tryCatch({ + message("\nPreparing prediction dataset...") + + # copy features to filter out unwanted data + features_filtered = features + features_filtered$time = NULL + features_filtered$FID = NULL + features_filtered$geometry = NULL + + message("Data preperation finished!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop() + }) + + tryCatch({ + message("\nPerform predicition...") + + # get model from user workspace + model = readRDS(paste0(Session$getConfig()$workspace.path, "/", model_id, ".rds")) + + # predict 
classes + predicted_classes = stats::predict(model, newdata = features_filtered) + + # get class probabilities + prediction_accuracys = stats::predict(model, newdata = features_filtered, type = "prob") + + # get column with only the highest class prob + max_accuracy_per_pixel = apply(prediction_accuracys, 1, base::max) + + message("Prediction completed!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop() + }) + + + tryCatch({ + message("\nCreate output dataframe...") + + # create data.frame of same length as features + output_dataframe = base::as.data.frame(base::matrix(NA, + nrow = nrow(features), + ncol = 1, + dimnames = list(c(), "FID"))) + + output_dataframe$FID = features$FID + output_dataframe$class = predicted_classes + output_dataframe$class_accuracys = max_accuracy_per_pixel + output_dataframe$geometry = features$geometry + + message("Output dataframe created!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop() + }) + + + return(output_dataframe) + } +) diff --git a/README.md b/README.md index de2a1cb..635cc50 100644 --- a/README.md +++ b/README.md @@ -89,11 +89,12 @@ docker-compose build --no-cache && docker-compose up ``` ## Development Notes: -While developing, you can skip rebuilding the docker container everytime. Instead you can run the server locally. -Just run "Rscript startLocal.R" inside this directory. +While developing, you can skip rebuilding the docker container every time. Instead you can run the server locally. To run this server locally, you need RTools4.0. For easier setup, please open "openeocubes.Rproj". Here every build tool is already set up and you can just run "Rscript startLocal.R" inside this directory. This will compile this Repository as an R Package and start the server. +The script "startLocal.R" is not intended to be used on an AWS Instance. 
+ ## Getting Started: ### Example 1: NDVI Script in R-Studio using OpenEO R-Client diff --git a/startLocal.R b/startLocal.R index 755f05f..24f22ef 100644 --- a/startLocal.R +++ b/startLocal.R @@ -4,16 +4,8 @@ remotes::install_local("./", dependencies = TRUE, force = TRUE) # Start service library(openeocubes) -aws.host <- Sys.getenv("AWSHOST") - -if (aws.host == "") { - aws.host <- NULL -} else { - message("AWS host port id is: ", aws.host) -} - - -config <- SessionConfig(api.port = 8000, host = "0.0.0.0", aws.ipv4 = aws.host) -config$workspace.path <- "/var/openeo/workspace" +config <- SessionConfig(api.port = 8000, host = "127.0.0.1") +# set workspace for testing +config$workspace.path = paste0(getwd(), "/test_workspace") createSessionInstance(config) Session$startSession()