Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Work in Progress]: Multi process #293

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Imports:
forecast,
httr,
jsonlite,
parallel,
portalr,
runjags,
scoringRules,
Expand All @@ -43,6 +44,6 @@ Suggests:
markdown,
rmarkdown,
testthat
RoxygenNote: 7.2.0
RoxygenNote: 7.2.1
VignetteBuilder: knitr
Encoding: UTF-8
10 changes: 9 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Generated by roxygen2: do not edit by hand

export("library(portalcasting)")
export(AutoArima)
export(ESSS)
export(NMME_urls)
Expand Down Expand Up @@ -29,7 +30,6 @@ export(evaluate_cast)
export(evaluate_casts)
export(file_ext)
export(fill_casts)
export(fill_data)
export(fill_dir)
export(fill_fits)
export(fill_models)
Expand Down Expand Up @@ -133,6 +133,14 @@ importFrom(jsonlite,fromJSON)
importFrom(jsonlite,serializeJSON)
importFrom(jsonlite,unserializeJSON)
importFrom(jsonlite,write_json)
importFrom(parallel,clusterExport)
importFrom(parallel,detectCores)
importFrom(parallel,makeCluster)
importFrom(parallel,mccollect)
importFrom(parallel,mclapply)
importFrom(parallel,mcparallel)
importFrom(parallel,parLapply)
importFrom(parallel,stopCluster)
importFrom(portalr,download_observations)
importFrom(portalr,get_future_moons)
importFrom(portalr,load_trapping_data)
Expand Down
101 changes: 88 additions & 13 deletions R/fill_dir.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#'
#' @param settings \code{list} of controls for the directory, with defaults set in \code{\link{directory_settings}}.
#'
#' @param multiprocess \code{character} (or \code{logical}) configuration for mulit-processing, can be any value from \code{unix}, \code{windows}, \code{TRUE}, \code{FALSE}. Default value is \code{FALSE}.
#'
#' @param quiet \code{logical} indicator if progress messages should be quieted.
#'
#' @param verbose \code{logical} indicator of whether or not to print out all of the information (and thus just the tidy messages).
Expand Down Expand Up @@ -52,11 +54,16 @@ fill_dir <- function (main = ".",
models = prefab_models(),
datasets = prefab_datasets(),
settings = directory_settings(),
multiprocess = FALSE,
quiet = FALSE,
verbose = FALSE) {

messageq("Filling directory with content: \n", quiet = quiet)

if (multiprocess == TRUE) {
multiprocess <- .Platform$OS.type
}

fill_raw(main = main,
settings = settings,
quiet = quiet,
Expand All @@ -76,6 +83,7 @@ fill_dir <- function (main = ".",
datasets = datasets,
models = models,
settings = settings,
multiprocess = multiprocess,
quiet = quiet,
verbose = verbose)

Expand All @@ -101,44 +109,111 @@ fill_dir <- function (main = ".",
#' @rdname directory-filling
#'
#' @export
#'
fill_data <- function (main = ".",
models = prefab_models(),
datasets = prefab_datasets(),
settings = directory_settings(),
quiet = FALSE,
verbose = FALSE) {
#'library(portalcasting)
fill_data <- function (main = ".",
models = prefab_models(),
datasets = prefab_datasets(),
settings = directory_settings(),
multiprocess = FALSE,
quiet = FALSE,
verbose = FALSE) {

messageq(" Writing data files ... ", quiet = quiet)

write_dataset_controls(main = main,
if (multiprocess == TRUE) {
multiprocess <- .Platform$OS.type
}

write_data_set_controls_f <- function() {

write_dataset_controls(main = main,
settings = settings,
datasets = datasets,
datasets = datasets,
multiprocess = multiprocess,
quiet = FALSE)

prep_rodents(main = main,
}

prep_rodents_f <- function() {

prep_rodents(main = main,
settings = settings,
datasets = datasets,
quiet = quiet,
verbose = verbose)

prep_moons(main = main,
}

prep_moons_f <- function() {

prep_moons(main = main,
settings = settings,
quiet = quiet,
verbose = verbose)

prep_covariates(main = main,
}

prep_covariates_f <- function() {

prep_covariates(main = main,
settings = settings,
quiet = quiet,
verbose = verbose)

}

prep_metadata(main = main,
prep_metadata_f <- function() {

prep_metadata(main = main,
datasets = datasets,
models = models,
settings = settings,
quiet = quiet,
verbose = verbose)

}

if(multiprocess == 'unix') {

write_data_set_controls_mc <- mcparallel(write_data_set_controls_f())
prep_rodents_mc <- mcparallel(prep_rodents_f())
prep_moons_f_mc <- mcparallel(prep_moons_f())
prep_covariates_mc <- mcparallel(prep_covariates_f())
prep_metadata_mc <- mcparallel(prep_metadata_f())
mccollect(list(
write_data_set_controls_mc,
prep_rodents_mc,
prep_moons_f_mc,
prep_covariates_mc,
prep_metadata_mc
))

} else if (multiprocess == 'windows') {

clusters <- makeCluster(detectCores() - 1, outfile = "")

clusterExport(cl=clusters, varlist=c('main', 'datasets', 'models', 'settings', 'quiet', 'verbose', 'multiprocess'), envir=environment())

parLapply(clusters, list(
write_data_set_controls_f,
prep_rodents_f,
prep_moons_f,
prep_covariates_f,
prep_metadata_f
), function(prep_function) {
prep_function()
})

stopCluster(clusters)

} else {
write_data_set_controls_f()
prep_rodents_f()
prep_moons_f()
prep_covariates_f()
prep_metadata_f()
}

messageq(" ... data preparing complete.", quiet = quiet)

invisible()
Expand Down
114 changes: 80 additions & 34 deletions R/portalcast.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#'
#' @param datasets \code{character} vector of dataset names to be created.
#'
#' @param multiprocess \code{character} (or \code{logical}) configuration for mulit-processing, can be any value from \code{unix}, \code{windows}, \code{TRUE}, \code{FALSE}. Default value is \code{FALSE}.
#'
#' @param quiet \code{logical} indicator if progress messages should be quieted.
#'
#' @param verbose \code{logical} indicator of whether or not to print out all of the information or not (and thus just the tidy messages).
Expand All @@ -36,15 +38,16 @@
#'
#' @export
#'
portalcast <- function (main = ".",
models = prefab_models(),
datasets = prefab_datasets(),
end_moons = NULL,
start_moon = 217,
cast_date = Sys.Date(),
settings = directory_settings(),
quiet = FALSE,
verbose = FALSE){
portalcast <- function (main = ".",
models = prefab_models(),
datasets = prefab_datasets(),
end_moons = NULL,
start_moon = 217,
cast_date = Sys.Date(),
settings = directory_settings(),
multiprocess = FALSE,
quiet = FALSE,
verbose = FALSE){

#
# the datasets here should come from the models selected
Expand All @@ -55,6 +58,20 @@ portalcast <- function (main = ".",

messageq(message_break(), "\nPreparing directory for casting\n", message_break(), "\nThis is portalcasting v", packageDescription("portalcasting", fields = "Version"), "\n", message_break(), quiet = quiet)

systemType <- .Platform$OS.type

if (multiprocess != TRUE) {
systemType <- multiprocess
}
if (multiprocess != FALSE) {
messageq(message_break(), "Running on ", systemType, " System", message_break())
if(systemType == "unix") {
messageq("Will be using Forking approach for parallel processing.", message_break())
} else {
messageq("Will be using Socket approach for parallel processing.", message_break())
}
}

moons <- read_moons(main = main,
settings = settings)

Expand All @@ -63,7 +80,7 @@ portalcast <- function (main = ".",
end_moons <- ifnull(end_moons, last_moon)
nend_moons <- length(end_moons)

for (i in 1:nend_moons) {
cast_f <- function(i) {

cast(main = main,
datasets = datasets,
Expand All @@ -72,21 +89,30 @@ portalcast <- function (main = ".",
start_moon = start_moon,
cast_date = cast_date,
settings = settings,
multiprocess = systemType,
quiet = quiet,
verbose = verbose)

}

if(multiprocess != FALSE) {
#This is breaking as of now since it just overwrites preparatory data.
mclapply(1:nend_moons, cast_f, mc.cores = detectCores())
} else {
lapply(1:nend_moons, cast_f)
}

if (end_moons[nend_moons] != last_moon) {
# this maybe should happen within cast?
messageq(message_break(), "\nResetting data to most up-to-date versions\n", message_break(), quiet = quiet)

fill_data(main = main,
datasets = datasets,
models = models,
settings = settings,
quiet = quiet,
verbose = verbose)
fill_data(main = main,
datasets = datasets,
models = models,
settings = settings,
multiprocess = systemType,
quiet = quiet,
verbose = verbose)

}

Expand All @@ -107,27 +133,15 @@ cast <- function (main = ".",
start_moon = 217,
cast_date = Sys.Date(),
settings = directory_settings(),
multiprocess = FALSE,
quiet = FALSE,
verbose = FALSE) {

moons <- read_moons(main = main,
settings = settings)

which_last_moon <- max(which(moons$newmoondate < cast_date))
last_moon <- moons$newmoonnumber[which_last_moon]
end_moon <- ifnull(end_moon, last_moon)

messageq(message_break(), "\nReadying data for forecast origin newmoon ", end_moon, "\n", message_break(), quiet = quiet)

if (end_moon != last_moon) {

fill_data(main = main,
datasets = datasets,
models = models,
settings = settings,
quiet = quiet,
verbose = verbose)


if (multiprocess == TRUE) {
multiprocess <- .Platform$OS.type
}

messageq(message_break(), "\nRunning models for forecast origin newmoon ", end_moon, "\n", message_break(), quiet = quiet)
Expand All @@ -138,13 +152,13 @@ cast <- function (main = ".",

nmodels <- length(models)

for (i in 1:nmodels) {
model_f <- function(i) {

model <- models_scripts[i]

messageq(message_break(), "\n -Running ", path_no_ext(basename(model)), "\n", message_break(), quiet = quiet)

run_status <- tryCatch(expr = source(model),
run_status <- tryCatch(expr = source(model, local=TRUE),
error = function(x){NA})

if (all(is.na(run_status))) {
Expand All @@ -159,6 +173,38 @@ cast <- function (main = ".",

}

if(multiprocess == 'unix') {

models_list <- lapply(1:nmodels, function(i) {
mcparallel(model_f(i))
})

mccollect(models_list)


} else if (multiprocess == 'windows') {

clusters <- makeCluster(detectCores() - 1, outfile = "")

clusterExport(cl=clusters, varlist=c('models_scripts'), envir=environment())

parLapply(clusters, 1:nmodels, function(i) {
model_f(i)
})

stopCluster(clusters)

} else {

for (i in 1:nmodels) {

model_f(i)

}

}


invisible()

}
Expand Down
Loading